Broadcast State

Basic Concepts

Broadcast state holds data that arrives on a broadcast stream. Every record of that stream is delivered to all parallel task instances of the downstream operator, and each instance keeps it as local, persistent state. The state itself is not exchanged between instances: each instance reads its own copy while it processes events from the other, non-broadcast stream.

The broadcast state mechanism is particularly well suited to two typical scenarios:

  1. Joining a low-throughput data stream with a high-throughput data stream
  2. Adjusting processing logic dynamically while the application is running

Typical Case: Real-Time Fraud Detection System

Suppose we are building a real-time fraud detection system:

  • The regular transaction stream is the high-throughput main stream
  • The rule update stream is the low-throughput broadcast stream
  • The processing engine can load the latest fraud detection rules dynamically, without stopping or restarting the pipeline

Broadcast state lets us distribute updated detection rules to all parallel processing instances in real time. Each instance applies the new rules to the transactions it processes after the update arrives, so business logic can change while the job keeps running.
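The distribution step can be pictured with a small plain-Java model. This is only a sketch and does not use Flink: the class RuleBroadcastSketch, the Instance type, and the "maximum amount" rule are hypothetical names invented for illustration. It shows the one idea that matters here: a rule update reaches every instance, while a transaction reaches exactly one instance chosen by its key.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of rule broadcasting (illustrative only, not Flink code). */
final class RuleBroadcastSketch {

    /** One parallel instance holding its own local copy of the current rule. */
    static final class Instance {
        // Hypothetical rule: flag any amount above this threshold.
        private double maxAmount = Double.MAX_VALUE;

        void onRuleUpdate(double newMaxAmount) {
            this.maxAmount = newMaxAmount;
        }

        boolean isSuspicious(double amount) {
            return amount > maxAmount;
        }
    }

    private final List<Instance> instances = new ArrayList<>();

    RuleBroadcastSketch(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            instances.add(new Instance());
        }
    }

    /** A broadcast element is delivered to every instance. */
    void broadcastRule(double newMaxAmount) {
        for (Instance instance : instances) {
            instance.onRuleUpdate(newMaxAmount);
        }
    }

    /** A regular element is routed to exactly one instance, chosen by its key. */
    boolean process(long accountId, double amount) {
        int index = (int) Math.floorMod(accountId, (long) instances.size());
        return instances.get(index).isSuspicious(amount);
    }

    public static void main(String[] args) {
        RuleBroadcastSketch job = new RuleBroadcastSketch(4);
        System.out.println(job.process(1001L, 5_000.0)); // false: no rule received yet
        job.broadcastRule(1_000.0);
        System.out.println(job.process(1001L, 5_000.0)); // true: above the new threshold
        System.out.println(job.process(1002L, 500.0));   // false: below the threshold
    }
}
```

Because every instance received the same update, the result does not depend on which instance a given account is routed to.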

Pattern Matching Application Example

Scenario Description

An e-commerce website captures all user interactions as a stream of user actions. The application works with two streams:

  • User Action Stream: user login, logout, add to cart, complete payment, etc.
  • Pattern Stream: defines the behavior patterns to detect

Pattern Examples:

  • Pattern 1: The user logs in and immediately logs out without browsing any other page of the site
  • Pattern 2: The user adds a product to the cart and logs out without completing the purchase

How to Use Broadcast State

Data Definition

// User action class (declared as a nested class of BroadCastDemo)
public static class UserAction {
    private Long userId;
    private String userAction;

    public UserAction(Long userId, String userAction) {
        this.userId = userId;
        this.userAction = userAction;
    }

    public Long getUserId() {
        return userId;
    }

    public String getUserAction() {
        return userAction;
    }
}

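// Pattern class (also nested in BroadCastDemo). The public no-arg constructor and the
// getters/setters let Flink treat it as a POJO, which Types.POJO(MyPattern.class) requires.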
public static class MyPattern {
    private String firstAction;
    private String secondAction;

    // No-arg constructor
    public MyPattern() {
    }

    public MyPattern(String firstAction, String secondAction) {
        this.firstAction = firstAction;
        this.secondAction = secondAction;
    }

    public String getFirstAction() {
        return firstAction;
    }

    public void setFirstAction(String firstAction) {
        this.firstAction = firstAction;
    }

    public String getSecondAction() {
        return secondAction;
    }

    public void setSecondAction(String secondAction) {
        this.secondAction = secondAction;
    }
}

Complete Code Implementation

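package icu.wzk;

import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadCastDemo {

    // The UserAction and MyPattern classes from "Data Definition" go here as nested classes.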

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // Two input streams: (1) user actions, (2) patterns.
        // Note: the action name "car" appears to stand for "add to cart".
        UserAction user1 = new UserAction(1001L, "login");
        UserAction user2 = new UserAction(1003L, "pay");
        UserAction user3 = new UserAction(1002L, "car");
        UserAction user4 = new UserAction(1001L, "logout");
        UserAction user5 = new UserAction(1003L, "car");
        UserAction user6 = new UserAction(1002L, "logout");

        DataStreamSource<UserAction> actions = env.fromElements(user1, user2, user3, user4, user5, user6);

        MyPattern myPattern1 = new MyPattern("login", "logout");
        MyPattern myPattern2 = new MyPattern("car", "logout");
        DataStreamSource<MyPattern> patterns = env.fromElements(myPattern1, myPattern2);

        KeyedStream<UserAction, Long> keyed = actions
                .keyBy(new KeySelector<UserAction, Long>() {
                    @Override
                    public Long getKey(UserAction value) throws Exception {
                        return value.getUserId();
                    }
                });

        // Broadcast pattern stream to all downstream operators
        MapStateDescriptor<Void, MyPattern> broadcastStateDescriptor = new MapStateDescriptor<>(
                "patterns", Types.VOID, Types.POJO(MyPattern.class));
        BroadcastStream<MyPattern> broadcastPatterns = patterns.broadcast(broadcastStateDescriptor);

        SingleOutputStreamOperator<Tuple2<Long, MyPattern>> process = keyed
                .connect(broadcastPatterns)
                .process(new PatternEvaluator());

        // Output matched results to console
        process.print();
        env.execute("BroadCastDemo");
    }

    // Processing function
    public static class PatternEvaluator extends KeyedBroadcastProcessFunction<Long, UserAction, MyPattern, Tuple2<Long, MyPattern>> {

        private transient ValueState<String> prevActionState;

        @Override
        public void open(Configuration parameters) throws Exception {
            prevActionState = getRuntimeContext()
                    .getState(new ValueStateDescriptor<>("lastAction", Types.STRING));
        }

        /**
         * Called once for each user action on the keyed stream
         */
        @Override
        public void processElement(UserAction value, KeyedBroadcastProcessFunction<Long, UserAction, MyPattern, Tuple2<Long, MyPattern>>.ReadOnlyContext ctx, Collector<Tuple2<Long, MyPattern>> out) throws Exception {
            // Match user action stream with patterns from pattern stream
            ReadOnlyBroadcastState<Void, MyPattern> patterns = ctx
                    .getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(MyPattern.class)));
            MyPattern myPattern = patterns.get(null);
            String prevAction = prevActionState.value();
            if (myPattern != null && prevAction != null) {
                if (myPattern.getFirstAction().equals(prevAction) && myPattern.getSecondAction().equals(value.getUserAction())) {
                    // Match success
                    System.out.println("Match success: " + ctx.getCurrentKey());
                    out.collect(new Tuple2<>(ctx.getCurrentKey(), myPattern));
                } else {
                    // Match failed
                    System.out.println("Match failed: " + ctx.getCurrentKey());
                }
            }
            prevActionState.update(value.getUserAction());
        }

        /**
         * Triggered each time a pattern arrives
         */
        @Override
        public void processBroadcastElement(MyPattern value, KeyedBroadcastProcessFunction<Long, UserAction, MyPattern, Tuple2<Long, MyPattern>>.Context ctx, Collector<Tuple2<Long, MyPattern>> out) throws Exception {
            BroadcastState<Void, MyPattern> broadcastState = ctx
                    .getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(MyPattern.class)));
            broadcastState.put(null, value);
        }
    }
}

Core Method Explanation

processBroadcastElement()

Called for each record in the broadcast stream. It handles the updates that arrive on that stream (e.g., the latest pattern rules):

  1. Receive the pattern rule from the broadcast stream
  2. Store the pattern rule as a key-value entry in BroadcastState
  3. Overwrite the previous pattern: the demo always writes under the same key (null), so only the most recent pattern is kept
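The overwrite behavior in step 3 follows from the key choice, which a plain-Java sketch can make visible. The code below does not use Flink: a java.util.HashMap stands in for BroadcastState, and the class PatternStoreSketch and the "first->second" key naming are hypothetical, chosen only for illustration. It contrasts the demo's single null key with one possible way to keep several patterns at once.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of how the map key decides whether patterns overwrite each other. */
final class PatternStoreSketch {

    /** Stores every pattern under the same null key, as the demo does. */
    static int storeUnderSingleKey(String[][] patterns) {
        Map<Void, String[]> state = new HashMap<>(); // stand-in for BroadcastState<Void, MyPattern>
        for (String[] pattern : patterns) {
            state.put(null, pattern); // each put replaces the previous entry
        }
        return state.size();
    }

    /** Stores each pattern under its own key (hypothetical naming: "first->second"). */
    static int storeUnderNamedKeys(String[][] patterns) {
        Map<String, String[]> state = new HashMap<>();
        for (String[] pattern : patterns) {
            state.put(pattern[0] + "->" + pattern[1], pattern);
        }
        return state.size();
    }

    public static void main(String[] args) {
        String[][] patterns = {{"login", "logout"}, {"car", "logout"}};
        System.out.println(storeUnderSingleKey(patterns)); // 1: only the last pattern survives
        System.out.println(storeUnderNamedKeys(patterns)); // 2: both patterns are kept
    }
}
```

With distinct keys, the matching code would have to iterate over all stored patterns instead of reading a single entry.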

processElement()

Called for each record in the keyed stream. This is where the core processing logic resides:

  • It can only read the broadcast state (the API exposes it as read-only here to prevent the state from diverging between parallel instances)
  • Get the currently active pattern rule from the broadcast state
  • Get the user's previous action from the keyed state
  • Check whether the previous and current actions match the pattern
  • Store the user's current action in the keyed state
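The steps listed above can be sketched without Flink to make the control flow easier to follow. In this plain-Java model a HashMap stands in for the keyed ValueState and two fields stand in for the single pattern held in broadcast state; the class PatternMatchSketch and its method names are hypothetical. The replay assumes that the login -> logout pattern has arrived before any action, which a real job does not guarantee.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of the matching logic in processElement (illustrative only). */
final class PatternMatchSketch {

    // Stand-in for keyed state: the previous action of each user.
    private final Map<Long, String> prevActionByUser = new HashMap<>();

    // Stand-in for broadcast state: the single active pattern.
    private String firstAction;
    private String secondAction;

    /** Mirrors processBroadcastElement: replace the active pattern. */
    void onPattern(String first, String second) {
        this.firstAction = first;
        this.secondAction = second;
    }

    /** Mirrors processElement: returns true when (previous, current) equals the active pattern. */
    boolean onAction(long userId, String action) {
        String prevAction = prevActionByUser.get(userId);
        boolean matched = firstAction != null
                && prevAction != null
                && firstAction.equals(prevAction)
                && secondAction.equals(action);
        prevActionByUser.put(userId, action); // remember the current action for the next call
        return matched;
    }

    /** Replays the demo's six actions, assuming the pattern arrives first. */
    static List<Long> replay() {
        PatternMatchSketch sketch = new PatternMatchSketch();
        sketch.onPattern("login", "logout");

        long[] userIds = {1001L, 1003L, 1002L, 1001L, 1003L, 1002L};
        String[] actions = {"login", "pay", "car", "logout", "car", "logout"};

        List<Long> matchedUsers = new ArrayList<>();
        for (int i = 0; i < userIds.length; i++) {
            if (sketch.onAction(userIds[i], actions[i])) {
                matchedUsers.add(userIds[i]);
            }
        }
        return matchedUsers;
    }

    public static void main(String[] args) {
        System.out.println(replay()); // prints [1001]
    }
}
```

Updating the previous action after the comparison, not before, is what lets each call compare the current action with the one that preceded it.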

onTimer()

Called when a previously registered timer fires. The demo above does not override it. Its main uses are:

  • Delayed computation (e.g., emitting an aggregate after a window closes)
  • State cleanup (so that long-inactive users do not keep consuming memory)
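The state cleanup use case can be modeled in plain Java as follows. This is a simplified sketch, not Flink's timer API: the class TimerCleanupSketch, the ttlMillis parameter, and the advanceTo method are hypothetical, and the model keeps only one pending timer per user, which a real job would have to arrange for itself. Each action pushes the user's cleanup time forward, and firing a timer removes that user's state.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Plain-Java model of timer-driven state cleanup (illustrative only, not Flink code). */
final class TimerCleanupSketch {

    private final long ttlMillis;
    private final Map<Long, String> lastAction = new HashMap<>(); // per-user state
    private final Map<Long, Long> cleanupTimer = new HashMap<>(); // userId -> time at which the timer fires

    TimerCleanupSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Stores the action and (re)registers the user's cleanup timer. */
    void onAction(long userId, String action, long now) {
        lastAction.put(userId, action);
        cleanupTimer.put(userId, now + ttlMillis);
    }

    /** Fires every timer due at or before the given time, clearing that user's state. */
    void advanceTo(long now) {
        Iterator<Map.Entry<Long, Long>> timers = cleanupTimer.entrySet().iterator();
        while (timers.hasNext()) {
            Map.Entry<Long, Long> timer = timers.next();
            if (timer.getValue() <= now) {
                lastAction.remove(timer.getKey()); // the work an onTimer callback would do
                timers.remove();
            }
        }
    }

    boolean hasState(long userId) {
        return lastAction.containsKey(userId);
    }

    public static void main(String[] args) {
        TimerCleanupSketch sketch = new TimerCleanupSketch(1_000L);
        sketch.onAction(1001L, "login", 0L);
        sketch.onAction(1002L, "car", 500L);
        sketch.advanceTo(1_000L);
        System.out.println(sketch.hasState(1001L)); // false: inactive for the full TTL
        System.out.println(sketch.hasState(1002L)); // true: its timer is due at 1500
    }
}
```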

Run Test

Test Data:

  • User 1001: login -> logout
  • User 1002: car -> logout
  • User 1003: pay -> car

Output Results:

Match failed: 1003
Match success: 1001
Match failed: 1002
3> (1001,icu.wzk.BroadCastDemo$MyPattern@6d1e6dc7)

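In this run only user 1001 matched (login -> logout). The demo stores a single pattern under the null key, so only one pattern is active in an instance at any moment, and the sequences of user 1002 (car -> logout) and user 1003 (pay -> car) did not match the pattern that was active when their second action arrived. Flink does not guarantee the relative order in which broadcast and keyed records reach an instance, so the result can differ between runs. The matched pattern prints as icu.wzk.BroadCastDemo$MyPattern@6d1e6dc7 because MyPattern does not override toString().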