广播状态

基本概念

所有并行任务实例(parallel task instances)会将这些状态数据(state data)作为一个持久化的内部状态来维护,而不是通过广播的方式发送给其他流处理器(stream processors)。这种设计使得每个实例都能独立处理状态数据,同时将这些状态与来自广播流(broadcast stream)的事件进行协同处理。

这种新型的广播状态(broadcast state)机制特别适用于以下两种典型场景:

  1. 需要将低吞吐量(low-throughput)数据流与高吞吐量(high-throughput)数据流进行关联处理的场景
  2. 需要动态调整处理逻辑(processing logic)的应用场景

典型案例:实时欺诈检测系统

假设我们正在构建一个实时欺诈检测系统:

  • 常规的交易数据流(transaction stream)作为高吞吐量主数据流
  • 规则更新流(rule update stream)作为低吞吐量的广播流
  • 处理引擎可以动态加载最新的欺诈检测规则,而无需停止或重启处理管道

广播状态允许我们将更新后的检测规则实时分发到所有并行处理实例,这些实例可以立即应用新规则来处理后续的交易数据,从而实现业务逻辑的动态更新。

模式匹配应用示例

场景描述

一个电子商务网站将所有用户的交互捕获为用户操作流:

  • 用户操作流:用户登录、用户注销、添加购物车、完成付款等操作
  • 模式流:定义需要检测的行为模式

模式示例

  • 模式1:用户登录并立即注销而不无需浏览电子商务网站上的其他页面
  • 模式2:用户将商品添加到购物车并在不完成购买的情况下注销

如何使用广播状态

数据定义

// 用户行为类
public static class UserAction {
    private Long userId;
    private String userAction;

    public UserAction(Long userId, String userAction) {
        this.userId = userId;
        this.userAction = userAction;
    }

    public Long getUserId() {
        return userId;
    }

    public String getUserAction() {
        return userAction;
    }
}

// 模式类
public static class MyPattern {
    private String firstAction;
    private String secondAction;

    // 无参构造函数
    public MyPattern() {
    }

    public MyPattern(String firstAction, String secondAction) {
        this.firstAction = firstAction;
        this.secondAction = secondAction;
    }

    public String getFirstAction() {
        return firstAction;
    }

    public void setFirstAction(String firstAction) {
        this.firstAction = firstAction;
    }

    public String getSecondAction() {
        return secondAction;
    }

    public void setSecondAction(String secondAction) {
        this.secondAction = secondAction;
    }
}

完整代码实现

package icu.wzk;

public class BroadCastDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // 两套数据流 1:用户行为 2:模式
        UserAction user1 = new UserAction(1001L, "login");
        UserAction user2 = new UserAction(1003L, "pay");
        UserAction user3 = new UserAction(1002L, "car");
        UserAction user4 = new UserAction(1001L, "logout");
        UserAction user5 = new UserAction(1003L, "car");
        UserAction user6 = new UserAction(1002L, "logout");

        DataStreamSource<UserAction> actions = env.fromElements(user1, user2, user3, user4, user5, user6);

        MyPattern myPattern1 = new MyPattern("login", "logout");
        MyPattern myPattern2 = new MyPattern("car", "logout");
        DataStreamSource<MyPattern> patterns = env.fromElements(myPattern1, myPattern2);

        KeyedStream<UserAction, Long> keyed = actions
                .keyBy(new KeySelector<UserAction, Long>() {
                    @Override
                    public Long getKey(UserAction value) throws Exception {
                        return value.getUserId();
                    }
                });

        // 将模式流广播到下游的所有算子
        MapStateDescriptor<Void, MyPattern> broadcastStateDescriptor = new MapStateDescriptor<>(
                "patterns", Types.VOID, Types.POJO(MyPattern.class));
        BroadcastStream<MyPattern> broadcastPatterns = patterns.broadcast(broadcastStateDescriptor);

        SingleOutputStreamOperator<Tuple2<Long, MyPattern>> process = keyed
                .connect(broadcastPatterns)
                .process(new PatternEvaluator());

        // 匹配结果输出到控制台
        process.print();
        env.execute("BroadCastDemo");
    }

    // 处理函数
    public static class PatternEvaluator extends KeyedBroadcastProcessFunction<Long, UserAction, MyPattern, Tuple2<Long, MyPattern>> {

        private transient ValueState<String> prevActionState;

        @Override
        public void open(Configuration parameters) throws Exception {
            prevActionState = getRuntimeContext()
                    .getState(new ValueStateDescriptor<>("lastAction", Types.STRING));
        }

        /**
         * 每个一个Action数据,触发一次执行
         */
        @Override
        public void processElement(UserAction value, KeyedBroadcastProcessFunction<Long, UserAction, MyPattern, Tuple2<Long, MyPattern>>.ReadOnlyContext ctx, Collector<Tuple2<Long, MyPattern>> out) throws Exception {
            // 把用户行为流和模式流中的模式进行匹配
            ReadOnlyBroadcastState<Void, MyPattern> patterns = ctx
                    .getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(MyPattern.class)));
            MyPattern myPattern =  patterns.get(null);
            String prevAction = prevActionState.value();
            if (myPattern != null && prevAction != null) {
                if (myPattern.getFirstAction().equals(prevAction) && myPattern.getSecondAction().equals(value.getUserAction())) {
                    // 匹配成功
                    System.out.println("匹配成功: " + ctx.getCurrentKey());
                    out.collect(new Tuple2<>(ctx.getCurrentKey(), myPattern));
                } else {
                    // 匹配失败
                    System.out.println("匹配失败: " + ctx.getCurrentKey());
                }
            }
            prevActionState.update(value.getUserAction());
        }

        /**
         * 每次来一个模式 Pattern 的时候触发
         */
        @Override
        public void processBroadcastElement(MyPattern value, KeyedBroadcastProcessFunction<Long, UserAction, MyPattern, Tuple2<Long, MyPattern>>.Context ctx, Collector<Tuple2<Long, MyPattern>> out) throws Exception {
            BroadcastState<Void, MyPattern> broadcastState = ctx
                    .getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(MyPattern.class)));
            broadcastState.put(null, value);
        }
    }
}

核心方法说明

processBroadcastElement()

为广播流的每个记录调用,该函数负责处理广播流中的更新数据(例如最新的模式规则):

  1. 从广播流中提取模式规则
  2. 将模式规则以键值形式存入BroadcastState
  3. 确保每次更新都会覆盖之前的模式

processElement()

为键控流的每个记录调用,这是核心的处理逻辑所在:

  • 只能读取广播状态(系统自动保证广播状态的只读访问,防止并行实例间状态不一致)
  • 从广播状态获取当前生效的模式规则
  • 从键控状态(KeyedState)中获取该用户的历史行为记录
  • 执行模式匹配检测
  • 将用户当前行为更新到键控状态中

onTimer()

在预注册的定时器触发时调用,主要用途:

  • 延迟计算(如窗口关闭后的聚合操作)
  • 状态清理(避免长期不活跃用户占用内存资源)

运行测试

测试数据

  • 用户1001:login -> logout
  • 用户1002:car -> logout
  • 用户1003:pay -> car

输出结果

匹配失败: 1003
匹配成功: 1001
匹配失败: 1002
3> (1001,icu.wzk.BroadCastDemo$MyPattern@6d1e6dc7)

只有用户1001匹配成功(login -> logout),因为模式是 login->logout,而用户1002和用户1003的操作序列不匹配。