Using Managed Operator State

Managed operator state can be used by implementing either the CheckpointedFunction interface or the ListCheckpointed interface.

CheckpointedFunction

The CheckpointedFunction interface gives access to non-keyed state. It requires implementing the following two methods:

void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;

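snapshotState() is called whenever a checkpoint is taken. initializeState() is called whenever the user-defined function is initialized, which covers both the very first initialization and recovery from an earlier checkpoint.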

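Currently, managed operator state is exposed as a list: a List of serializable objects that are independent of one another, which makes the state easy to redistribute after the parallelism changes. Depending on how the state is accessed, the following redistribution modes are available: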

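  • Even-split redistribution: each operator instance keeps its state as a list, and the whole state is the concatenation of all these lists. When the job is restored or rescaled, the whole state is divided evenly across the operator's parallel instances.
  • Union redistribution: each operator instance keeps its state as a list, and the whole state is the concatenation of all these lists. When the job is restored or rescaled, every operator instance receives the complete state.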
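
The difference between the two modes can be illustrated with a small plain-Java model. This is only a sketch and not Flink code: the class RedistributionSketch and its methods are hypothetical, and the exact way elements are assigned to instances under even-split is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of the two redistribution modes; this is not Flink code. */
final class RedistributionSketch {

    /** Concatenates the per-instance lists into the logical whole state. */
    static <T> List<T> concat(List<List<T>> perInstance) {
        List<T> all = new ArrayList<>();
        for (List<T> part : perInstance) {
            all.addAll(part);
        }
        return all;
    }

    /** Even-split: cut the whole state into newParallelism sublists of near-equal size. */
    static <T> List<List<T>> evenSplit(List<List<T>> perInstance, int newParallelism) {
        List<T> all = concat(perInstance);
        List<List<T>> result = new ArrayList<>();
        int base = all.size() / newParallelism;
        int remainder = all.size() % newParallelism;
        int from = 0;
        for (int i = 0; i < newParallelism; i++) {
            // The first `remainder` instances take one extra element.
            int size = base + (i < remainder ? 1 : 0);
            result.add(new ArrayList<>(all.subList(from, from + size)));
            from += size;
        }
        return result;
    }

    /** Union: every new instance receives a copy of the whole state. */
    static <T> List<List<T>> union(List<List<T>> perInstance, int newParallelism) {
        List<T> all = concat(perInstance);
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(all));
        }
        return result;
    }

    public static void main(String[] args) {
        // Two parallel instances before rescaling, three afterwards.
        List<List<Integer>> before = Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5));
        System.out.println(evenSplit(before, 3)); // [[1, 2], [3, 4], [5]]
        System.out.println(union(before, 3));     // three copies of [1, 2, 3, 4, 5]
    }
}
```

With union redistribution every instance sees all elements, so each instance has to decide for itself which part of the state it keeps.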

ListCheckpointed

The ListCheckpointed interface is a slimmed-down version of CheckpointedFunction that only supports list state with even-split redistribution. It likewise requires implementing two methods:

List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
void restoreState(List<T> state) throws Exception;
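
How these two methods work together can be sketched in plain Java. The interface ListCheckpointedModel below is a local stand-in that only mirrors the two signatures quoted above, and CountingFunction is a hypothetical function invented for illustration; neither is part of Flink.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical function whose only state is the number of elements it has seen. */
final class CountingFunction implements ListCheckpointedModel<Long> {
    private long count;

    void onElement() {
        count++;
    }

    long getCount() {
        return count;
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long timestamp) {
        // Each list entry is a unit of state that may be moved to another instance.
        return Collections.singletonList(count);
    }

    @Override
    public void restoreState(List<Long> state) {
        // After rescaling, an instance may receive zero, one, or several entries,
        // so merge whatever arrives instead of assuming exactly one.
        count = 0;
        for (Long part : state) {
            count += part;
        }
    }

    public static void main(String[] args) {
        CountingFunction a = new CountingFunction();
        a.onElement();
        a.onElement();
        CountingFunction b = new CountingFunction();
        b.onElement();

        // Simulate scaling in from two instances to one.
        List<Long> merged = new ArrayList<>();
        merged.addAll(a.snapshotState(1L, 0L));
        merged.addAll(b.snapshotState(1L, 0L));

        CountingFunction restored = new CountingFunction();
        restored.restoreState(merged);
        System.out.println(restored.getCount()); // 3
    }
}

/** Local stand-in mirroring the two method signatures above; not the Flink interface. */
interface ListCheckpointedModel<T> {
    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

    void restoreState(List<T> state) throws Exception;
}
```

Returning a list rather than a single value is what allows even-split redistribution: the entries can be handed to a different number of instances on restore.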

How StateBackends Store State

There are three StateBackends:

  • MemoryStateBackend
  • FsStateBackend
  • RocksDBStateBackend

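In Flink's actual implementation, a given StateBackend hands the different kinds of state to more specialized backends at runtime. With MemoryStateBackend, for example, DefaultOperatorStateBackend manages operator state and HeapKeyedStateBackend manages keyed state.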

For MemoryStateBackend and FsStateBackend, both keyed state and operator state are stored the way we would expect from the earlier discussion: at runtime the state data is kept in memory.

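RocksDBStateBackend is different. As the source below shows, its keyed state is managed by RocksDBKeyedStateBackend, while its operator state is still managed by DefaultOperatorStateBackend, the same backend that MemoryStateBackend uses: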

// RocksDBStateBackend.java
// Creates the keyed state backend
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(...){
    return new RocksDBKeyedStateBackend<>(...);
}
// Creates the operator state backend
public OperatorStateBackend createOperatorStateBackend(
    Environment env, String operatorIdentifier) throws Exception {
    final boolean asyncSnapshots = true;
    return new DefaultOperatorStateBackend(...);
}

Operator State Data Structures

Data structure 1: ListState is actually implemented by PartitionableListState

// DefaultOperatorStateBackend.java
private <S> ListState<S> getListState(...){
    partitionableListState = new PartitionableListState<>(
        new RegisteredOperatorStateBackendMetaInfo<>(
            name,
            partitionStateSerializer,
            mode));
    registeredOperatorStates.put(name, partitionableListState);
}

PartitionableListState keeps its state data in an ArrayList.

Data structure 2: BroadcastState is actually implemented by HeapBroadcastState

public <K, V> BroadcastState<K, V> getBroadcastState(...) {
    broadcastState = new HeapBroadcastState<>(
        new RegisteredBroadcastStateBackendMetaInfo<>(
            name,
            OperatorStateHandle.Mode.BROADCAST,
            broadcastStateKeySerializer,
            broadcastStateValueSerializer));
    registeredBroadcastStates.put(name, broadcastState);
}

HeapBroadcastState keeps its state data in a HashMap.
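
The pattern in both excerpts (create a state object, then register it under its name) can be modeled in a few lines of plain Java. This is a simplified sketch, not Flink's implementation: the class InMemoryOperatorStateSketch is hypothetical, state values are plain strings, and the copy-on-snapshot step is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of an in-memory operator state backend; not Flink's implementation. */
final class InMemoryOperatorStateSketch {
    // List-style states, each backed by an ArrayList and registered under its name.
    private final Map<String, List<String>> registeredListStates = new HashMap<>();
    // Broadcast-style states, each backed by a HashMap and registered under its name.
    private final Map<String, Map<String, String>> registeredBroadcastStates = new HashMap<>();

    /** Returns the list state registered under `name`, creating it on first access. */
    List<String> getListState(String name) {
        return registeredListStates.computeIfAbsent(name, n -> new ArrayList<>());
    }

    /** Returns the broadcast state registered under `name`, creating it on first access. */
    Map<String, String> getBroadcastState(String name) {
        return registeredBroadcastStates.computeIfAbsent(name, n -> new HashMap<>());
    }

    /** Copies every list state so that later updates do not change the snapshot. */
    Map<String, List<String>> snapshotListStates() {
        Map<String, List<String>> copy = new HashMap<>();
        for (Map.Entry<String, List<String>> entry : registeredListStates.entrySet()) {
            copy.put(entry.getKey(), new ArrayList<>(entry.getValue()));
        }
        return copy;
    }

    public static void main(String[] args) {
        InMemoryOperatorStateSketch backend = new InMemoryOperatorStateSketch();
        backend.getListState("buffer").add("a");
        Map<String, List<String>> snapshot = backend.snapshotListStates();
        backend.getListState("buffer").add("b");
        System.out.println(snapshot.get("buffer"));         // [a]
        System.out.println(backend.getListState("buffer")); // [a, b]
    }
}
```

Looking a state up by name is what lets a restored function ask for the same state again under the same descriptor name.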

Configuring the StateBackend

Flink ships with three StateBackends. The default one is selected with the state.backend key in conf/flink-conf.yaml.

Per-job Configuration

Set the backend through StreamExecutionEnvironment:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));

To use RocksDBStateBackend, add the following dependency:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
  <version>${flink.version}</version>
  <scope>provided</scope>
</dependency>

Default State Backend Settings in Detail

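  1. JobManager - corresponds to MemoryStateBackend

    • Working state is held in memory on the TaskManagers
    • Checkpoint data is stored in the JobManager's memory (the default) or under a configured file system path
    • Typical use cases: local testing and jobs with small state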
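  2. FileSystem - corresponds to FsStateBackend

    • Working state is held in the TaskManager's memory
    • Checkpoint data is persisted to a distributed file system (such as HDFS or S3)
    • Example configuration: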
      state.backend: filesystem
      state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints
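  3. RocksDB - corresponds to RocksDBStateBackend

    • Working state is stored in a local RocksDB instance
    • Checkpoint data is persisted to a distributed file system
    • Supports state that is larger than the available memory
    • Example configuration: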
      state.backend: rocksdb
      state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints
      state.backend.rocksdb.localdir: /mnt/flink/rocksdb

Enabling Checkpoints

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(30000); // trigger a checkpoint every 30 seconds
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000);
env.getCheckpointConfig().setCheckpointTimeout(600000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setFailOnCheckpointingErrors(true);
env.getConfig().setUseSnapshotCompression(true);
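
The configuration above sets both the checkpoint interval and the minimum pause to 30 seconds. How the two values interact can be approximated with a small calculation. The sketch below is a simplified model rather than Flink's scheduling code: it assumes the next checkpoint starts no earlier than the next periodic tick and no earlier than the minimum pause after the previous checkpoint completed. The class and method names are hypothetical.

```java
/** Simplified timing model for interval and min-pause settings; not Flink's scheduler. */
final class CheckpointTimingSketch {

    /**
     * Earliest start time (ms) of the next checkpoint under the assumed rule:
     * not before the next periodic tick, and not before minPauseMs has passed
     * since the previous checkpoint completed.
     */
    static long earliestNextStart(long lastStartMs, long lastDurationMs,
                                  long intervalMs, long minPauseMs) {
        long nextTick = lastStartMs + intervalMs;
        long afterPause = lastStartMs + lastDurationMs + minPauseMs;
        return Math.max(nextTick, afterPause);
    }

    public static void main(String[] args) {
        // Interval 30 s and min pause 30 s, as in the configuration above.
        // A checkpoint that started at t=0 and took 10 s pushes the next start to t=40 s.
        System.out.println(earliestNextStart(0, 10_000, 30_000, 30_000)); // 40000
        // An instantaneous checkpoint leaves the periodic tick at t=30 s in charge.
        System.out.println(earliestNextStart(0, 0, 30_000, 30_000));      // 30000
    }
}
```

Under this model, setting the minimum pause equal to the interval means the effective gap between checkpoint starts grows with the checkpoint duration.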

With FsStateBackend and RocksDBStateBackend, a completed checkpoint ends up in the following directory:

hdfs:///your/checkpoint/path/{JOB_ID}/chk-{CHECKPOINT_ID}/
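
When working with this layout from a script or tool, two small helpers tend to be useful: one that builds the directory for a given checkpoint, and one that finds the newest chk-{CHECKPOINT_ID} entry by its numeric ID (plain string sorting would rank chk-9 after chk-10). The following is a minimal sketch; the class CheckpointPathSketch, its methods, and the job ID in the example are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/** Helpers for the {base}/{JOB_ID}/chk-{CHECKPOINT_ID}/ layout; names are illustrative. */
final class CheckpointPathSketch {
    private static final String PREFIX = "chk-";

    /** Builds the directory of one checkpoint following the layout shown above. */
    static String checkpointDir(String baseDir, String jobId, long checkpointId) {
        String base = baseDir.endsWith("/") ? baseDir : baseDir + "/";
        return base + jobId + "/" + PREFIX + checkpointId + "/";
    }

    /** Returns the highest checkpoint ID among names like "chk-12", ignoring other entries. */
    static Optional<Long> latestCheckpointId(List<String> entryNames) {
        Long best = null;
        for (String name : entryNames) {
            if (!name.startsWith(PREFIX)) {
                continue;
            }
            try {
                long id = Long.parseLong(name.substring(PREFIX.length()));
                if (best == null || id > best) {
                    best = id;
                }
            } catch (NumberFormatException ignored) {
                // The suffix is not a number, so this is not a checkpoint directory.
            }
        }
        return Optional.ofNullable(best);
    }

    public static void main(String[] args) {
        // "exampleJobId" is a placeholder, not a real Flink job ID.
        System.out.println(checkpointDir("hdfs:///your/checkpoint/path", "exampleJobId", 42));
        // hdfs:///your/checkpoint/path/exampleJobId/chk-42/
        System.out.println(latestCheckpointId(Arrays.asList("chk-9", "chk-10", "shared")));
        // Optional[10]
    }
}
```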

Using CheckpointedFunction

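import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// KeyValue is the author's own record type; its definition is not shown here.
public class StatefulProcess extends KeyedProcessFunction<String, KeyValue, KeyValue>
    implements CheckpointedFunction {
    // Keyed state: the last integer value seen for the current key.
    private transient ValueState<Integer> processedInt;

    @Override
    public void processElement(KeyValue keyValue, Context context,
        Collector<KeyValue> collector) throws Exception {
        try {
            Integer a = Integer.parseInt(keyValue.getValue());
            processedInt.update(a);
            collector.collect(keyValue);
        } catch (NumberFormatException e) {
            // The value is not an integer: report it and drop the record.
            e.printStackTrace();
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext
        functionInitializationContext) throws Exception {
        // Called on first start and on recovery; registers the keyed state handle.
        processedInt = functionInitializationContext.getKeyedStateStore()
            .getState(new ValueStateDescriptor<>("processedInt", Integer.class));
        if (functionInitializationContext.isRestored()) {
            //Apply logic to restore the data
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext)
        throws Exception {
        // Keyed state is snapshotted by Flink automatically. No key is in scope
        // during this call, so processedInt must not be read, updated, or cleared here.
    }
}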