Use Managed Operator State
Managed operator state is used by implementing either the CheckpointedFunction or the ListCheckpointed interface.
CheckpointedFunction
The CheckpointedFunction interface provides access to non-keyed state. Implementations must provide these two methods:
void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;
snapshotState() is called whenever a checkpoint is taken. initializeState() is called whenever the user-defined function is initialized, which covers both the first initialization and recovery from an earlier checkpoint.
Currently, managed operator state is supported only in list form. The state is a List of serializable objects that are independent of each other, which makes them easy to redistribute when the parallelism changes. Depending on how the state is accessed, the following redistribution patterns are defined:
- Even-split redistribution: Each operator instance holds a list of state elements, and the entire state is the concatenation of all these lists. When the job is restored or rescaled, the entire state is divided evenly among the parallel operator instances.
- Union redistribution: Each operator instance holds a list of state elements, and the entire state is the concatenation of all these lists. When the job is restored or rescaled, each operator instance receives the complete list of state elements.
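The two patterns above can be sketched in plain Java. This is only an illustration of the idea, not Flink's runtime code: the class name RedistributionSketch and its methods are hypothetical, and the exact way Flink assigns elements to subtasks may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only (hypothetical names, not Flink classes).
final class RedistributionSketch {

    // Concatenate the per-subtask lists into the logical "entire state".
    static <T> List<T> concat(List<List<T>> perSubtask) {
        List<T> all = new ArrayList<>();
        for (List<T> part : perSubtask) {
            all.addAll(part);
        }
        return all;
    }

    // Even-split: every element goes to exactly one new subtask.
    static <T> List<List<T>> evenSplit(List<List<T>> old, int newParallelism) {
        List<T> all = concat(old);
        List<List<T>> result = new ArrayList<>();
        int base = all.size() / newParallelism;
        int remainder = all.size() % newParallelism;
        int from = 0;
        for (int i = 0; i < newParallelism; i++) {
            // The first "remainder" subtasks take one extra element.
            int size = base + (i < remainder ? 1 : 0);
            result.add(new ArrayList<>(all.subList(from, from + size)));
            from += size;
        }
        return result;
    }

    // Union: every new subtask receives the complete concatenated list.
    static <T> List<List<T>> union(List<List<T>> old, int newParallelism) {
        List<T> all = concat(old);
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(all));
        }
        return result;
    }
}
```

With union redistribution each instance sees every element, so the function itself has to decide which elements it keeps.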
ListCheckpointed
The ListCheckpointed interface is a simplified variant of CheckpointedFunction. It supports only list state with even-split redistribution, and it also requires two methods:
List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
void restoreState(List<T> state) throws Exception;
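As a rough sketch of how these two methods fit together, the example below uses a local stand-in interface that mirrors the signatures quoted above, so it compiles without Flink on the classpath. ListCheckpointedSketch and CountingFunction are hypothetical names introduced only for illustration.

```java
import java.util.Collections;
import java.util.List;

// Local stand-in mirroring the two ListCheckpointed methods quoted above.
interface ListCheckpointedSketch<T> {
    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
    void restoreState(List<T> state) throws Exception;
}

// Hypothetical function whose only state is the number of records seen.
final class CountingFunction implements ListCheckpointedSketch<Long> {
    private long count;

    void onRecord() {
        count++;
    }

    long getCount() {
        return count;
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long timestamp) {
        // Each list element is one redistributable unit; here a single counter.
        return Collections.singletonList(count);
    }

    @Override
    public void restoreState(List<Long> state) {
        // After rescaling, an instance may receive zero, one, or several
        // elements, so the partial counts are summed.
        count = 0;
        for (Long partial : state) {
            count += partial;
        }
    }
}
```

Returning the state as a list of independent elements is what allows even-split redistribution: each element can be handed to a different instance on restore.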
How StateBackend Saves
Three StateBackends:
- MemoryStateBackend
- FsStateBackend
- RocksDBStateBackend
In the actual Flink implementation, a single StateBackend delegates different kinds of state to more specialized backends at runtime. For example, MemoryStateBackend uses DefaultOperatorStateBackend to manage OperatorState and HeapKeyedStateBackend to manage KeyedState.
MemoryStateBackend and FsStateBackend store KeyedState and OperatorState as described earlier: state data is kept in memory at runtime.
RocksDBStateBackend differs. As its source code shows, it creates a RocksDBKeyedStateBackend for KeyedState but still uses DefaultOperatorStateBackend for OperatorState:
// RocksDBStateBackend.java
// Create the keyed state backend
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(...) {
    return new RocksDBKeyedStateBackend<>(...);
}

// Create the operator state backend
public OperatorStateBackend createOperatorStateBackend(
        Environment env, String operatorIdentifier) throws Exception {
    final boolean asyncSnapshots = true;
    return new DefaultOperatorStateBackend(...);
}
Operator State Data Structure
Data structure 1: ListState. Its actual implementation class is PartitionableListState.
// DefaultOperatorStateBackend.java
private <S> ListState<S> getListState(...) {
    partitionableListState = new PartitionableListState<>(
        new RegisteredOperatorStateBackendMetaInfo<>(
            name,
            partitionStateSerializer,
            mode));
    registeredOperatorStates.put(name, partitionableListState);
}
PartitionableListState stores its state data in an ArrayList.
Data structure 2: BroadcastState. Its actual implementation class is HeapBroadcastState.
public <K, V> BroadcastState<K, V> getBroadcastState(...) {
    broadcastState = new HeapBroadcastState<>(
        new RegisteredBroadcastStateBackendMetaInfo<>(
            name,
            OperatorStateHandle.Mode.BROADCAST,
            broadcastStateKeySerializer,
            broadcastStateValueSerializer));
    registeredBroadcastStates.put(name, broadcastState);
}
HeapBroadcastState stores its state data in a HashMap.
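To make the two backing collections concrete, here is a much simplified sketch. ListStateSketch and BroadcastStateSketch are hypothetical stand-ins, not Flink's PartitionableListState or HeapBroadcastState, and copying the collection on snapshot is an assumption made for this illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in: list-style operator state backed by an ArrayList.
final class ListStateSketch<S> {
    private final List<S> internalList = new ArrayList<>();

    void add(S value) {
        internalList.add(value);
    }

    void clear() {
        internalList.clear();
    }

    // Copy the list so later updates do not change a snapshot already taken.
    List<S> snapshot() {
        return new ArrayList<>(internalList);
    }
}

// Simplified stand-in: broadcast state backed by a HashMap.
final class BroadcastStateSketch<K, V> {
    private final Map<K, V> backingMap = new HashMap<>();

    void put(K key, V value) {
        backingMap.put(key, value);
    }

    V get(K key) {
        return backingMap.get(key);
    }

    Map<K, V> snapshot() {
        return new HashMap<>(backingMap);
    }
}
```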
Configure StateBackend
Flink provides three StateBackends. The default one is specified by the state.backend key in the conf/flink-conf.yaml file.
Per-job Setting
Set via StreamExecutionEnvironment:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
To use RocksDBStateBackend, add the following dependency:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
Default State Backend Setting Details
- JobManager: corresponds to MemoryStateBackend
  - Working state is held on the TaskManager's JVM heap
  - Checkpoint data is stored in the JobManager's memory by default
  - Typical application scenarios: local testing, jobs with small state
- FileSystem: corresponds to FsStateBackend
  - Working state is stored in the TaskManager's memory
  - Checkpoint data is persisted to a distributed file system (e.g., HDFS, S3)
  - Configuration example:
    state.backend: filesystem
    state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints
- RocksDB: corresponds to RocksDBStateBackend
  - Working state is stored in a local RocksDB instance
  - Checkpoint data is persisted to a distributed file system
  - Supports state larger than the available memory
  - Configuration example:
    state.backend: rocksdb
    state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints
    state.backend.rocksdb.localdir: /mnt/flink/rocksdb
Enable Checkpoint
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
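env.enableCheckpointing(30000); // Trigger a checkpoint every 30 seconds
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Wait at least 30 seconds after one checkpoint ends before starting the next
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
// Abort a checkpoint that takes longer than 10 minutes
env.getCheckpointConfig().setCheckpointTimeout(600000);
// Allow only one checkpoint in progress at a time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// Keep the externalized checkpoint when the job is cancelled
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// Fail the task when a checkpoint error occurs
env.getCheckpointConfig().setFailOnCheckpointingErrors(true);
// Compress state snapshots
env.getConfig().setUseSnapshotCompression(true);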
After a checkpoint completes, FsStateBackend and RocksDBStateBackend store it in a directory of the following form:
hdfs:///your/checkpoint/path/{JOB_ID}/chk-{CHECKPOINT_ID}/
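As a small illustration of this layout, the helper below assembles such a path from its three parts. CheckpointPathSketch and checkpointDir are hypothetical names, not part of Flink, and the job ID used in the usage note is a placeholder.

```java
// Hypothetical helper that only mirrors the directory pattern shown above.
final class CheckpointPathSketch {

    static String checkpointDir(String baseDir, String jobId, long checkpointId) {
        // Normalize the base directory so it ends with exactly one slash.
        String base = baseDir.endsWith("/") ? baseDir : baseDir + "/";
        return base + jobId + "/chk-" + checkpointId + "/";
    }
}
```

For example, checkpointDir("hdfs:///your/checkpoint/path", "my-job-id", 7) yields hdfs:///your/checkpoint/path/my-job-id/chk-7/.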
Use CheckpointedFunction
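import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// KeyValue is a user-defined record type with a String getValue() accessor
public class StatefulProcess extends KeyedProcessFunction<String, KeyValue, KeyValue>
        implements CheckpointedFunction {

    // Keyed state: the last successfully parsed value for the current key
    private transient ValueState<Integer> processedInt;

    @Override
    public void processElement(KeyValue keyValue, Context context,
            Collector<KeyValue> collector) throws Exception {
        try {
            Integer a = Integer.parseInt(keyValue.getValue());
            processedInt.update(a);
            collector.collect(keyValue);
        } catch (NumberFormatException e) {
            // Skip records whose value is not an integer
            e.printStackTrace();
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Register the keyed state; Flink restores its contents on recovery
        processedInt = context.getKeyedStateStore()
                .getState(new ValueStateDescriptor<>("processedInt", Integer.class));
        if (context.isRestored()) {
            // Apply any additional logic needed after restoring from a checkpoint
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Nothing to do: Flink snapshots registered keyed state automatically.
        // Do not clear the state here. That would discard data, and
        // snapshotState() does not run in the context of a specific key.
    }
}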