Big Data 131: Flink CEP in Practice: Detecting ≥5 Transactions in 24 Hours & Orders Unpaid After 10 Minutes (with Code)

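Flink CEP (Complex Event Processing) is an extension library of Apache Flink for real-time complex event processing. With Flink CEP, developers can identify specific event patterns in streaming data, which is useful in scenarios such as fraud detection, network security, real-time monitoring, and IoT.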

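Basic Concepts

The core idea of Flink CEP is to detect complex event sequences in a stream by matching them against user-defined event patterns.

CEP lets users:

  • Define event patterns: describe the combinations of events they care about (e.g., consecutive events or delayed events)
  • Match patterns: Flink CEP searches the stream for event sequences that match the defined pattern
  • Process matches: once a matching sequence is found, users decide how to handle it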

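Basic Components

  • Pattern: describes the event sequence to match in the stream; it can be a single event or a combination of events. Common pattern operations include next (strict contiguity: the matching event must come immediately after the previous one) and followedBy (relaxed contiguity: non-matching events in between are ignored)
  • PatternStream: the stream obtained by applying a pattern definition to an event stream
  • Select function: extracts the matched event sequences from the PatternStream; the more general process() with a PatternProcessFunction serves the same purpose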
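To make the difference between next() and followedBy() concrete, here is a plain-Java sketch of the two contiguity rules for a two-step pattern "create, then pay". It does not use Flink: the class and method names are hypothetical, and it assumes the default NO_SKIP behavior, where every started partial match is allowed to complete.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical plain-Java illustration of strict vs. relaxed contiguity (not Flink code). */
class ContiguityDemo {

    /** Like next(): the second event must be the one immediately after the first. */
    static List<int[]> strict(List<String> events, String first, String second) {
        List<int[]> matches = new ArrayList<>();
        for (int i = 0; i + 1 < events.size(); i++) {
            if (events.get(i).equals(first) && events.get(i + 1).equals(second)) {
                matches.add(new int[]{i, i + 1});
            }
        }
        return matches;
    }

    /** Like followedBy(): skip non-matching events and take the first later match. */
    static List<int[]> relaxed(List<String> events, String first, String second) {
        List<int[]> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!events.get(i).equals(first)) {
                continue;
            }
            for (int j = i + 1; j < events.size(); j++) {
                if (events.get(j).equals(second)) {
                    matches.add(new int[]{i, j});
                    break; // followedByAny() would keep looking for further matches
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("create", "click", "pay", "pay");
        System.out.println(strict(events, "create", "pay").size());  // 0: "click" breaks strict contiguity
        System.out.println(relaxed(events, "create", "pay").size()); // 1: create(0) -> pay(2)
    }
}
```

With strict contiguity the intervening "click" discards the partial match; relaxed contiguity ignores it but still pairs each "create" with only the first "pay" that follows.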

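CEP Development Steps

  1. Define the event stream: create a DataStream that represents the raw events
  2. Define the event pattern with the Flink CEP Pattern API
  3. Apply the pattern to the stream to obtain a PatternStream
  4. Extract matched events: use a select (or process) function to pull out the event sequences that match the pattern

Use Cases

  • Fraud detection: identify abnormal behavior that occurs repeatedly in a row
  • Network monitoring: detect specific network attack patterns
  • IoT: analyze sensor data to detect device anomalies
  • User behavior analysis: analyze sequences of user actions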

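Case 2: Detecting Active Traders

Business Requirement

Find users with at least 5 valid transactions (price greater than 0) within 24 hours.

Implementation Steps

  1. Get the data source
  2. Assign timestamps and watermarks
  3. Key the stream with keyBy
  4. At least 5 times: timesOrMore(5)
  5. Within 24 hours: within(Time.hours(24))
  6. Apply the pattern (pattern matching)
  7. Extract the matched data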
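The rule these steps encode can be checked by hand with a plain-Java sketch: group valid transactions by user (the keyBy and price > 0 condition), sort them by event time, and slide a 24-hour window over them with two pointers. This is not Flink CEP; the class and field names are hypothetical, and treating the 24-hour bound as exclusive is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical plain-Java check of the "at least 5 valid transactions in 24 h" rule (not Flink CEP). */
class ActiveUserCheck {

    static final long WINDOW_MS = 24L * 60 * 60 * 1000; // plays the role of within(Time.hours(24))
    static final int MIN_TIMES = 5;                      // plays the role of timesOrMore(5)

    static final class Txn {
        final String username;
        final double price;
        final long timestamp;

        Txn(String username, double price, long timestamp) {
            this.username = username;
            this.price = price;
            this.timestamp = timestamp;
        }
    }

    static Set<String> activeUsers(List<Txn> txns) {
        // keyBy(username) + where(price > 0): collect valid event times per user
        Map<String, List<Long>> byUser = new HashMap<>();
        for (Txn t : txns) {
            if (t.price > 0) {
                byUser.computeIfAbsent(t.username, k -> new ArrayList<>()).add(t.timestamp);
            }
        }
        Set<String> active = new TreeSet<>();
        for (Map.Entry<String, List<Long>> e : byUser.entrySet()) {
            List<Long> ts = e.getValue();
            ts.sort(null); // event-time order
            int left = 0;
            for (int right = 0; right < ts.size(); right++) {
                // shrink until [left, right] spans less than 24 h (exclusive bound is an assumption)
                while (ts.get(right) - ts.get(left) >= WINDOW_MS) {
                    left++;
                }
                if (right - left + 1 >= MIN_TIMES) {
                    active.add(e.getKey());
                    break;
                }
            }
        }
        return active;
    }

    public static void main(String[] args) {
        // Same sample data as the Flink job below
        List<Txn> txns = Arrays.asList(
                new Txn("100XX", 0.0, 1597905234000L),
                new Txn("100XX", 100.0, 1597905235000L),
                new Txn("100XX", 200.0, 1597905236000L),
                new Txn("100XX", 300.0, 1597905237000L),
                new Txn("100XX", 400.0, 1597905238000L),
                new Txn("100XX", 500.0, 1597905239000L),
                new Txn("101XX", 0.0, 1597905240000L),
                new Txn("101XX", 100.0, 1597905241000L));
        System.out.println(activeUsers(txns)); // [100XX]
    }
}
```

Unlike the CEP pattern with timesOrMore(5), which can emit several overlapping matches per user as more events arrive, this check only answers whether a user qualifies.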

Code

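package icu.wzk;

// Requires the flink-streaming-java and flink-cep dependencies (Flink 1.x DataStream API)
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.Map;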

public class FlinkCepActiveUser {

    public static void main(String[] args) throws Exception {
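        // 1. Environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Deprecated since Flink 1.12, where event time is already the default; only needed on older versions
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);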

        // 2. Sample data (username, price, eventTimeMillis)
        DataStreamSource<CepActiveUserBean> data = env.fromElements(
                new CepActiveUserBean("100XX", 0.0D, 1597905234000L),
                new CepActiveUserBean("100XX", 100.0D, 1597905235000L),
                new CepActiveUserBean("100XX", 200.0D, 1597905236000L),
                new CepActiveUserBean("100XX", 300.0D, 1597905237000L),
                new CepActiveUserBean("100XX", 400.0D, 1597905238000L),
                new CepActiveUserBean("100XX", 500.0D, 1597905239000L),
                new CepActiveUserBean("101XX", 0.0D, 1597905240000L),
                new CepActiveUserBean("101XX", 100.0D, 1597905241000L)
        );

        // 3. Event-time extraction + custom watermark generator (500 ms out-of-orderness tolerance)
        SingleOutputStreamOperator<CepActiveUserBean> watermark = data
                .assignTimestampsAndWatermarks(new WatermarkStrategy<CepActiveUserBean>() {
                    @Override
                    public WatermarkGenerator<CepActiveUserBean> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                        return new WatermarkGenerator<CepActiveUserBean>() {
                            final long maxOutOfOrderness = 500L;
                            // Start near Long.MIN_VALUE: with Long.MAX_VALUE, Math.max never changes it and
                            // every event looks late; "+ maxOutOfOrderness + 1" avoids underflow below
                            long maxTimestamp = Long.MIN_VALUE + maxOutOfOrderness + 1;

                            @Override
                            public void onEvent(CepActiveUserBean event, long eventTimestamp, WatermarkOutput output) {
                                maxTimestamp = Math.max(event.getTimestamp(), maxTimestamp);
                            }

                            @Override
                            public void onPeriodicEmit(WatermarkOutput output) {
                                output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness - 1));
                            }
                        };
                    }
                }.withTimestampAssigner((element, recordTimes) -> element.getTimestamp())
                );

        // 4. Key by username
        KeyedStream<CepActiveUserBean, String> keyed = watermark
                .keyBy(new KeySelector<CepActiveUserBean, String>() {
                    @Override
                    public String getKey(CepActiveUserBean value) {
                        return value.getUsername();
                    }
                });

        // 5. Define the CEP pattern: at least 5 events with price > 0 within 24 hours
        Pattern<CepActiveUserBean, CepActiveUserBean> pattern = Pattern
                .<CepActiveUserBean>begin("start")
                .where(new SimpleCondition<CepActiveUserBean>() {
                    @Override
                    public boolean filter(CepActiveUserBean value) {
                        return value.getPrice() > 0;
                    }
                })
                .timesOrMore(5)
                .within(Time.hours(24));

        // 6. Build the PatternStream and process the matches
        PatternStream<CepActiveUserBean> patternStream = CEP.pattern(keyed, pattern);

        SingleOutputStreamOperator<CepActiveUserBean> process = patternStream
                .process(new PatternProcessFunction<CepActiveUserBean, CepActiveUserBean>() {
                    @Override
                    public void processMatch(Map<String, List<CepActiveUserBean>> map,
                                             Context context,
                                             Collector<CepActiveUserBean> collector) {
                        // Matches are printed directly here; nothing is passed to the collector,
                        // so the print() sink below emits nothing unless collector.collect(...) is called
                        System.out.println("map: " + map);
                    }
                });

        // 7. Sink
        process.print();

        env.execute("FlinkCepActiveUser");
    }
}

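/** Simple event bean */
class CepActiveUserBean {
    private String username;
    private Double price;
    private Long timestamp;

    public CepActiveUserBean(String username, Double price, Long timestamp) {
        this.username = username;
        this.price = price;
        this.timestamp = timestamp;
    }

    // Getters / setters
    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }
    public Double getPrice() { return price; }
    public void setPrice(Double price) { this.price = price; }
    public Long getTimestamp() { return timestamp; }
    public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }

    @Override
    public String toString() {
        return "CepActiveUserBean{username='" + username + "', price=" + price + ", timestamp=" + timestamp + "}";
    }
}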

Output

map: {start=[CepActiveUserBean{username='100XX', price=100.0, timestamp=1597905235000}...
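The watermark generator in step 3 can be sanity-checked without a cluster. Below is a plain-Java sketch of a bounded-out-of-orderness generator that follows the convention of Flink's built-in WatermarkStrategy.forBoundedOutOfOrderness: the running maximum starts at Long.MIN_VALUE + tolerance + 1 and the emitted watermark is maximum - tolerance - 1. The class name is hypothetical. The sketch also shows why initializing the maximum to Long.MAX_VALUE is a problem: Math.max would never change it, the watermark would sit near Long.MAX_VALUE, and CEP would treat every event as late.

```java
/** Hypothetical plain-Java mirror of a bounded-out-of-orderness watermark generator (not Flink code). */
class BoundedWatermarkSketch {
    private final long maxOutOfOrderness;
    private long maxTimestamp;

    BoundedWatermarkSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Lowest start value that keeps currentWatermark() from underflowing
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrderness + 1;
    }

    /** Mirrors onEvent(): remember the largest event time seen so far. */
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** Mirrors onPeriodicEmit(): the watermark trails the maximum by the tolerance plus 1 ms. */
    long currentWatermark() {
        return maxTimestamp - maxOutOfOrderness - 1;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch wm = new BoundedWatermarkSketch(500L);
        System.out.println(wm.currentWatermark() == Long.MIN_VALUE); // true: nothing seen yet
        wm.onEvent(1597905236000L);
        wm.onEvent(1597905235000L); // out of order: does not move the watermark back
        System.out.println(wm.currentWatermark()); // 1597905235499
    }
}
```

In a real job the shorter equivalent is WatermarkStrategy.<CepActiveUserBean>forBoundedOutOfOrderness(Duration.ofMillis(500)) followed by the same withTimestampAssigner(...) call.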

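Case 3: Orders Not Paid in Time

Business Requirement

Find orders that were created but not paid within 10 minutes.

Implementation Steps

  1. Get the data source
  2. Assign timestamps and watermarks
  3. Key the stream with keyBy
  4. Define the pattern: "create" followed by "pay" within 10 minutes; orders that never complete it are the unpaid ones
  5. Apply the pattern (pattern matching)
  6. Extract the results: paid orders are full matches, unpaid orders are timed-out partial matches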
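The expected result for the sample data can be worked out with a plain-Java sketch of the same rule: keep the open "create" per user, drop it when a "pay" arrives in time, and time it out otherwise. This is not Flink CEP. The class and method names are hypothetical, the sketch assumes events arrive in event-time order with at most one open order per userId, and reporting "create time + 10 minutes" as the timeout time is this sketch's own choice.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical plain-Java sketch of "create followedBy pay within 10 minutes" (not Flink CEP). */
class UnpaidOrderSketch {

    static final long WINDOW_MS = 600_000L; // plays the role of within(Time.seconds(600))

    static final class Event {
        final long userId;
        final String operation;
        final long timestamp;

        Event(long userId, String operation, long timestamp) {
            this.userId = userId;
            this.operation = operation;
            this.timestamp = timestamp;
        }
    }

    /** Returns userId -> timeout time (create time + window) for creates with no pay in time. */
    static Map<Long, Long> timedOut(List<Event> events) {
        Map<Long, Long> openCreate = new HashMap<>(); // per-key state, as after keyBy(userId)
        Map<Long, Long> timeouts = new TreeMap<>();
        for (Event e : events) {
            Long start = openCreate.get(e.userId);
            if (start != null && e.timestamp - start >= WINDOW_MS) {
                timeouts.put(e.userId, start + WINDOW_MS); // window expired: partial match times out
                openCreate.remove(e.userId);
                start = null;
            }
            if ("create".equals(e.operation)) {
                openCreate.put(e.userId, e.timestamp);
            } else if ("pay".equals(e.operation) && start != null) {
                openCreate.remove(e.userId); // full match: would go to the main output
            }
        }
        // End of a bounded input: the final watermark expires every order still open
        for (Map.Entry<Long, Long> open : openCreate.entrySet()) {
            timeouts.put(open.getKey(), open.getValue() + WINDOW_MS);
        }
        return timeouts;
    }

    public static void main(String[] args) {
        // Same sample data as the Flink job below
        List<Event> events = Arrays.asList(
                new Event(1L, "create", 1597905234000L),
                new Event(1L, "pay", 1597905235000L),
                new Event(2L, "create", 1597905236000L),
                new Event(2L, "pay", 1597905237000L),
                new Event(3L, "create", 1597905239000L));
        System.out.println(timedOut(events)); // {3=1597905839000}
    }
}
```

Only userId 3 is reported, which is what the timeout side output of the Flink job is meant to capture.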

Code

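package icu.wzk;

// Requires the flink-streaming-java and flink-cep dependencies (Flink 1.x DataStream API)
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

import java.util.List;
import java.util.Map;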

public class FlinkCepTimeOutPay {

    public static void main(String[] args) throws Exception {
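        // 1. Environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Deprecated since Flink 1.12, where event time is already the default; only needed on older versions
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);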

        // 2. Sample data: userId, operation, eventTimeMillis
        DataStreamSource<TimeOutPayBean> data = env.fromElements(
                new TimeOutPayBean(1L, "create", 1597905234000L),
                new TimeOutPayBean(1L, "pay",    1597905235000L),
                new TimeOutPayBean(2L, "create", 1597905236000L),
                new TimeOutPayBean(2L, "pay",    1597905237000L),
                new TimeOutPayBean(3L, "create", 1597905239000L) // never paid
        );

        // 3. Event-time extraction + custom watermark generator (500 ms out-of-orderness tolerance)
        DataStream<TimeOutPayBean> watermark = data
                .assignTimestampsAndWatermarks(new WatermarkStrategy<TimeOutPayBean>() {
                    @Override
                    public WatermarkGenerator<TimeOutPayBean> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                        return new WatermarkGenerator<TimeOutPayBean>() {
                            final long maxOutOfOrderness = 500L;
                            // Start near Long.MIN_VALUE: with Long.MAX_VALUE, Math.max never changes it and
                            // every event looks late; "+ maxOutOfOrderness + 1" avoids underflow below
                            long maxTimestamp = Long.MIN_VALUE + maxOutOfOrderness + 1;

                            @Override
                            public void onEvent(TimeOutPayBean e, long ts, WatermarkOutput out) {
                                maxTimestamp = Math.max(maxTimestamp, e.getTimestamp());
                            }

                            @Override
                            public void onPeriodicEmit(WatermarkOutput out) {
                                out.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness - 1));
                            }
                        };
                    }
                }.withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
                );

        // 4. Key by userId
        KeyedStream<TimeOutPayBean, Long> keyedStream = watermark.keyBy(new KeySelector<TimeOutPayBean, Long>() {
            @Override
            public Long getKey(TimeOutPayBean value) { return value.getUserId(); }
        });

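        // 5. Define the CEP pattern: "create" followed by "pay" within 10 minutes
        // Explicit type argument: the diamond operator on anonymous classes requires Java 9+
        OutputTag<TimeOutPayBean> orderTimeoutOutput = new OutputTag<TimeOutPayBean>("orderTimeout") {};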
        Pattern<TimeOutPayBean, TimeOutPayBean> pattern = Pattern
                .<TimeOutPayBean>begin("begin")
                .where(new IterativeCondition<TimeOutPayBean>() {
                    @Override
                    public boolean filter(TimeOutPayBean e, Context<TimeOutPayBean> ctx) {
                        return "create".equals(e.getOperation());
                    }
                })
                .followedBy("pay")
                .where(new IterativeCondition<TimeOutPayBean>() {
                    @Override
                    public boolean filter(TimeOutPayBean e, Context<TimeOutPayBean> ctx) {
                        return "pay".equals(e.getOperation());
                    }
                })
                .within(Time.seconds(600)); // 10 minutes

        // 6. Select: full matches go to the main stream, timed-out partial matches to the side output
        PatternStream<TimeOutPayBean> patternStream = CEP.pattern(keyedStream, pattern);
        SingleOutputStreamOperator<TimeOutPayBean> result = patternStream.select(
                orderTimeoutOutput,
                new PatternTimeoutFunction<TimeOutPayBean, TimeOutPayBean>() {
                    @Override
                    public TimeOutPayBean timeout(Map<String, List<TimeOutPayBean>> map, long timeoutTs) {
                        return map.get("begin").get(0);
                    }
                },
                new PatternSelectFunction<TimeOutPayBean, TimeOutPayBean>() {
                    @Override
                    public TimeOutPayBean select(Map<String, List<TimeOutPayBean>> map) {
                        return map.get("pay").get(0);
                    }
                }
        );

        // 7. Output: paid orders (main stream) and orders not paid within 10 minutes (side output)
        result.print();
        DataStream<TimeOutPayBean> sideOutput = result.getSideOutput(orderTimeoutOutput);
        sideOutput.print();

        env.execute("FlinkCepTimeOutPay");
    }
}

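/** Input event */
class TimeOutPayBean {
    private Long userId;
    private String operation;
    private Long timestamp;

    public TimeOutPayBean(Long userId, String operation, Long timestamp) {
        this.userId = userId;
        this.operation = operation;
        this.timestamp = timestamp;
    }

    // Getters / setters
    public Long getUserId() { return userId; }
    public void setUserId(Long userId) { this.userId = userId; }
    public String getOperation() { return operation; }
    public void setOperation(String operation) { this.operation = operation; }
    public Long getTimestamp() { return timestamp; }
    public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }

    @Override
    public String toString() {
        return "TimeOutPayBean{userId=" + userId + ", operation='" + operation + "', timestamp=" + timestamp + "}";
    }
}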

Output

TimeOutPayBean{userId=1, operation='pay', timestamp=1597905235000...

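Key Takeaways

| Scenario | Pattern configuration |
| --- | --- |
| ≥5 transactions in 24 h | timesOrMore(5) + within(Time.hours(24)) |
| Unpaid 10 minutes after ordering | followedBy("pay") + within(Time.seconds(600)) |

Key configuration:

  • Watermark: handles out-of-order events so that event-time logic such as within() is evaluated correctly
  • keyBy: partitions the stream so each user's events are matched independently; CEP also accepts a non-keyed stream, but then the CEP operator runs with parallelism 1
  • Side output: receives the partial matches that timed out, e.g. orders created but not paid within the window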
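To illustrate why the examples key the stream before matching, here is a plain-Java sketch (hypothetical names, not Flink) that runs a strict "create, then pay" check on an interleaved two-user stream, once over the raw sequence and once after splitting it by userId.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical plain-Java illustration of why the stream is keyed before pattern matching. */
class KeyByDemo {

    static final class Event {
        final long userId;
        final String operation;

        Event(long userId, String operation) {
            this.userId = userId;
            this.operation = operation;
        }
    }

    /** Strict "create next pay" over one sequence, ignoring which user each event belongs to. */
    static int strictPairs(List<Event> events) {
        int count = 0;
        for (int i = 0; i + 1 < events.size(); i++) {
            if ("create".equals(events.get(i).operation) && "pay".equals(events.get(i + 1).operation)) {
                count++;
            }
        }
        return count;
    }

    /** Split by userId first (the keyBy step), then run the same check on each sub-stream. */
    static int strictPairsPerUser(List<Event> events) {
        Map<Long, List<Event>> byUser = new LinkedHashMap<>();
        for (Event e : events) {
            byUser.computeIfAbsent(e.userId, k -> new ArrayList<>()).add(e);
        }
        int count = 0;
        for (List<Event> sub : byUser.values()) {
            count += strictPairs(sub);
        }
        return count;
    }

    public static void main(String[] args) {
        // Two users whose orders interleave in the raw stream
        List<Event> events = Arrays.asList(
                new Event(1L, "create"), new Event(2L, "create"),
                new Event(1L, "pay"), new Event(2L, "pay"));
        System.out.println(strictPairs(events));        // 1: pairs user 2's create with user 1's pay
        System.out.println(strictPairsPerUser(events)); // 2: one correct create/pay pair per user
    }
}
```

Without keying, the only match found is a wrong cross-user pairing; after splitting by userId, each user's own create/pay pair is matched.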