Big Data 131 - Flink CEP Practice: 24 Hours ≥5 Transactions & 10 Minutes Unpaid Detection Cases

Flink CEP (Complex Event Processing) is a library built on Apache Flink for detecting complex event patterns in real time. With Flink CEP, developers can identify specific event patterns in streaming data, which is useful in fraud detection, network security, real-time monitoring, IoT, and similar scenarios.

Basic Concepts

At its core, Flink CEP lets you define event patterns and then detects the event sequences in a stream that match them.

CEP allows users to:

  • Define event patterns: describe the combinations of events of interest (for example, consecutive events, or events that must occur within a time limit)
  • Match patterns: Flink CEP searches the stream for event sequences that match the defined patterns
  • Process match results: once matching sequences are found, users define how to handle them

Basic Components

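  • Pattern: describes the event sequence to match in the stream. It can be a single event or a combination of events. Common operators include next() (strict contiguity: the matching event must come immediately after the previous one) and followedBy() (relaxed contiguity: non-matching events in between are skipped).
  • PatternStream: the result of applying a pattern to a DataStream with CEP.pattern(stream, pattern)
  • Select/process function: extracts the matched event sequences from the PatternStream (select() with a PatternSelectFunction, or process() with a PatternProcessFunction)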
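To make the difference between next() and followedBy() concrete, here is a small plain-Java sketch (no Flink dependency) that checks a two-step pattern "a then b" under both contiguity modes. The class and method names are hypothetical, and the model only answers whether some match exists; Flink's NFA additionally tracks every partial match and any time limit.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Plain-Java illustration (not Flink code) of strict vs relaxed contiguity
 * for the two-step pattern "a then b".
 */
public class ContiguityDemo {

    // Like next(): the event immediately after an "a" must be a "b".
    public static boolean strictMatch(List<String> events) {
        for (int i = 0; i + 1 < events.size(); i++) {
            if (events.get(i).equals("a") && events.get(i + 1).equals("b")) {
                return true;
            }
        }
        return false;
    }

    // Like followedBy(): events between "a" and "b" that do not match are skipped.
    public static boolean relaxedMatch(List<String> events) {
        boolean seenA = false;
        for (String e : events) {
            if (e.equals("a")) {
                seenA = true;
            } else if (seenA && e.equals("b")) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("a", "c", "b");
        System.out.println("next():       " + strictMatch(events));  // false
        System.out.println("followedBy(): " + relaxedMatch(events)); // true
    }
}
```

With the input a, c, b the unrelated event c breaks strict contiguity, but relaxed contiguity simply skips it.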

CEP Development Steps

  1. Define event stream: Create a DataStream representing the original event stream
  2. Define event pattern: Use Flink CEP API to define event pattern
  3. Apply pattern to stream: Generate PatternStream
  4. Extract matched events: call select() or process() on the PatternStream to get the events that match the pattern

Use Cases

  • Fraud detection: Identify consecutively occurring abnormal behaviors
  • Network monitoring: Detect specific network attack patterns
  • IoT: Analyze sensor data, detect device anomalies
  • User behavior analysis: Analyze user behavior sequences

Case 2: Detect Active Trading Users

Business Requirement

Find users with at least 5 valid transactions (price > 0) within 24 hours.

Implementation Steps

  1. Get data source
  2. Watermark transformation
  3. keyBy transformation
  4. At least 5 times: timesOrMore(5)
  5. Within 24 hours: within(Time.hours(24))
  6. Pattern matching
  7. Extract successfully matched data
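The rule itself can be checked without Flink. The sketch below is a simplified plain-Java model (hypothetical class ActiveUserModel), not the CEP engine: it groups valid transactions (price > 0) by user, which plays the role of keyBy, and flags a user when some 5 consecutive valid transactions fit within 24 hours of the first of them, roughly mirroring timesOrMore(5) plus within(Time.hours(24)). It uses the same sample data as the Flink job below.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Simplified plain-Java model (not Flink CEP) of "at least 5 valid transactions within 24 hours". */
public class ActiveUserModel {

    static final int MIN_COUNT = 5;                      // "at least 5 times"
    static final long WINDOW_MS = 24L * 60 * 60 * 1000;  // "within 24 hours"

    public static Set<String> activeUsers(String[] users, double[] prices, long[] timestamps) {
        // Keep only valid transactions (price > 0) and group their timestamps by user.
        Map<String, List<Long>> byUser = new HashMap<>();
        for (int i = 0; i < users.length; i++) {
            if (prices[i] > 0) {
                byUser.computeIfAbsent(users[i], k -> new ArrayList<>()).add(timestamps[i]);
            }
        }
        Set<String> result = new TreeSet<>();
        for (Map.Entry<String, List<Long>> entry : byUser.entrySet()) {
            List<Long> ts = entry.getValue();
            Collections.sort(ts);
            // Slide over every run of MIN_COUNT consecutive valid transactions.
            for (int i = 0; i + MIN_COUNT - 1 < ts.size(); i++) {
                if (ts.get(i + MIN_COUNT - 1) - ts.get(i) < WINDOW_MS) {
                    result.add(entry.getKey());
                    break;
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String[] users = {"100XX", "100XX", "100XX", "100XX", "100XX", "100XX", "101XX", "101XX"};
        double[] prices = {0.0, 100.0, 200.0, 300.0, 400.0, 500.0, 0.0, 100.0};
        long[] timestamps = {1597905234000L, 1597905235000L, 1597905236000L, 1597905237000L,
                1597905238000L, 1597905239000L, 1597905240000L, 1597905241000L};
        System.out.println(activeUsers(users, prices, timestamps)); // [100XX]
    }
}
```

User 100XX has exactly five valid transactions within five seconds, while 101XX has only one, so only 100XX is reported.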

Code Implementation

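package icu.wzk;

// Assumes a Flink 1.x release with flink-streaming-java and flink-cep on the classpath.
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.Map;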

public class FlinkCepActiveUser {

    public static void main(String[] args) throws Exception {
        // 1. Environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // 2. Construct sample data (username, price, eventTimeMillis)
        DataStreamSource<CepActiveUserBean> data = env.fromElements(
                new CepActiveUserBean("100XX", 0.0D, 1597905234000L),
                new CepActiveUserBean("100XX", 100.0D, 1597905235000L),
                new CepActiveUserBean("100XX", 200.0D, 1597905236000L),
                new CepActiveUserBean("100XX", 300.0D, 1597905237000L),
                new CepActiveUserBean("100XX", 400.0D, 1597905238000L),
                new CepActiveUserBean("100XX", 500.0D, 1597905239000L),
                new CepActiveUserBean("101XX", 0.0D, 1597905240000L),
                new CepActiveUserBean("101XX", 100.0D, 1597905241000L)
        );

        // 3. Event time extraction + custom watermark
        SingleOutputStreamOperator<CepActiveUserBean> watermark = data
                .assignTimestampsAndWatermarks(new WatermarkStrategy<CepActiveUserBean>() {
                    @Override
                    public WatermarkGenerator<CepActiveUserBean> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                        return new WatermarkGenerator<CepActiveUserBean>() {
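                            // Declared first because the initial maximum below depends on it.
                            long maxOutOfOrderness = 500L;
                            // Start very low (not Long.MAX_VALUE, which would freeze Math.max and
                            // push the watermark to the end of time) while leaving room to
                            // subtract the bound in onPeriodicEmit without underflow.
                            long maxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;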

                            @Override
                            public void onEvent(CepActiveUserBean event, long eventTimestamp, WatermarkOutput output) {
                                maxTimestamp = Math.max(event.getTimestamp(), maxTimestamp);
                            }

                            @Override
                            public void onPeriodicEmit(WatermarkOutput output) {
                                output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness));
                            }
                        };
                    }
                }.withTimestampAssigner((element, recordTimes) -> element.getTimestamp())
                );

        // 4. Partition by username
        KeyedStream<CepActiveUserBean, String> keyed = watermark
                .keyBy(new KeySelector<CepActiveUserBean, String>() {
                    @Override
                    public String getKey(CepActiveUserBean value) {
                        return value.getUsername();
                    }
                });

        // 5. Define CEP pattern
        Pattern<CepActiveUserBean, CepActiveUserBean> pattern = Pattern
                .<CepActiveUserBean>begin("start")
                .where(new SimpleCondition<CepActiveUserBean>() {
                    @Override
                    public boolean filter(CepActiveUserBean value) {
                        return value.getPrice() > 0;
                    }
                })
                .timesOrMore(5)
                .within(Time.hours(24));

        // 6. Construct PatternStream and process matching results
        PatternStream<CepActiveUserBean> parentStream = CEP.pattern(keyed, pattern);

        SingleOutputStreamOperator<CepActiveUserBean> process = parentStream
                .process(new PatternProcessFunction<CepActiveUserBean, CepActiveUserBean>() {
                    @Override
                    public void processMatch(Map<String, List<CepActiveUserBean>> map,
                                             Context context,
                                             Collector<CepActiveUserBean> collector) {
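                        // Only prints the match; nothing is passed to the collector,
                        // so process.print() below emits no records.
                        System.out.println("map: " + map);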
                    }
                });

        // 7. Sink
        process.print();

        env.execute("FlinkCepActiveUser");
    }
}

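/** Simple event bean */
class CepActiveUserBean {
    private String username;
    private Double price;
    private Long timestamp;

    public CepActiveUserBean(String username, Double price, Long timestamp) {
        this.username = username;
        this.price = price;
        this.timestamp = timestamp;
    }

    public String getUsername() { return username; }
    public Double getPrice() { return price; }
    public Long getTimestamp() { return timestamp; }

    // setters omitted

    @Override
    public String toString() {
        return "CepActiveUserBean{username='" + username + "', price=" + price
                + ", timestamp=" + timestamp + "}";
    }
}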

Running Result

map: {start=[CepActiveUserBean{username='100XX', price=100.0, timestamp=1597905235000}...
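The custom watermark generator in this job tracks the largest timestamp seen and emits that maximum minus 500 ms. A plain-Java model of the same logic (hypothetical class WatermarkModel, no Flink types) makes the role of the initial value visible: starting from Long.MIN_VALUE + bound lets the first real event take over, while starting from Long.MAX_VALUE freezes Math.max and pushes the watermark to the end of time, so every real event would look late. Flink also ships a built-in equivalent, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMillis(500)), which subtracts an extra 1 ms.

```java
/** Plain-Java model (not Flink code) of a bounded-out-of-orderness watermark. */
public class WatermarkModel {

    private final long maxOutOfOrderness;
    private long maxTimestamp;

    public WatermarkModel(long maxOutOfOrderness, long initialMax) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        this.maxTimestamp = initialMax;
    }

    // Mirrors onEvent: remember the largest event timestamp seen so far.
    public void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // Mirrors onPeriodicEmit: the watermark trails the maximum by the bound.
    public long currentWatermark() {
        return maxTimestamp - maxOutOfOrderness;
    }

    public static void main(String[] args) {
        long bound = 500L;
        WatermarkModel fixed = new WatermarkModel(bound, Long.MIN_VALUE + bound);
        WatermarkModel frozen = new WatermarkModel(bound, Long.MAX_VALUE);
        for (long ts : new long[] {1597905234000L, 1597905235000L, 1597905236000L}) {
            fixed.onEvent(ts);
            frozen.onEvent(ts);
        }
        System.out.println(fixed.currentWatermark());  // 1597905235500
        System.out.println(frozen.currentWatermark()); // 9223372036854775307 (Long.MAX_VALUE - 500)
    }
}
```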

Case 3: Detect Unpaid Orders (10-Minute Timeout)

Business Requirement

Find orders that are not paid within 10 minutes of being placed.

Implementation Steps

  1. Get data source
  2. Watermark transformation
  3. keyBy transformation
  4. Define the pattern: a "create" event followed by a "pay" event within 10 minutes; partial matches that time out are the unpaid orders
  5. Pattern matching
  6. Extract successfully matched data
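Before looking at the Flink code, the detection rule can be modeled in plain Java. The sketch below (hypothetical class TimeoutPayModel, no Flink) assumes events sorted by timestamp and at most one order per key, as in the sample data, and reports the keys whose "create" event is not followed by a "pay" within 10 minutes. In the Flink job, such orders surface as timed-out partial matches once the watermark passes the 10-minute mark.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified plain-Java model (not Flink CEP) of "create not followed by pay within 10 minutes". */
public class TimeoutPayModel {

    static final long TIMEOUT_MS = 10L * 60 * 1000; // 10 minutes, like within(Time.seconds(600))

    // Events must be sorted by timestamp; one order per key is assumed.
    public static List<Long> unpaid(long[] keys, String[] operations, long[] timestamps) {
        Map<Long, Long> createdAt = new LinkedHashMap<>(); // keeps first-seen order of orders
        Map<Long, Long> paidAt = new HashMap<>();
        for (int i = 0; i < keys.length; i++) {
            if ("create".equals(operations[i])) {
                createdAt.putIfAbsent(keys[i], timestamps[i]);
            } else if ("pay".equals(operations[i]) && createdAt.containsKey(keys[i])) {
                // Only a pay that comes after the create counts, as with followedBy.
                paidAt.putIfAbsent(keys[i], timestamps[i]);
            }
        }
        List<Long> result = new ArrayList<>();
        for (Map.Entry<Long, Long> order : createdAt.entrySet()) {
            Long pay = paidAt.get(order.getKey());
            boolean paidInTime = pay != null && pay - order.getValue() < TIMEOUT_MS;
            if (!paidInTime) {
                result.add(order.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long[] keys = {1L, 1L, 2L, 2L, 3L};
        String[] ops = {"create", "pay", "create", "pay", "create"};
        long[] ts = {1597905234000L, 1597905235000L, 1597905236000L, 1597905237000L, 1597905239000L};
        System.out.println(unpaid(keys, ops, ts)); // [3]
    }
}
```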

Code Implementation

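package icu.wzk;

// Assumes a Flink 1.x release with flink-streaming-java and flink-cep on the classpath.
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

import java.util.List;
import java.util.Map;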

public class FlinkCepTimeOutPay {

    public static void main(String[] args) throws Exception {
        // 1. Environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // 2. Construct sample data: userId, operation, eventTimeMillis
        DataStreamSource<TimeOutPayBean> data = env.fromElements(
                new TimeOutPayBean(1L, "create", 1597905234000L),
                new TimeOutPayBean(1L, "pay",    1597905235000L),
                new TimeOutPayBean(2L, "create", 1597905236000L),
                new TimeOutPayBean(2L, "pay",    1597905237000L),
                new TimeOutPayBean(3L, "create", 1597905239000L) // Unpaid
        );

        // 3. Event time extraction + custom watermark
        DataStream<TimeOutPayBean> watermark = data
                .assignTimestampsAndWatermarks(new WatermarkStrategy<TimeOutPayBean>() {
                    @Override
                    public WatermarkGenerator<TimeOutPayBean> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                        return new WatermarkGenerator<TimeOutPayBean>() {
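                            // Declared first because the initial maximum below depends on it.
                            long maxOutOfOrderness = 500L;
                            // Start very low (not Long.MAX_VALUE, which would freeze Math.max and
                            // push the watermark to the end of time) while leaving room to
                            // subtract the bound in onPeriodicEmit without underflow.
                            long maxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;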

                            @Override
                            public void onEvent(TimeOutPayBean e, long ts, WatermarkOutput out) {
                                maxTimestamp = Math.max(maxTimestamp, e.getTimestamp());
                            }

                            @Override
                            public void onPeriodicEmit(WatermarkOutput out) {
                                out.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness));
                            }
                        };
                    }
                }.withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
                );

        // 4. Partition by userId
        KeyedStream<TimeOutPayBean, Long> keyedStream = watermark.keyBy(new KeySelector<TimeOutPayBean, Long>() {
            @Override
            public Long getKey(TimeOutPayBean value) { return value.getUserId(); }
        });

        // 5. Define CEP pattern
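        // Anonymous subclass ({}) keeps the generic type at runtime; the explicit
        // type argument is needed on Java 8, where <> is not allowed with anonymous classes.
        OutputTag<TimeOutPayBean> orderTimeoutOutput = new OutputTag<TimeOutPayBean>("orderTimeout") {};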
        Pattern<TimeOutPayBean, TimeOutPayBean> pattern = Pattern
                .<TimeOutPayBean>begin("begin")
                .where(new IterativeCondition<TimeOutPayBean>() {
                    @Override
                    public boolean filter(TimeOutPayBean e, Context<TimeOutPayBean> ctx) {
                        return "create".equals(e.getOperation());
                    }
                })
                .followedBy("pay")
                .where(new IterativeCondition<TimeOutPayBean>() {
                    @Override
                    public boolean filter(TimeOutPayBean e, Context<TimeOutPayBean> ctx) {
                        return "pay".equals(e.getOperation());
                    }
                })
                .within(Time.seconds(600)); // 10 minutes

        // 6. Match selection
        PatternStream<TimeOutPayBean> patternStream = CEP.pattern(keyedStream, pattern);
        SingleOutputStreamOperator<TimeOutPayBean> result = patternStream.select(
                orderTimeoutOutput,
                new PatternTimeoutFunction<TimeOutPayBean, TimeOutPayBean>() {
                    @Override
                    public TimeOutPayBean timeout(Map<String, List<TimeOutPayBean>> map, long timeoutTs) {
                        return map.get("begin").get(0);
                    }
                },
                new PatternSelectFunction<TimeOutPayBean, TimeOutPayBean>() {
                    @Override
                    public TimeOutPayBean select(Map<String, List<TimeOutPayBean>> map) {
                        return map.get("pay").get(0);
                    }
                }
        );

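        // 7. Output: the main stream carries matched (paid) orders,
        //    the side output carries orders that timed out without a payment
        result.print();
        DataStream<TimeOutPayBean> sideOutput = result.getSideOutput(orderTimeoutOutput);
        sideOutput.print();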

        env.execute("FlinkCepTimeOutPay");
    }
}

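/** Input event */
class TimeOutPayBean {
    private Long userId;
    private String operation;
    private Long timestamp;

    public TimeOutPayBean(Long userId, String operation, Long timestamp) {
        this.userId = userId;
        this.operation = operation;
        this.timestamp = timestamp;
    }

    public Long getUserId() { return userId; }
    public String getOperation() { return operation; }
    public Long getTimestamp() { return timestamp; }

    // setters omitted

    @Override
    public String toString() {
        return "TimeOutPayBean{userId=" + userId + ", operation='" + operation
                + "', timestamp=" + timestamp + "}";
    }
}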

Running Result

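TimeOutPayBean{userId=1, operation='pay', timestamp=1597905235000...

This line comes from the matched (paid) stream, since the select function returns the "pay" event. For this sample data, the timeout side output should contain only user 3's "create" event (timestamp 1597905239000): user 3 never pays, so its partial match times out once the watermark passes 10 minutes after that event, which for this bounded input happens when the source finishes.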

Core Points Summary

Scenario              | Pattern Configuration
----------------------|------------------------------------------------
24h ≥5 transactions   | timesOrMore(5) + within(Time.hours(24))
10min unpaid          | followedBy("pay") + within(Time.seconds(600))

Key Configuration:

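  • Watermark: tells CEP how far event time has progressed, so buffered out-of-order events can be processed in timestamp order and within() timeouts can fire
  • keyBy: matches the pattern independently per key (here, per user). CEP also works on a non-keyed stream, but then the pattern operator runs with parallelism 1 and a match can mix events from different users
  • Side Output: with select(OutputTag, PatternTimeoutFunction, PatternSelectFunction), partial matches that exceed within() are emitted to the side output; in Case 3 these are the unpaid orders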
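To see what keying changes, this plain-Java sketch (hypothetical class KeyByModel, no Flink) counts the valid transactions from the Case 2 sample data once over the whole stream and once per user. Over the whole stream, 101XX's transaction is counted together with 100XX's, which is the kind of mixing a non-keyed timesOrMore(5) pattern is exposed to.

```java
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java illustration (not Flink code) of global vs per-key counting. */
public class KeyByModel {

    // Counts valid (price > 0) transactions across the whole stream, ignoring the user.
    public static int globalCount(double[] prices) {
        int count = 0;
        for (double p : prices) {
            if (p > 0) {
                count++;
            }
        }
        return count;
    }

    // Counts valid transactions separately per user, as keyBy(username) partitions them.
    public static Map<String, Integer> perUserCount(String[] users, double[] prices) {
        Map<String, Integer> counts = new TreeMap<>();
        for (int i = 0; i < users.length; i++) {
            if (prices[i] > 0) {
                counts.merge(users[i], 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        String[] users = {"100XX", "100XX", "100XX", "100XX", "100XX", "100XX", "101XX", "101XX"};
        double[] prices = {0.0, 100.0, 200.0, 300.0, 400.0, 500.0, 0.0, 100.0};
        System.out.println(globalCount(prices));         // 6
        System.out.println(perUserCount(users, prices)); // {100XX=5, 101XX=1}
    }
}
```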