Using Managed Operator State
We can use managed operator state by implementing either the CheckpointedFunction or the ListCheckpointed interface.
CheckpointedFunction
The CheckpointedFunction interface gives access to non-keyed state. It requires implementing the following two methods:
void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;
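snapshotState() is called whenever a checkpoint is taken. initializeState() is called when the user-defined function is initialized, which covers both the very first initialization and recovery from an earlier checkpoint.
Currently, managed operator state is list-style: the state is a List of serializable objects that are independent of one another, so they can easily be redistributed when the parallelism changes. Depending on how the state is accessed, two redistribution schemes are available:
- Even-split redistribution: each operator instance holds a list of state elements, and the complete state is the concatenation of all these lists. When the job is restored or the state is redistributed, the complete list is divided evenly into as many sublists as the operator has parallel instances, and each instance receives one sublist.
- Union redistribution: each operator instance holds a list of state elements, and the complete state is the concatenation of all these lists. When the job is restored or the state is redistributed, every operator instance receives the complete list.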
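The division of labor between the two methods can be sketched without Flink on the classpath. The class below is a hypothetical plain-Java model, not Flink code: a plain List stands in for the operator ListState, and passing null to initializeState stands in for isRestored() returning false. It follows the common "buffering" pattern: copy working data into the state on every snapshot, and rebuild working data from the state on restore.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the CheckpointedFunction lifecycle (not Flink's API).
// A plain List stands in for the operator ListState; restored == null means a fresh start.
final class BufferingFunction {
    private final List<String> buffer = new ArrayList<>(); // working data used by process()
    private List<String> checkpointedState;                 // stand-in for the ListState

    // Called once at startup, both on a fresh start and on recovery.
    void initializeState(List<String> restored) {
        checkpointedState = new ArrayList<>();
        if (restored != null) {              // corresponds to context.isRestored() == true
            checkpointedState.addAll(restored);
            buffer.addAll(restored);         // rebuild working data from the checkpoint
        }
    }

    void process(String element) {
        buffer.add(element);
    }

    // Called on every checkpoint: replace the state contents with the current buffer.
    List<String> snapshotState(long checkpointId) {
        checkpointedState.clear();
        checkpointedState.addAll(buffer);
        return new ArrayList<>(checkpointedState);
    }

    List<String> buffer() {
        return buffer;
    }
}
```

A new instance initialized from the list returned by snapshotState ends up with the same buffer, which is what recovery from a checkpoint is meant to achieve.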
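The two schemes differ only in what each new instance receives. The sketch below is a simplified model of those semantics, not Flink's actual repartitioner: even-split hands out contiguous chunks (the first instances get one extra element when the size does not divide evenly, which is an assumption of this model), while union gives every instance the whole list. In Flink itself the scheme is chosen when the state is obtained, via OperatorStateStore#getListState (even-split) or OperatorStateStore#getUnionListState (union).

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of list-state redistribution (illustrative, not Flink's repartitioner).
final class Redistribution {

    // The logical "complete state" is the concatenation of all per-instance lists.
    static <T> List<T> concat(List<List<T>> perInstance) {
        List<T> all = new ArrayList<>();
        for (List<T> part : perInstance) {
            all.addAll(part);
        }
        return all;
    }

    // Even-split: cut the complete list into newParallelism contiguous chunks;
    // the first (size % newParallelism) instances receive one extra element.
    static <T> List<List<T>> evenSplit(List<List<T>> perInstance, int newParallelism) {
        List<T> all = concat(perInstance);
        int base = all.size() / newParallelism;
        int remainder = all.size() % newParallelism;
        List<List<T>> result = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < newParallelism; i++) {
            int size = base + (i < remainder ? 1 : 0);
            result.add(new ArrayList<>(all.subList(start, start + size)));
            start += size;
        }
        return result;
    }

    // Union: every new instance receives a copy of the complete list.
    static <T> List<List<T>> union(List<List<T>> perInstance, int newParallelism) {
        List<T> all = concat(perInstance);
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(all));
        }
        return result;
    }
}
```

For example, two old instances holding [a, b] and [c, d, e] rescaled to two new instances yield [a, b, c] and [d, e] under even-split, while under union both new instances start with all five elements.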
ListCheckpointed
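The ListCheckpointed interface is a slimmed-down variant of CheckpointedFunction. It only supports list-style state with even-split redistribution, and it likewise requires implementing two methods: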
List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
void restoreState(List<T> state) throws Exception;
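A typical use of the two methods is a counter. Because redistribution is even-split, an instance restored after scaling in may receive several elements, so restoreState should combine all of them rather than read only the first. The class below is a hypothetical plain-Java sketch that only mirrors the two method signatures; it does not implement Flink's ListCheckpointed interface.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical counter mirroring ListCheckpointed's two methods (not the Flink interface).
final class CountingFunction {
    private long count;

    void increment() {
        count++;
    }

    // Each parallel instance contributes a single-element list.
    List<Long> snapshotState(long checkpointId, long timestamp) {
        return Collections.singletonList(Long.valueOf(count));
    }

    // After scaling in, one instance may receive the elements of several old instances,
    // so all of them are summed.
    void restoreState(List<Long> state) {
        count = 0;
        for (Long c : state) {
            count += c;
        }
    }

    long count() {
        return count;
    }
}
```

If two instances with counts 3 and 4 are merged into one, the restored instance starts from 7.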
How the StateBackends store state
Flink provides three StateBackends:
- MemoryStateBackend
- FsStateBackend
- RocksDBStateBackend
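In Flink's implementation, a single StateBackend delegates the different kinds of state to more specialized backends at runtime. For example, under MemoryStateBackend, OperatorState is managed by DefaultOperatorStateBackend and KeyedState by HeapKeyedStateBackend.
For MemoryStateBackend and FsStateBackend, both KeyedState and OperatorState are stored as described earlier: at runtime the state data lives in memory. RocksDBStateBackend differs only for keyed state: KeyedState is managed by RocksDBKeyedStateBackend and stored in RocksDB, while OperatorState is still managed by DefaultOperatorStateBackend and kept in memory.
The relevant source code: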
// RocksDBStateBackend.java
// Create the keyed state backend: keyed state is stored in RocksDB
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(...) {
    return new RocksDBKeyedStateBackend<>(...);
}

// Create the operator state backend: operator state stays on the heap
public OperatorStateBackend createOperatorStateBackend(
        Environment env, String operatorIdentifier) throws Exception {
    final boolean asyncSnapshots = true;
    return new DefaultOperatorStateBackend(...);
}
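The mapping described above can be restated as a small lookup table. The enum below is only documentation in code form (the enum and its field names are hypothetical); the class-name strings are the Flink runtime backends named in the text.

```java
// Hypothetical summary of which runtime backend handles each kind of state.
enum StateBackendKind {
    MEMORY("HeapKeyedStateBackend", "DefaultOperatorStateBackend"),
    FS("HeapKeyedStateBackend", "DefaultOperatorStateBackend"),
    ROCKSDB("RocksDBKeyedStateBackend", "DefaultOperatorStateBackend");

    final String keyedBackend;    // handles KeyedState
    final String operatorBackend; // handles OperatorState

    StateBackendKind(String keyedBackend, String operatorBackend) {
        this.keyedBackend = keyedBackend;
        this.operatorBackend = operatorBackend;
    }

    // Only RocksDBStateBackend moves keyed state off the JVM heap.
    boolean keyedStateOnHeap() {
        return this != ROCKSDB;
    }
}
```

The table makes the asymmetry easy to see: switching to RocksDB changes where keyed state lives, but operator state is handled by the same heap-based backend in all three cases.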
Operator State data structures
Data structure 1: ListState is implemented by PartitionableListState
// DefaultOperatorStateBackend.java
private <S> ListState<S> getListState(...) {
    // ...
    // mode is SPLIT_DISTRIBUTE (even-split) or UNION
    partitionableListState = new PartitionableListState<>(
            new RegisteredOperatorStateBackendMetaInfo<>(
                    name,
                    partitionStateSerializer,
                    mode));
    registeredOperatorStates.put(name, partitionableListState);
    // ...
    return partitionableListState;
}
PartitionableListState stores the state data in an ArrayList.
Data structure 2: BroadcastState is implemented by HeapBroadcastState
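A minimal model of an ArrayList-backed list state clarifies what "stores the state data in an ArrayList" means in practice. The class below is hypothetical and much simpler than Flink's PartitionableListState (no serializer, no metadata). The copy in snapshot() reflects the assumption that a snapshot must not change when the function keeps writing to the state afterwards.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical ArrayList-backed list state (not Flink's PartitionableListState).
final class ArrayListState<S> {
    private final List<S> internalList = new ArrayList<>();

    void add(S value) {
        internalList.add(value);
    }

    // Replace the whole contents, like ListState#update.
    void update(List<S> values) {
        internalList.clear();
        internalList.addAll(values);
    }

    Iterable<S> get() {
        return internalList;
    }

    void clear() {
        internalList.clear();
    }

    // Return a copy so later writes do not alter an in-progress checkpoint.
    List<S> snapshot() {
        return new ArrayList<>(internalList);
    }
}
```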
// DefaultOperatorStateBackend.java
public <K, V> BroadcastState<K, V> getBroadcastState(...) {
    // ...
    broadcastState = new HeapBroadcastState<>(
            new RegisteredBroadcastStateBackendMetaInfo<>(
                    name,
                    OperatorStateHandle.Mode.BROADCAST,
                    broadcastStateKeySerializer,
                    broadcastStateValueSerializer));
    registeredBroadcastStates.put(name, broadcastState);
    // ...
    return broadcastState;
}
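HeapBroadcastState stores the state data in a HashMap.
Configuring the StateBackend
Flink ships three StateBackends. The cluster-wide default is set with the state.backend key in conf/flink-conf.yaml.
Per-job setting
Set the backend for a single job through StreamExecutionEnvironment: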
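Broadcast state is a key/value map, so a HashMap is a natural backing store. The sketch below is a hypothetical, simplified model (not Flink's HeapBroadcastState) exposing the basic map operations plus a snapshot copy. Because every parallel instance holds the same broadcast contents, a restored instance can be rebuilt from any one snapshot, which the static restore helper (also hypothetical) illustrates.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical HashMap-backed broadcast state (not Flink's HeapBroadcastState).
final class MapBroadcastState<K, V> {
    private final Map<K, V> backingMap = new HashMap<>();

    void put(K key, V value) {
        backingMap.put(key, value);
    }

    V get(K key) {
        return backingMap.get(key);
    }

    boolean contains(K key) {
        return backingMap.containsKey(key);
    }

    void remove(K key) {
        backingMap.remove(key);
    }

    // Copy so the snapshot is independent of later updates.
    Map<K, V> snapshot() {
        return new HashMap<>(backingMap);
    }

    // Every instance holds identical contents, so any single snapshot suffices to restore.
    static <K, V> MapBroadcastState<K, V> restore(Map<K, V> snapshot) {
        MapBroadcastState<K, V> state = new MapBroadcastState<>();
        state.backingMap.putAll(snapshot);
        return state;
    }
}
```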
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
To use RocksDBStateBackend, add the following dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
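Default state backend settings in detail
- JobManager (state.backend: jobmanager), i.e. MemoryStateBackend
  - Working state is held as objects on the TaskManager's JVM heap; checkpoint snapshots are sent to the JobManager and kept in its memory
  - Checkpoint data is stored in the JobManager's memory by default; if a checkpoint directory is configured, the checkpoint metadata can also be persisted there
  - Typical use cases: local testing, jobs with small state
- FileSystem (state.backend: filesystem), i.e. FsStateBackend
  - Working state is held in the TaskManager's memory
  - Checkpoint data is persisted to a distributed file system (e.g. HDFS, S3)
  - Configuration example:
    state.backend: filesystem
    state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints
- RocksDB (state.backend: rocksdb), i.e. RocksDBStateBackend
  - Working state is stored in a local RocksDB instance
  - Checkpoint data is persisted to a distributed file system
  - Supports state that is larger than the available memory
  - Configuration example:
    state.backend: rocksdb
    state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints
    state.backend.rocksdb.localdir: /mnt/flink/rocksdb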
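Enabling checkpoints
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(30000); // trigger a checkpoint every 30 seconds
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // exactly-once state consistency
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000); // at least 30 s from the end of one checkpoint to the start of the next
env.getCheckpointConfig().setCheckpointTimeout(600000); // abort checkpoints that take longer than 10 minutes
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // at most one checkpoint in flight
// keep the externalized checkpoint when the job is cancelled
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// fail the task when checkpointing errors (deprecated in newer releases in favor of setTolerableCheckpointFailureNumber)
env.getCheckpointConfig().setFailOnCheckpointingErrors(true);
env.getConfig().setUseSnapshotCompression(true); // compress keyed-state snapshots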
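With a 30 s interval and a 30 s minimum pause, the effective spacing between checkpoints can be longer than the interval. The sketch below is a simplified model, not Flink's CheckpointCoordinator: it assumes the first checkpoint starts at t = 0, one checkpoint at a time (as with setMaxConcurrentCheckpoints(1)), and ignores timeouts and failures. The next start is the later of "previous start + interval" and "previous end + minimum pause".

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of checkpoint trigger times (illustrative, not Flink's scheduler).
final class CheckpointSchedule {

    // durations[i] is how long checkpoint i takes (ms); returns the start time of each.
    static List<Long> triggerTimes(long interval, long minPause, long[] durations) {
        List<Long> triggers = new ArrayList<>();
        long start = 0; // assumption: first checkpoint starts at t = 0
        for (long duration : durations) {
            triggers.add(start);
            long end = start + duration;
            // wait for both the regular interval and the minimum pause after completion
            start = Math.max(start + interval, end + minPause);
        }
        return triggers;
    }
}
```

For interval = minPause = 30000 ms and checkpoints lasting 10 s, 5 s and 40 s, the model starts them at 0, 40000 and 75000 ms: whenever a checkpoint takes any time at all, the minimum pause rather than the interval decides when the next one begins.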
With FsStateBackend and RocksDBStateBackend, a completed checkpoint ends up in the following directory:
hdfs:///your/checkpoint/path/{JOB_ID}/chk-{CHECKPOINT_ID}/
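The layout is the configured checkpoint directory, then the job ID, then one chk-{CHECKPOINT_ID} directory per checkpoint. A hypothetical helper that builds this path (the class and method names, and the sample job ID, are illustrative only) can be handy when locating a retained checkpoint to restore from:

```java
// Hypothetical helper: builds <baseDir>/<jobId>/chk-<checkpointId>/ as described above.
final class CheckpointPaths {

    static String checkpointDir(String baseDir, String jobId, long checkpointId) {
        // tolerate a trailing slash on the configured base directory
        String base = baseDir.endsWith("/") ? baseDir.substring(0, baseDir.length() - 1) : baseDir;
        return base + "/" + jobId + "/chk-" + checkpointId + "/";
    }
}
```

For example, checkpointDir("hdfs:///flink/checkpoints/", "a1b2c3", 42) yields hdfs:///flink/checkpoints/a1b2c3/chk-42/.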
Using CheckpointedFunction
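import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// KeyValue is the job's own POJO (getKey()/getValue()); it is not a Flink class.
public class StatefulProcess extends KeyedProcessFunction<String, KeyValue, KeyValue>
        implements CheckpointedFunction {

    // Keyed state: one Integer per key, included in checkpoints by the state backend.
    private transient ValueState<Integer> processedInt;

    @Override
    public void processElement(KeyValue keyValue, Context context,
            Collector<KeyValue> collector) throws Exception {
        try {
            Integer a = Integer.parseInt(keyValue.getValue());
            processedInt.update(a);
            collector.collect(keyValue);
        } catch (NumberFormatException e) {
            // Skip records whose value is not an integer instead of failing the job.
            e.printStackTrace();
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext functionInitializationContext)
            throws Exception {
        processedInt = functionInitializationContext.getKeyedStateStore()
                .getState(new ValueStateDescriptor<>("processedInt", Integer.class));
        if (functionInitializationContext.isRestored()) {
            // Apply logic to restore the data
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext)
            throws Exception {
        // Keyed state is snapshotted automatically, so nothing is needed here.
        // Do not call processedInt.clear(): it would only wipe the value of whatever key
        // happens to be set in the key context at snapshot time.
    }
}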
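Why clearing keyed state inside snapshotState is harmful becomes clear from how keyed state is scoped: every access acts on the key currently set in the key context (Flink sets it before each processElement call). The class below is a hypothetical plain-Java model of a keyed ValueState, not Flink's implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of keyed ValueState: one value per key, and every call
// acts on the key currently set in the key context.
final class KeyedValueState<K, V> {
    private final Map<K, V> values = new HashMap<>();
    private K currentKey;

    void setCurrentKey(K key) {
        currentKey = key;
    }

    V value() {
        return values.get(currentKey);
    }

    void update(V value) {
        values.put(currentKey, value);
    }

    // Removes the value of the current key only.
    void clear() {
        values.remove(currentKey);
    }
}
```

After updating key "a" and then key "b", a clear() issued at snapshot time silently removes only b's value, while a's value survives. The checkpoint itself is unaffected either way, which is why the fixed snapshotState above does nothing.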