This is article 95 in the Big Data series, introducing Flink stateful computation and fault tolerance mechanisms.
Full illustrated version: CSDN Original | Juejin
Stateful and Stateless Computation
Flink divides computation into two types based on whether intermediate results need to be saved: stateful and stateless computation.
1. Stateful Computation
Characteristics: Intermediate results (state) must be kept, because processing an event depends on earlier events or requires buffering events until later ones arrive
State Types:
- Keyed State: State associated with specific keys
- Operator State: State bound to operator instances
Typical Application Scenarios:
- Window aggregation (e.g., calculate average temperature within 5 minutes)
- Pattern detection (e.g., detect abnormal login sequences)
- Data deduplication (e.g., e-commerce order deduplication)
State Backend Support:
- MemoryStateBackend (memory)
- FsStateBackend (file system)
- RocksDBStateBackend (embedded database)
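To make the distinction concrete, here is a plain-Java model (not Flink API code) of the order-deduplication scenario. The set of already-seen order IDs is the state that must survive between events. In a Flink job this would typically be keyed state, for example a ValueState<Boolean> on a stream keyed by order ID. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java model of stateful deduplication (not Flink API code).
// The "seen" set plays the role of state: the decision for each event
// depends on events that arrived earlier.
final class OrderDeduplicator {
    private final Set<String> seenOrderIds = new HashSet<>();

    // Returns true the first time an order ID is seen, false for duplicates.
    boolean accept(String orderId) {
        return seenOrderIds.add(orderId);
    }

    static List<String> dedup(List<String> orderIds) {
        OrderDeduplicator dedup = new OrderDeduplicator();
        List<String> out = new ArrayList<>();
        for (String id : orderIds) {
            if (dedup.accept(id)) {
                out.add(id);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(dedup(Arrays.asList("o1", "o2", "o1", "o3", "o2")));
    }
}
```

Without the set, the operator could not tell that the second `o1` is a duplicate; that dependency on history is exactly what makes the computation stateful. In a real job the set also grows without bound, which is why state TTL matters.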
2. Stateless Computation
Characteristics: Each data record is processed independently, no need to save intermediate state
Typical Operators:
- Map (one-to-one transformation)
- Filter (data filtering)
- FlatMap (one-to-many transformation)
Application Scenarios:
- Simple data format transformation
- Data filtering (e.g., filter debug info from logs)
- Data splitting (e.g., split CSV lines into multiple fields)
Advantages: High execution efficiency, low resource consumption, linearly scalable
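By contrast, stateless operators are pure functions of a single record. This plain-Java sketch (not Flink API code; method names are illustrative) mirrors a Filter that drops debug log lines followed by a FlatMap that splits a CSV line into fields. Nothing is remembered between calls, so each record could be handled by any parallel instance.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of stateless processing (not Flink API code).
// Each method looks only at its input record, so results never depend on
// what was processed before.
final class StatelessOps {
    // Filter: keep every line that is not a DEBUG log line.
    static boolean keep(String logLine) {
        return !logLine.startsWith("DEBUG");
    }

    // FlatMap: one CSV line in, zero or more fields out.
    static List<String> splitCsv(String line) {
        List<String> fields = new ArrayList<>();
        for (String field : line.split(",")) {
            String trimmed = field.trim();
            if (!trimmed.isEmpty()) {
                fields.add(trimmed);
            }
        }
        return fields;
    }

    static List<String> process(List<String> lines) {
        List<String> out = new ArrayList<>();
        for (String line : lines) {
            if (keep(line)) {
                out.addAll(splitCsv(line));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(process(Arrays.asList("INFO,a,b", "DEBUG,x", "WARN,c")));
    }
}
```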
State Types
Flink defines several state types, distinguished by the data structure they hold:
- ValueState: A single value of type T, bound to the current key. This is the simplest state: update it with update(T) and read it with value()
- ListState: A list of values for the current key. Append elements with add(T) and iterate over them through the Iterable returned by get()
- ReducingState: Uses a user-provided ReduceFunction. Each call to add(T) merges the new element into a single aggregated value
- FoldingState: Similar to ReducingState, but the state value type can differ from the type of the elements passed to add (deprecated in favor of AggregatingState and slated for removal)
- MapState: A map from user keys to user values. Add entries with put and putAll
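The main semantic difference between these types is what add or put does. The plain-Java model below (not Flink's interfaces; class names are hypothetical) mimics ListState, ReducingState, and MapState for a single key: ListState-style add appends, ReducingState-style add immediately folds the new element into one stored value with a reduce function, and MapState-style put stores user-key/value pairs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java model of per-key state semantics (not Flink's interfaces).
final class StateSemantics {
    // Like ListState: add() appends, get() returns everything added so far.
    static final class ListLike<T> {
        private final List<T> values = new ArrayList<>();
        void add(T value) { values.add(value); }
        Iterable<T> get() { return values; }
    }

    // Like ReducingState: add() merges the new value into a single stored value.
    static final class ReducingLike<T> {
        private final BinaryOperator<T> reduceFunction;
        private T value; // null until the first add(), like an empty state
        ReducingLike(BinaryOperator<T> reduceFunction) { this.reduceFunction = reduceFunction; }
        void add(T v) { value = (value == null) ? v : reduceFunction.apply(value, v); }
        T get() { return value; }
    }

    // Like MapState: put()/putAll() store user-key -> user-value pairs.
    static final class MapLike<K, V> {
        private final Map<K, V> entries = new HashMap<>();
        void put(K k, V v) { entries.put(k, v); }
        void putAll(Map<K, V> m) { entries.putAll(m); }
        V get(K k) { return entries.get(k); }
    }

    public static void main(String[] args) {
        ListLike<Long> list = new ListLike<>();
        ReducingLike<Long> sum = new ReducingLike<>(Long::sum);
        for (long v : new long[] {3, 5, 7}) {
            list.add(v); // keeps every element
            sum.add(v);  // keeps only the running sum
        }
        System.out.println(list.get() + " vs " + sum.get());
    }
}
```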
Depending on whether it is scoped to a key, state falls into two categories:
- KeyedState
- OperatorState
Case: Using State to Calculate Average
Implementation Approach
- Read data source
- Group data source by Key
- Apply a stateful function to the keyed stream: create a state handle, update the state as each record arrives, and emit the result
Write Code
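package icu.wzk;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class FlinkStateTest01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // (key, value) pairs; all records share key 1
        DataStreamSource<Tuple2<Long, Long>> data = env
                .fromElements(
                        Tuple2.of(1L, 3L),
                        Tuple2.of(1L, 5L),
                        Tuple2.of(1L, 7L),
                        Tuple2.of(1L, 4L),
                        Tuple2.of(1L, 2L)
                );
        // Partition the stream by f0 so that each key gets its own state
        KeyedStream<Tuple2<Long, Long>, Long> keyed = data
                .keyBy(new KeySelector<Tuple2<Long, Long>, Long>() {
                    @Override
                    public Long getKey(Tuple2<Long, Long> value) throws Exception {
                        return value.f0;
                    }
                });
        SingleOutputStreamOperator<Tuple2<Long, Long>> flatMapped = keyed
                .flatMap(new RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                    // Per-key state holding (count, running sum)
                    private transient ValueState<Tuple2<Long, Long>> sum;
                    @Override
                    public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
                        Tuple2<Long, Long> currentSum = sum.value();
                        if (currentSum == null) {
                            // No state yet for this key
                            currentSum = Tuple2.of(0L, 0L);
                        }
                        // Increment the count and add the value to the sum
                        currentSum.f0 += 1L;
                        currentSum.f1 += value.f1;
                        System.out.println("currentValue: " + currentSum);
                        // Write the updated (count, sum) back to state
                        sum.update(currentSum);
                        // Once 5 records have been seen, emit the average (long division) and reset the state
                        if (currentSum.f0 >= 5) {
                            out.collect(new Tuple2<>(value.f0, currentSum.f1 / currentSum.f0));
                            sum.clear();
                        }
                    }
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        // Register the state; the name "average" identifies it in the state backend
                        ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>(
                                "average",
                                TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})
                        );
                        sum = getRuntimeContext().getState(descriptor);
                    }
                });
        flatMapped.print();
        env.execute("Flink State Test");
    }
}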
Run Results
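For every record, the function prints the running (count, sum) pair to standard output. When the count reaches 5, it emits the average for the key and clears the state. With the input above, the running values are (1,3), (2,8), (3,15), (4,19), (5,21), and the emitted result is (1,4): 21 / 5 is computed with long integer division, so the fractional part is dropped.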
Keyed State
Keyed State is state associated with a key. It can only be used in functions and operators applied to a KeyedStream. Keyed State can be viewed as a special case of Operator State: the data is pre-partitioned by key, so each piece of Keyed State belongs to exactly one combination of operator and key.
Core Features
- Key Association: Each state is strictly bound to one key; all records with the same key access the same state instance
- Automatic Partitioning: State is partitioned by key, and records with the same key are always routed to the same parallel task instance
- Efficient Access: The current key locates its state directly, without scanning the state of other keys
Management Mechanism
KeyedState is managed through the KeyGroups mechanism, which serves two scenarios:
- Dynamic Scaling: When operator parallelism changes, system automatically redistributes KeyedState data
- Fault Recovery: Ensures state is correctly assigned to new task instances during recovery
KeyGroups Working Principle
- Grouping Strategy: Every key is hashed into one of a fixed number of KeyGroups; the number of KeyGroups equals the job's maximum parallelism
- Dynamic Allocation: At runtime, each keyed operator instance handles the keys of one or more contiguous KeyGroups
- Rebalancing: When parallelism changes, only the KeyGroup-to-instance mapping is recalculated; a key always stays in the same KeyGroup, so state moves between instances in whole KeyGroups
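A simplified model of the KeyGroups idea: a key is hashed into one of maxParallelism key groups, and each parallel instance owns a contiguous range of groups. The range arithmetic follows the scheme used by Flink's KeyGroupRangeAssignment, but the hashing step is simplified (Flink additionally applies a murmur hash to key.hashCode()), so real group numbers will differ. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of key-group assignment (hypothetical names, not Flink API).
final class KeyGroupSketch {
    // Key -> key group. A key's group never changes, because maxParallelism
    // is fixed for the lifetime of the job's state.
    // Simplification: Flink also murmur-hashes key.hashCode() first.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Key group -> parallel instance for the current parallelism.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // The contiguous key groups owned by each instance, as [start, end] pairs.
    static List<int[]> ranges(int maxParallelism, int parallelism) {
        List<int[]> result = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            int start = (i * maxParallelism + parallelism - 1) / parallelism;
            int end = ((i + 1) * maxParallelism - 1) / parallelism;
            result.add(new int[] {start, end});
        }
        return result;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        for (int parallelism : new int[] {2, 3}) {
            StringBuilder sb = new StringBuilder("parallelism " + parallelism + ":");
            for (int[] r : ranges(maxParallelism, parallelism)) {
                sb.append(" [").append(r[0]).append("..").append(r[1]).append("]");
            }
            System.out.println(sb);
        }
        int group = keyGroupFor("user-42", maxParallelism);
        System.out.println("user-42 -> key group " + group
                + ", instance " + operatorIndexFor(group, maxParallelism, 2) + " of 2"
                + ", instance " + operatorIndexFor(group, maxParallelism, 3) + " of 3");
    }
}
```

Rescaling from 2 to 3 instances only moves whole key groups between instances, so keyed state can be redistributed without rehashing individual keys. It also means the maximum parallelism is an upper bound on the operator's parallelism.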
Typical Application Scenarios
- Real-time aggregation calculation: e.g., calculate each user’s click count
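// Example: Use ValueState to count clicks per user
// ClickEvent is an application-defined event type (not shown); the stream is keyed by user ID (String)
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
public class UserClickCounter extends KeyedProcessFunction<String, ClickEvent, Tuple2<String, Integer>> {
    // One counter per user key
    private transient ValueState<Integer> countState;
    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Integer> descriptor =
                new ValueStateDescriptor<>("clickCount", Integer.class);
        countState = getRuntimeContext().getState(descriptor);
    }
    @Override
    public void processElement(ClickEvent event, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
        Integer currentCount = countState.value();
        if (currentCount == null) {
            currentCount = 0; // first click for this user
        }
        currentCount++;
        countState.update(currentCount);
        out.collect(Tuple2.of(ctx.getCurrentKey(), currentCount));
    }
}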
- Pattern detection: e.g., detect user abnormal login behavior
- Session window processing: Track user session activity
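For the pattern-detection case, the per-key state is often just a small counter or flag. The plain-Java model below (hypothetical names and threshold; not Flink API) keeps, per user, the number of consecutive failed logins, raises an alert on the third one, and resets on a successful login. In Flink, the map would be replaced by a ValueState<Integer> inside a KeyedProcessFunction, so each user key sees only its own counter.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of keyed pattern detection (not Flink API code).
// The map stands in for keyed state: one counter per user key.
final class FailedLoginDetector {
    private static final int THRESHOLD = 3; // hypothetical alert threshold
    private final Map<String, Integer> consecutiveFailures = new HashMap<>();

    // Returns an alert message, or null when no alert is raised.
    String onLogin(String user, boolean success) {
        if (success) {
            consecutiveFailures.remove(user); // like state.clear()
            return null;
        }
        int failures = consecutiveFailures.getOrDefault(user, 0) + 1;
        consecutiveFailures.put(user, failures); // like state.update(failures)
        if (failures == THRESHOLD) {
            return "ALERT " + user + ": " + failures + " failed logins in a row";
        }
        return null;
    }

    public static void main(String[] args) {
        FailedLoginDetector detector = new FailedLoginDetector();
        String[][] events = {
            {"alice", "fail"}, {"bob", "fail"}, {"alice", "fail"},
            {"bob", "ok"}, {"alice", "fail"}, {"bob", "fail"}
        };
        List<String> alerts = new ArrayList<>();
        for (String[] e : events) {
            String alert = detector.onLogin(e[0], e[1].equals("ok"));
            if (alert != null) {
                alerts.add(alert);
            }
        }
        System.out.println(alerts);
    }
}
```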
State Types
Flink provides multiple KeyedState implementations:
- ValueState: Store single value
- ListState: Store list of values
- MapState: Store key-value mapping
- ReducingState: Store aggregation result
- AggregatingState: Support more complex aggregation
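AggregatingState differs from ReducingState in that the accumulator type may differ from both the input and the output type; in Flink it is driven by an AggregateFunction<IN, ACC, OUT> (createAccumulator, add, getResult, merge). The plain-Java model below (hypothetical class name, not Flink's interfaces) computes the same per-key average as the FlatMap example earlier, but returns it as a double and keeps count and sum in a single accumulator.

```java
// Plain-Java model of AggregatingState semantics (not Flink's interfaces).
// IN = long (a reading), ACC = long[]{count, sum}, OUT = Double (the average).
final class AverageAggregatingState {
    private long[] accumulator; // null while the state is empty

    // Mirrors AggregateFunction.createAccumulator (lazily) followed by add.
    void add(long value) {
        if (accumulator == null) {
            accumulator = new long[] {0L, 0L};
        }
        accumulator[0] += 1;
        accumulator[1] += value;
    }

    // Mirrors AggregateFunction.getResult: the OUT type differs from IN and ACC.
    Double get() {
        if (accumulator == null) {
            return null;
        }
        return (double) accumulator[1] / accumulator[0];
    }

    void clear() {
        accumulator = null;
    }

    public static void main(String[] args) {
        AverageAggregatingState state = new AverageAggregatingState();
        for (long v : new long[] {3, 5, 7, 4, 2}) {
            state.add(v);
        }
        System.out.println(state.get()); // 21 / 5 computed as a double
    }
}
```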
Performance Considerations
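- State backend selection: Choose MemoryStateBackend, FsStateBackend, or RocksDBStateBackend according to state size and latency requirements
- State serialization: Prefer types Flink can serialize with its own serializers (POJOs, tuples, primitive types); types that fall back to generic Kryo serialization are usually slower
- State cleanup: Configure state TTL so that state for inactive keys expires instead of growing without bound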
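In Flink, TTL is configured with StateTtlConfig on the state descriptor. The plain-Java model below only illustrates the semantics of one common setting: the timestamp is refreshed on create and write, expired values are never returned, and expired entries are dropped lazily when read. It is not Flink's implementation; the injected clock and the class name are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Plain-Java model of TTL semantics for keyed values (not Flink's implementation).
// Timestamp refreshed on create/write, expired values never returned,
// expired entries removed lazily on read.
final class TtlValueStore<K, V> {
    private static final class Entry<T> {
        final T value;
        final long lastWriteMillis;
        Entry(T value, long lastWriteMillis) {
            this.value = value;
            this.lastWriteMillis = lastWriteMillis;
        }
    }

    private final Map<K, Entry<V>> entries = new HashMap<>();
    private final long ttlMillis;
    private final LongSupplier clock; // injected so the behavior is deterministic

    TtlValueStore(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    void update(K key, V value) {
        entries.put(key, new Entry<>(value, clock.getAsLong()));
    }

    // Reading does not refresh the timestamp (refresh happens only on write).
    V value(K key) {
        Entry<V> e = entries.get(key);
        if (e == null) {
            return null;
        }
        if (clock.getAsLong() - e.lastWriteMillis >= ttlMillis) {
            entries.remove(key); // lazy cleanup on read
            return null;
        }
        return e.value;
    }

    int size() {
        return entries.size();
    }
}
```

In Flink itself, a comparable setup is expressed with StateTtlConfig.newBuilder(...) plus setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) and setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired), and the built config is passed to the descriptor's enableTimeToLive method.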
Operator State
Operator State is an important state type in Flink, fundamentally different from Keyed State. Operator State is directly bound to specific operator instances, not dependent on keys in data.
Characteristics
- State allocation mechanism:
  - Each operator instance maintains the state for the data it processes
  - State data is partitioned and stored across operator instances
  - When parallelism changes, Flink automatically redistributes state data
- Parallelism adaptability:
  - Supports scaling up and down
  - Provides three redistribution schemes:
    - Even-split redistribution: The list elements from all old instances are divided evenly among the new instances
    - Union redistribution: Every new instance receives the complete list
    - Broadcast state: Every instance holds an identical copy, which is given to each new instance
  - Even-split and union list state are obtained through the CheckpointedFunction interface (getListState / getUnionListState); the older ListCheckpointed interface is deprecated and supports only even-split redistribution
- Typical application scenarios:
  - Source operators (e.g., Kafka Consumer) record consumption offsets
  - Window operators maintain window trigger state
  - Custom aggregate operators save intermediate results
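The difference between even-split and union redistribution can be modeled in plain Java. Suppose two source instances each checkpointed a list of partition offsets and the job restarts with parallelism 3: even-split concatenates all elements and hands each one to exactly one new instance, while union gives every new instance the full list. The contiguous chunking below is a simplification for illustration; it is not Flink's repartitioner, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of operator-state redistribution on rescale
// (not Flink's repartitioner; the chunking order is a simplification).
final class OperatorStateRedistribution {
    // Even-split: every element goes to exactly one new instance.
    static <T> List<List<T>> evenSplit(List<List<T>> oldStates, int newParallelism) {
        List<T> all = new ArrayList<>();
        oldStates.forEach(all::addAll);
        List<List<T>> result = new ArrayList<>();
        int base = all.size() / newParallelism;
        int extra = all.size() % newParallelism;
        int pos = 0;
        for (int i = 0; i < newParallelism; i++) {
            int size = base + (i < extra ? 1 : 0);
            result.add(new ArrayList<>(all.subList(pos, pos + size)));
            pos += size;
        }
        return result;
    }

    // Union: every new instance receives the complete list.
    static <T> List<List<T>> union(List<List<T>> oldStates, int newParallelism) {
        List<T> all = new ArrayList<>();
        oldStates.forEach(all::addAll);
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(all));
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> old = Arrays.asList(
                Arrays.asList("p0=10", "p1=20"),
                Arrays.asList("p2=5", "p3=7", "p4=1"));
        System.out.println("even-split: " + evenSplit(old, 3));
        System.out.println("union:      " + union(old, 3));
    }
}
```

Even-split suits state that any instance can take over piece by piece, such as buffered records. Union suits cases like Flink's Kafka consumer, which stores its offsets in union list state so that, after rescaling, each instance can pick out the partitions it is now assigned.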
State Management Forms
In Flink, both KeyedState and OperatorState support two management forms:
Managed State
This is Flink’s recommended usage, with these characteristics:
- Runtime management: State is managed uniformly by the Flink runtime
- Storage optimization:
  - Heap-based backends keep state in efficient in-memory data structures (e.g., hash tables)
  - RocksDB can be chosen as the state backend for large state
- Persistence mechanism:
  - State is persisted periodically through the Checkpoint mechanism
  - Provides exactly-once guarantees for state
  - State is serialized with Flink's own type serializers, falling back to Kryo for types Flink cannot analyze
- Recovery process:
  - On task failure, state is automatically restored from the last successful Checkpoint
  - Incremental Checkpoints (RocksDB backend) reduce the size and duration of each checkpoint
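The checkpoint-and-recover cycle behind this guarantee can be sketched in plain Java: a checkpoint stores the source position together with a copy of the state, and recovery restores both and replays input from that position, so records processed after the checkpoint are neither lost nor counted twice. This is a single-process model with hypothetical names, not Flink's distributed, barrier-based snapshot algorithm.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Single-process model of checkpoint/restore (not Flink's barrier-based algorithm).
// State: running sum per key. Checkpoint: (next source offset, copy of state).
final class CheckpointModel {
    static final class Checkpoint {
        final int nextOffset;
        final Map<String, Long> state;
        Checkpoint(int nextOffset, Map<String, Long> state) {
            this.nextOffset = nextOffset;
            this.state = new HashMap<>(state); // snapshot = copy, not a reference
        }
    }

    // Processes all records, checkpointing after every checkpointEvery records,
    // and simulates one failure right after record index failAt (-1 = no failure).
    static Map<String, Long> run(List<String[]> records, int checkpointEvery, int failAt) {
        Map<String, Long> state = new HashMap<>();
        Checkpoint last = new Checkpoint(0, state);
        boolean failed = false;
        int offset = 0;
        while (offset < records.size()) {
            String[] r = records.get(offset);
            state.merge(r[0], Long.parseLong(r[1]), Long::sum);
            offset++;
            if (offset % checkpointEvery == 0) {
                last = new Checkpoint(offset, state);
            }
            if (!failed && offset - 1 == failAt) {
                failed = true;
                // Failure: in-memory state is lost; restore the last checkpoint
                // and rewind the source to the checkpointed offset.
                state = new HashMap<>(last.state);
                offset = last.nextOffset;
            }
        }
        return state;
    }
}
```

Restoring the state without rewinding the source would lose records, and rewinding the source without restoring the state would count records twice; snapshotting both together is what makes the result match a failure-free run.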
Raw State
This form provides greater flexibility but requires developers to take more responsibility:
- Self-management: The operator maintains its own state data structures
- Serialization requirements:
  - The developer implements the state serialization logic
  - The developer handles version compatibility
- Checkpoint handling:
  - The state must be explicitly snapshotted (written as bytes) when a Checkpoint is triggered
  - The state data must be deserialized manually during recovery
- Typical use cases:
  - Scenarios requiring specially optimized data structures
  - Integrating third-party libraries that require a specific state format
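With raw state, the operator itself turns its data structure into bytes and back. The plain-Java sketch below shows what that responsibility involves: a hypothetical counter map is written with an explicit format-version header, so later code can detect (and ideally migrate) old snapshots. In Flink, such bytes would go to the raw state streams available to custom operators during a snapshot; here they simply go to a byte array, and the class name is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of raw-state handling: the developer owns the byte format.
final class RawStateCodec {
    static final int FORMAT_VERSION = 1; // bump when the layout changes

    static byte[] snapshot(Map<String, Long> counts) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(FORMAT_VERSION);
            out.writeInt(counts.size());
            for (Map.Entry<String, Long> e : counts.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeLong(e.getValue());
            }
        }
        return bytes.toByteArray();
    }

    static Map<String, Long> restore(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int version = in.readInt();
            if (version != FORMAT_VERSION) {
                // Real code would migrate older layouts here instead of failing.
                throw new IOException("Unsupported raw state version: " + version);
            }
            int size = in.readInt();
            Map<String, Long> counts = new HashMap<>();
            for (int i = 0; i < size; i++) {
                counts.put(in.readUTF(), in.readLong());
            }
            return counts;
        }
    }
}
```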
State Backend Comparison
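For managed state, Flink provides three state backend implementations (since Flink 1.13 they are superseded by HashMapStateBackend and EmbeddedRocksDBStateBackend, with checkpoint storage configured separately):
| Feature | MemoryStateBackend | FsStateBackend | RocksDBStateBackend |
|---|---|---|---|
| Working state | TaskManager JVM heap | TaskManager JVM heap | Embedded RocksDB on TaskManager local disk |
| Checkpoint storage | JobManager JVM heap | File system (e.g., HDFS, S3) | File system (e.g., HDFS, S3) |
| State size limit | 5 MB per state by default; all state must fit in JobManager memory | Limited by TaskManager memory | Limited only by local disk capacity |
| Access speed | Fast (heap) | Fast (heap) | Slower (serialization on every access) |
| Use case | Testing / small state | Regular production jobs | Very large state |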
Note: The DataStream API supports both Managed State and Raw State. Flink recommends Managed State, mainly because Flink can redistribute it automatically when parallelism changes and can manage its memory more completely.
State Descriptor
Because state is declared by user code, each state needs a few attributes:
- State name
- Value serializer
- State type information
The state backend uses these attributes from the StateDescriptor to create the state, or to look up an existing state with the same name.
Flink defines a state through StateDescriptor, an abstract class that holds basic information such as the state's name, type, and serializer. For each state type there is a matching StateDescriptor subclass, such as ValueStateDescriptor and ListStateDescriptor, which is passed to the corresponding RuntimeContext method:
- ValueState<T> getState(ValueStateDescriptor<T>)
- ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
- ListState<T> getListState(ListStateDescriptor<T>)
- FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>) (deprecated)
- MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)
- AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
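Because a state is identified by its descriptor, and in particular by the descriptor's name, requesting a descriptor with the same name twice returns the same state rather than a new one; this is also how restored state is matched back to the code after a restart. A plain-Java model of that lookup follows (hypothetical names, not Flink's StateBackend API; the type check is an illustrative simplification).

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of descriptor-based state lookup (not Flink's StateBackend API).
final class StateRegistry {
    // Minimal stand-in for a descriptor: a name plus the state's value type.
    static final class Descriptor<T> {
        final String name;
        final Class<T> type;
        Descriptor(String name, Class<T> type) {
            this.name = name;
            this.type = type;
        }
    }

    // Minimal stand-in for ValueState.
    static final class Value<T> {
        private T value;
        T value() { return value; }
        void update(T v) { value = v; }
    }

    private final Map<String, Value<?>> statesByName = new HashMap<>();
    private final Map<String, Class<?>> typesByName = new HashMap<>();

    // Creates the state on first access and returns the same instance afterwards.
    @SuppressWarnings("unchecked")
    <T> Value<T> getState(Descriptor<T> descriptor) {
        Class<?> known = typesByName.putIfAbsent(descriptor.name, descriptor.type);
        if (known != null && known != descriptor.type) {
            throw new IllegalStateException("State '" + descriptor.name
                    + "' was registered with type " + known.getName());
        }
        return (Value<T>) statesByName.computeIfAbsent(descriptor.name, n -> new Value<T>());
    }
}
```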