Broadcast State
Basic Concepts
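With broadcast state, one stream (the broadcast stream) is delivered to every parallel instance of a downstream operator. Each parallel task instance stores the broadcast data as its own fault-tolerant (checkpointed) internal state, and the instances do not send this state to one another. Each instance therefore works independently with its local copy of the broadcast data while it processes the events of the other, non-broadcast stream.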
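To make this distribution model concrete, here is a plain-Java sketch with no Flink dependency. It is only a model: the class BroadcastRoutingSim, its methods and the parallelism of 4 are hypothetical, and the modulo routing merely stands in for Flink's hash-based key-group assignment. A broadcast element lands in every instance's local map, while a keyed element is routed to exactly one instance, which then reads the broadcast value from its own copy.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (not Flink code) of how elements reach parallel instances.
// All names here are hypothetical; Flink performs this routing internally.
class BroadcastRoutingSim {
    static final int PARALLELISM = 4;

    // One local "broadcast state" map per parallel instance.
    static List<Map<String, String>> newInstances() {
        List<Map<String, String>> instances = new ArrayList<>();
        for (int i = 0; i < PARALLELISM; i++) {
            instances.add(new HashMap<>());
        }
        return instances;
    }

    // Broadcast: every instance receives the element and stores its own copy.
    static void broadcast(List<Map<String, String>> instances, String key, String value) {
        for (Map<String, String> local : instances) {
            local.put(key, value);
        }
    }

    // Keyed routing: an element goes to exactly one instance, chosen from its key.
    // Flink uses murmur-hashed key groups; a simple modulo stands in for that here.
    static int targetInstance(long key) {
        return (int) Math.floorMod(key, (long) PARALLELISM);
    }

    public static void main(String[] args) {
        List<Map<String, String>> instances = newInstances();
        broadcast(instances, "rule", "login->logout");
        for (long userId : new long[] {1001L, 1002L, 1003L}) {
            int idx = targetInstance(userId);
            // The rule is read from the local copy; no cross-instance lookup is needed.
            System.out.println(userId + " -> instance " + idx + " sees rule " + instances.get(idx).get("rule"));
        }
    }
}
```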
The broadcast state mechanism is particularly well suited to two typical scenarios:
- Combining a low-throughput data stream (such as rules or configuration) with a high-throughput data stream
- Applications whose processing logic must be adjusted dynamically at runtime
Typical Case: Real-Time Fraud Detection System
Suppose we are building a real-time fraud detection system:
- The regular transaction stream is the high-throughput main stream
- The rule update stream is the low-throughput broadcast stream
- The processing engine loads the latest fraud detection rules dynamically, without stopping or restarting the processing pipeline
Broadcast state lets us distribute updated detection rules to all parallel processing instances in real time. Each instance can immediately apply the new rules to subsequent transactions, so the business logic is updated while the job keeps running.
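The rule-update flow can also be sketched in plain Java. This is a simplified model rather than Flink code: the single maximum-amount threshold, the Instance class and its onRule()/isSuspicious() methods are hypothetical stand-ins for real fraud rules. It shows that once an update reaches an instance, the next transaction is judged by the new rule without restarting anything.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of dynamic rule updates; the threshold rule and all
// names below are hypothetical simplifications, not a Flink API.
class DynamicRuleSim {
    // Each parallel instance keeps its own copy of the current rule.
    static final class Instance {
        double maxAmount = Double.MAX_VALUE; // no rule yet: nothing is flagged

        // Broadcast side: overwrite the local copy of the rule.
        void onRule(double newMaxAmount) {
            maxAmount = newMaxAmount;
        }

        // Main side: evaluate a transaction against the rule currently held.
        boolean isSuspicious(double amount) {
            return amount > maxAmount;
        }
    }

    public static void main(String[] args) {
        List<Instance> instances = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            instances.add(new Instance());
        }

        Instance any = instances.get(2);
        System.out.println(any.isSuspicious(5_000)); // false: no rule loaded yet

        // A rule update is delivered to every instance while the job keeps running.
        for (Instance inst : instances) {
            inst.onRule(1_000);
        }

        System.out.println(any.isSuspicious(5_000)); // true: the new rule applies immediately
        System.out.println(any.isSuspicious(500));   // false
    }
}
```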
Pattern Matching Application Example
Scenario Description
An e-commerce website captures all user interactions as a stream of user actions, and a second stream defines the behavior patterns to look for:
- User action stream: login, logout, add to cart, complete payment, etc.
- Pattern stream: the behavior patterns to detect
Pattern Examples:
- Pattern 1: A user logs in and then immediately logs out, without browsing any other page of the e-commerce site
- Pattern 2: A user adds a product to the cart (action "car" in the code) and then logs out without completing the purchase
How to Use Broadcast State
Data Definition
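// User action class (without a no-arg constructor and setters it is not a Flink POJO, so Flink falls back to generic Kryo serialization)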
public static class UserAction {
private Long userId;
private String userAction;
public UserAction(Long userId, String userAction) {
this.userId = userId;
this.userAction = userAction;
}
public Long getUserId() {
return userId;
}
public String getUserAction() {
return userAction;
}
}
// Pattern class
public static class MyPattern {
private String firstAction;
private String secondAction;
// Public no-arg constructor: required because the state descriptor uses Types.POJO(MyPattern.class)
public MyPattern() {
}
public MyPattern(String firstAction, String secondAction) {
this.firstAction = firstAction;
this.secondAction = secondAction;
}
public String getFirstAction() {
return firstAction;
}
public void setFirstAction(String firstAction) {
this.firstAction = firstAction;
}
public String getSecondAction() {
return secondAction;
}
public void setSecondAction(String secondAction) {
this.secondAction = secondAction;
}
}
Complete Code Implementation
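package icu.wzk;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;
// UserAction and MyPattern (see Data Definition) are nested static classes of BroadCastDemo
public class BroadCastDemo {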
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
// Two input streams: (1) user actions, (2) patterns; "car" means "added to cart"
UserAction user1 = new UserAction(1001L, "login");
UserAction user2 = new UserAction(1003L, "pay");
UserAction user3 = new UserAction(1002L, "car");
UserAction user4 = new UserAction(1001L, "logout");
UserAction user5 = new UserAction(1003L, "car");
UserAction user6 = new UserAction(1002L, "logout");
DataStreamSource<UserAction> actions = env.fromElements(user1, user2, user3, user4, user5, user6);
MyPattern myPattern1 = new MyPattern("login", "logout");
MyPattern myPattern2 = new MyPattern("car", "logout");
DataStreamSource<MyPattern> patterns = env.fromElements(myPattern1, myPattern2);
KeyedStream<UserAction, Long> keyed = actions
.keyBy(new KeySelector<UserAction, Long>() {
@Override
public Long getKey(UserAction value) throws Exception {
return value.getUserId();
}
});
// Broadcast the pattern stream to every parallel instance of the downstream operator
MapStateDescriptor<Void, MyPattern> broadcastStateDescriptor = new MapStateDescriptor<>(
"patterns", Types.VOID, Types.POJO(MyPattern.class));
BroadcastStream<MyPattern> broadcastPatterns = patterns.broadcast(broadcastStateDescriptor);
SingleOutputStreamOperator<Tuple2<Long, MyPattern>> process = keyed
.connect(broadcastPatterns)
.process(new PatternEvaluator());
// Output matched results to console
process.print();
env.execute("BroadCastDemo");
}
// Processing function
public static class PatternEvaluator extends KeyedBroadcastProcessFunction<Long, UserAction, MyPattern, Tuple2<Long, MyPattern>> {
private transient ValueState<String> prevActionState;
@Override
public void open(Configuration parameters) throws Exception {
prevActionState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("lastAction", Types.STRING));
}
/**
* Called once for each user action on the keyed input
*/
@Override
public void processElement(UserAction value, KeyedBroadcastProcessFunction<Long, UserAction, MyPattern, Tuple2<Long, MyPattern>>.ReadOnlyContext ctx, Collector<Tuple2<Long, MyPattern>> out) throws Exception {
// Match user action stream with patterns from pattern stream
ReadOnlyBroadcastState<Void, MyPattern> patterns = ctx
.getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(MyPattern.class)));
MyPattern myPattern = patterns.get(null);
String prevAction = prevActionState.value();
if (myPattern != null && prevAction != null) {
if (myPattern.getFirstAction().equals(prevAction) && myPattern.getSecondAction().equals(value.getUserAction())) {
// Match success
System.out.println("Match success: " + ctx.getCurrentKey());
out.collect(new Tuple2<>(ctx.getCurrentKey(), myPattern));
} else {
// Match failed
System.out.println("Match failed: " + ctx.getCurrentKey());
}
}
prevActionState.update(value.getUserAction());
}
/**
* Triggered each time a pattern arrives
*/
@Override
public void processBroadcastElement(MyPattern value, KeyedBroadcastProcessFunction<Long, UserAction, MyPattern, Tuple2<Long, MyPattern>>.Context ctx, Collector<Tuple2<Long, MyPattern>> out) throws Exception {
BroadcastState<Void, MyPattern> broadcastState = ctx
.getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(MyPattern.class)));
broadcastState.put(null, value); // single null key: a new pattern replaces the previous one
}
}
}
Core Method Explanation
processBroadcastElement()
Called once for each record in the broadcast stream; it handles updates arriving on the broadcast stream (e.g., the latest pattern rule):
- Take the pattern rule from the broadcast element
- Store it in BroadcastState as a key-value entry
- Because the demo always uses the same (null) key, each update overwrites the previous pattern
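The overwrite behavior follows from using a single map key. The plain-Java model below assumes that a HashMap with Void keys is a fair stand-in for the put/get semantics of the demo's BroadcastState<Void, MyPattern> (Flink keeps broadcast state in memory at runtime); NullKeyOverwriteSim is a hypothetical name, not part of the demo.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of the demo's broadcast state: every pattern is stored
// under the single key null, as processBroadcastElement() does.
class NullKeyOverwriteSim {
    static Map<Void, String> receivePatterns(String... patterns) {
        Map<Void, String> state = new HashMap<>();
        for (String p : patterns) {
            state.put(null, p); // same key each time, so the previous pattern is replaced
        }
        return state;
    }

    public static void main(String[] args) {
        Map<Void, String> state = receivePatterns("login->logout", "car->logout");
        System.out.println(state.size());    // 1
        System.out.println(state.get(null)); // car->logout
    }
}
```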
processElement()
Called once for each record in the keyed stream; this is where the core processing logic lives:
- It has read-only access to broadcast state. Only the broadcast side may modify it, because that is the one input every parallel instance sees identically; this keeps all copies consistent without any communication between instances
- Read the currently active pattern rule from broadcast state
- Read the user's previous action from keyed state
- Check whether the previous and current actions match the pattern
- Store the user's current action in keyed state
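These steps can be replayed sequentially in plain Java to see which users a given pattern selects. PatternEvaluatorSim is a hypothetical model, not Flink code: a HashMap stands in for the per-user ValueState, a fixed pattern stands in for the read-only broadcast state, and events are processed one at a time in fromElements order, ignoring how a parallel job interleaves the two inputs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sequential plain-Java replay of the processElement() logic on the test data.
// Keyed state is modeled as a per-user map; the broadcast state as one fixed pattern.
class PatternEvaluatorSim {
    static List<Long> matches(long[] userIds, String[] actions, String first, String second) {
        Map<Long, String> prevAction = new HashMap<>(); // stands in for ValueState "lastAction"
        List<Long> matched = new ArrayList<>();
        for (int i = 0; i < userIds.length; i++) {
            String prev = prevAction.get(userIds[i]);
            if (prev != null && first.equals(prev) && second.equals(actions[i])) {
                matched.add(userIds[i]); // corresponds to out.collect(...)
            }
            prevAction.put(userIds[i], actions[i]); // corresponds to prevActionState.update(...)
        }
        return matched;
    }

    public static void main(String[] args) {
        long[] users = {1001L, 1003L, 1002L, 1001L, 1003L, 1002L};
        String[] actions = {"login", "pay", "car", "logout", "car", "logout"};
        System.out.println(matches(users, actions, "login", "logout")); // [1001]
        System.out.println(matches(users, actions, "car", "logout"));   // [1002]
    }
}
```

The model makes it visible that the matched user depends entirely on which single pattern is active in broadcast state when the actions are evaluated.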
onTimer()
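Called when a previously registered timer fires (the demo does not override it). Typical uses:
- Deferred computation (e.g., emitting an aggregate once a time window has closed)
- State cleanup (e.g., clearing the state of users who have been inactive for a long time, so it does not keep consuming memory)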
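The cleanup use case can be modeled in plain Java. In a Flink job, processElement() would register a timer through ctx.timerService().registerProcessingTimeTimer(...) and onTimer() would clear the keyed state; here a priority queue plays the role of the timer service. TimerCleanupSim, the TIMEOUT of 10 time units and the rule that only the most recent timer per user clears state are all assumptions of this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

// Plain-Java model of timer-based state cleanup; not Flink code.
// TIMEOUT and all names are hypothetical.
class TimerCleanupSim {
    static final long TIMEOUT = 10;

    final Map<Long, String> lastAction = new HashMap<>(); // per-user keyed state
    final Map<Long, Long> cleanupAt = new HashMap<>();    // latest timer per user
    final PriorityQueue<long[]> timers =
            new PriorityQueue<>((x, y) -> Long.compare(x[0], y[0])); // entries: {time, userId}

    // Hypothetical stand-in for processElement(): update state and (re)register a timer.
    void processElement(long now, long userId, String action) {
        lastAction.put(userId, action);
        long t = now + TIMEOUT;
        cleanupAt.put(userId, t);
        timers.add(new long[] {t, userId});
    }

    // Fire every timer due at or before 'now', like time advancing in the runtime.
    void advanceTo(long now) {
        while (!timers.isEmpty() && timers.peek()[0] <= now) {
            long[] timer = timers.poll();
            onTimer(timer[0], timer[1]);
        }
    }

    void onTimer(long timestamp, long userId) {
        // Only the most recent timer clears state; older timers are stale and ignored.
        Long latest = cleanupAt.get(userId);
        if (latest != null && latest == timestamp) {
            lastAction.remove(userId);
            cleanupAt.remove(userId);
        }
    }

    public static void main(String[] args) {
        TimerCleanupSim sim = new TimerCleanupSim();
        sim.processElement(0, 1001L, "login");
        sim.processElement(5, 1002L, "car");
        sim.processElement(8, 1001L, "logout"); // 1001 is active again: cleanup moves to t=18
        sim.advanceTo(15);
        System.out.println(sim.lastAction); // 1002 expired at t=15; 1001 is still present
    }
}
```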
Run Test
Test Data:
- User 1001: login -> logout
- User 1002: car -> logout
- User 1003: pay -> car
Output Results:
Match failed: 1003
Match success: 1001
Match failed: 1002
3> (1001,icu.wzk.BroadCastDemo$MyPattern@6d1e6dc7)
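Only user 1001 matched (login -> logout). The broadcast state holds a single pattern under the null key, so each new pattern replaces the previous one. In this run the login -> logout pattern was the one in effect when the users' actions were evaluated, so user 1002's car -> logout sequence and user 1003's pay -> car sequence did not match. Flink gives no ordering guarantee between the broadcast input and the keyed input, so which pattern is active when a given action arrives, and therefore the output, can differ between runs.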
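If all patterns should stay active at once, one option (not what the demo does) is to key the broadcast state by a pattern identifier, for example with a MapStateDescriptor<String, MyPattern>, and have processElement() iterate over every entry via ReadOnlyBroadcastState.immutableEntries(). The plain-Java model below sketches that variant; MultiPatternSim and the pattern names are hypothetical, and events are replayed sequentially in fromElements order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a multi-pattern variant: broadcast state keyed by pattern name,
// so patterns accumulate instead of overwriting each other. Not Flink code.
class MultiPatternSim {
    static List<String> matches(Map<String, String[]> patterns, long[] userIds, String[] actions) {
        Map<Long, String> prevAction = new HashMap<>(); // stands in for per-user keyed state
        List<String> matched = new ArrayList<>();
        for (int i = 0; i < userIds.length; i++) {
            String prev = prevAction.get(userIds[i]);
            if (prev != null) {
                // Check the user's (previous, current) pair against every stored pattern.
                for (Map.Entry<String, String[]> e : patterns.entrySet()) {
                    String[] p = e.getValue();
                    if (p[0].equals(prev) && p[1].equals(actions[i])) {
                        matched.add(userIds[i] + ":" + e.getKey());
                    }
                }
            }
            prevAction.put(userIds[i], actions[i]);
        }
        return matched;
    }

    public static void main(String[] args) {
        Map<String, String[]> patterns = new LinkedHashMap<>();
        patterns.put("login-logout", new String[] {"login", "logout"});
        patterns.put("car-logout", new String[] {"car", "logout"});
        long[] users = {1001L, 1003L, 1002L, 1001L, 1003L, 1002L};
        String[] actions = {"login", "pay", "car", "logout", "car", "logout"};
        System.out.println(matches(patterns, users, actions)); // [1001:login-logout, 1002:car-logout]
    }
}
```

In this sequential model both user 1001 and user 1002 match, and the result no longer depends on which of the two patterns arrived last (in a real job it can still depend on whether the patterns arrive before the actions).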