Use Managed Operator State

Managed operator state is used by implementing either the CheckpointedFunction interface or the ListCheckpointed interface.

CheckpointedFunction

The CheckpointedFunction interface provides access to non-keyed state. Implementations must provide these two methods:

void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;

snapshotState() is called whenever a checkpoint is taken. initializeState() is called when the user-defined function is initialized. This covers both the very first initialization of the function and recovery from an earlier checkpoint.

Currently, managed operator state is list-style state. It is a List of serializable objects that are independent of each other, which makes it easy to redistribute the state when the parallelism changes. Depending on how the state is accessed, there are two redistribution schemes:

  • Even-split redistribution: Each operator instance returns a list of state elements, and the complete state is the concatenation of all these lists. On recovery or redistribution, the complete state is split into as many sublists as there are parallel operator instances, and each instance receives one sublist.
  • Union redistribution: Each operator instance returns a list of state elements, and the complete state is the concatenation of all these lists. On recovery or redistribution, every operator instance receives the complete state.
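
The two schemes differ only in what each new instance receives after rescaling. The plain-Java sketch below simulates both without any Flink dependency. The class RedistributionSketch and its methods are hypothetical. Splitting into contiguous chunks whose sizes differ by at most one is an assumption for illustration, and the exact assignment Flink's own repartitioner uses is an implementation detail that may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free simulation of even-split and union redistribution.
final class RedistributionSketch {

    // The logical "complete state" is the concatenation of every instance's list.
    static <T> List<T> concat(List<List<T>> perInstance) {
        List<T> all = new ArrayList<>();
        for (List<T> part : perInstance) {
            all.addAll(part);
        }
        return all;
    }

    // Even-split: cut the complete state into newParallelism contiguous chunks
    // whose sizes differ by at most one element (assumed assignment order).
    static <T> List<List<T>> evenSplit(List<List<T>> perInstance, int newParallelism) {
        if (newParallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        List<T> all = concat(perInstance);
        List<List<T>> result = new ArrayList<>();
        int base = all.size() / newParallelism;
        int remainder = all.size() % newParallelism;
        int start = 0;
        for (int i = 0; i < newParallelism; i++) {
            int size = base + (i < remainder ? 1 : 0);
            result.add(new ArrayList<>(all.subList(start, start + size)));
            start += size;
        }
        return result;
    }

    // Union: every new instance receives its own copy of the complete state.
    static <T> List<List<T>> union(List<List<T>> perInstance, int newParallelism) {
        if (newParallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        List<T> all = concat(perInstance);
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(all));
        }
        return result;
    }
}
```

Suppose two instances hold [a, b, c] and [d, e] and the job is rescaled to three. Then evenSplit yields [a, b], [c, d] and [e], while union gives every instance [a, b, c, d, e].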

ListCheckpointed

The ListCheckpointed interface is a more limited variant of CheckpointedFunction. It supports only list-style state with even-split redistribution, and it also requires two methods:

List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
void restoreState(List<T> state) throws Exception;
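
Because of even-split redistribution, restoreState may receive zero, one, or several elements after rescaling, so the restore logic has to merge whatever it is handed. The sketch below shows this with a counter. To stay runnable without Flink, ListCheckpointedLike is a hypothetical stand-in that only mirrors the two method signatures above. It is not Flink's interface, and CountingFunction is likewise illustrative.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in mirroring the two methods of Flink's ListCheckpointed<T>;
// it is NOT the Flink interface and needs no Flink dependency.
interface ListCheckpointedLike<T extends Serializable> {
    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
    void restoreState(List<T> state) throws Exception;
}

// A counter whose partial count is the single element of its state list.
final class CountingFunction implements ListCheckpointedLike<Long> {
    private long count;

    void process(String ignored) {
        count++;
    }

    long getCount() {
        return count;
    }

    @Override
    public List<Long> snapshotState(long checkpointId, long timestamp) {
        // One element per instance, so even-split can hand partial counts to any instance.
        return Collections.singletonList(count);
    }

    @Override
    public void restoreState(List<Long> state) {
        // After rescaling, an instance may get several partial counts: sum them.
        count = 0;
        for (Long partial : state) {
            count += partial;
        }
    }
}
```

Take one instance that counted 3 records and another that counted 4. When they are scaled in to a single instance, that instance restores from the concatenated list [3, 4] and continues from 7.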

How State Backends Store State

Three StateBackends:

  • MemoryStateBackend
  • FsStateBackend
  • RocksDBStateBackend

In Flink's implementation, each StateBackend delegates the different kinds of state to separate backends at runtime. For example, MemoryStateBackend uses DefaultOperatorStateBackend to manage operator state and HeapKeyedStateBackend to manage keyed state.

MemoryStateBackend and FsStateBackend both keep keyed state and operator state in memory at runtime. RocksDBStateBackend stores keyed state in RocksDB, but its operator state is still managed by DefaultOperatorStateBackend, so with this backend operator state is also held in memory.

Source code:

// RocksDBStateBackend.java
// Create keyed statebackend
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(...){
    return new RocksDBKeyedStateBackend<>(...);
}
// Create Operator statebackend
public OperatorStateBackend createOperatorStateBackend(
    Environment env, String operatorIdentifier) throws Exception {
    final boolean asyncSnapshots = true;
    return new DefaultOperatorStateBackend(...);
}
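
The delegation described above can be summarized as a small lookup table. The enum BackendKind and its methods are hypothetical. They only record the class names named in this section and do not call Flink. The point they make is that operator state stays on the heap for all three backends, and only the RocksDB backend moves keyed state off the heap.

```java
// Hypothetical summary of which runtime backend each StateBackend delegates to,
// using only the class names from the text; no Flink classes are touched.
enum BackendKind {
    MEMORY("HeapKeyedStateBackend", "DefaultOperatorStateBackend"),
    FILESYSTEM("HeapKeyedStateBackend", "DefaultOperatorStateBackend"),
    ROCKSDB("RocksDBKeyedStateBackend", "DefaultOperatorStateBackend");

    final String keyedBackend;
    final String operatorBackend;

    BackendKind(String keyedBackend, String operatorBackend) {
        this.keyedBackend = keyedBackend;
        this.operatorBackend = operatorBackend;
    }

    // Operator state is handled by the heap-based DefaultOperatorStateBackend.
    boolean operatorStateOnHeap() {
        return operatorBackend.equals("DefaultOperatorStateBackend");
    }

    // Keyed state is on the heap unless the RocksDB keyed backend is used.
    boolean keyedStateOnHeap() {
        return keyedBackend.startsWith("Heap");
    }
}
```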

Operator State Data Structure

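Data structure 1: ListState is implemented by PartitionableListState.

// DefaultOperatorStateBackend.java
private <S> ListState<S> getListState(...){
    // ...
    PartitionableListState<S> partitionableListState =
        (PartitionableListState<S>) registeredOperatorStates.get(name);
    if (partitionableListState == null) {
        // No restored state under this name: create a new state holder
        partitionableListState = new PartitionableListState<>(
            new RegisteredOperatorStateBackendMetaInfo<>(
                name,
                partitionStateSerializer,
                mode));
        registeredOperatorStates.put(name, partitionableListState);
    }
    // ...
    return partitionableListState;
}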

PartitionableListState stores its elements in an ArrayList.
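
To make the ArrayList-backed design concrete, here is a simplified, hypothetical list state in the same spirit. SimpleListState is not Flink's PartitionableListState, and its method set is an assumption modeled loosely on the ListState operations (add, get, update, clear). The snapshot returns a copy, so elements added later do not leak into an already-taken snapshot.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified heap list state: elements live in an ArrayList.
final class SimpleListState<S> {
    private final String name;
    private final ArrayList<S> internalList = new ArrayList<>();

    SimpleListState(String name) {
        this.name = name;
    }

    String getName() {
        return name;
    }

    void add(S value) {
        internalList.add(value);
    }

    Iterable<S> get() {
        return internalList;
    }

    // Replace all elements with the given list.
    void update(List<S> values) {
        internalList.clear();
        internalList.addAll(values);
    }

    void clear() {
        internalList.clear();
    }

    // Copy so later add() calls do not change an already-taken snapshot.
    List<S> snapshot() {
        return new ArrayList<>(internalList);
    }
}
```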

Data structure 2: BroadcastState is implemented by HeapBroadcastState.

// DefaultOperatorStateBackend.java
public <K, V> BroadcastState<K, V> getBroadcastState(...) {
    // ...
    HeapBroadcastState<K, V> broadcastState =
        (HeapBroadcastState<K, V>) registeredBroadcastStates.get(name);
    if (broadcastState == null) {
        broadcastState = new HeapBroadcastState<>(
            new RegisteredBroadcastStateBackendMetaInfo<>(
                name,
                OperatorStateHandle.Mode.BROADCAST,
                broadcastStateKeySerializer,
                broadcastStateValueSerializer));
        registeredBroadcastStates.put(name, broadcastState);
    }
    // ...
    return broadcastState;
}

HeapBroadcastState stores its entries in a HashMap.
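
The HashMap-backed broadcast state can be sketched the same way. SimpleBroadcastState and its restore method are hypothetical and do not reproduce HeapBroadcastState. The sketch only illustrates that entries are kept in a HashMap and that, in BROADCAST mode, every parallel instance is meant to hold the same full map. It also shows that restoring gives each instance its own independent copy.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified heap broadcast state: entries live in a HashMap and
// every parallel instance is expected to hold the same full map.
final class SimpleBroadcastState<K, V> {
    private final Map<K, V> backingMap = new HashMap<>();

    void put(K key, V value) {
        backingMap.put(key, value);
    }

    V get(K key) {
        return backingMap.get(key);
    }

    boolean contains(K key) {
        return backingMap.containsKey(key);
    }

    void remove(K key) {
        backingMap.remove(key);
    }

    // Snapshot is a copy of the whole map.
    Map<K, V> snapshot() {
        return new HashMap<>(backingMap);
    }

    // Each restored instance gets its own copy of the full snapshot.
    static <K, V> SimpleBroadcastState<K, V> restore(Map<K, V> snapshot) {
        SimpleBroadcastState<K, V> state = new SimpleBroadcastState<>();
        state.backingMap.putAll(snapshot);
        return state;
    }
}
```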

Configure StateBackend

Flink provides three state backends. The cluster-wide default is configured with the state.backend key in the conf/flink-conf.yaml file.

Per-job Setting

A single job can override the default through StreamExecutionEnvironment:

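import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));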

To use RocksDBStateBackend, add the following dependency:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
  <version>${flink.version}</version>
  <scope>provided</scope>
</dependency>

Default State Backend Setting Details

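  1. JobManager - Corresponds to MemoryStateBackend

    • Working state is kept as objects on the TaskManager’s JVM heap
    • Checkpoint snapshots are sent to the JobManager and stored in its memory; optionally, a file system path can be configured for checkpoint metadata
    • The size of each individual state is limited (5 MB by default), so typical application scenarios are local testing and jobs with small state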
  2. FileSystem - Corresponds to FsStateBackend

    • Working state stored in TaskManager’s memory
    • Checkpoint data persisted to distributed file system (e.g., HDFS, S3, etc.)
    • Configuration example:
      state.backend: filesystem
      state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints
  3. RocksDB - Corresponds to RocksDBStateBackend

    • Stores working state in a RocksDB instance on the TaskManager’s local disk
    • Checkpoint data persisted to distributed file system
    • Supports state size exceeding available memory
    • Configuration example:
      state.backend: rocksdb
      state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints
      state.backend.rocksdb.localdir: /mnt/flink/rocksdb

Enable Checkpoint

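import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(30000); // Trigger a checkpoint every 30 seconds
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // Exactly-once state consistency
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000); // At least 30 s from the end of one checkpoint to the start of the next
env.getCheckpointConfig().setCheckpointTimeout(600000); // Discard a checkpoint that takes longer than 10 minutes
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // At most one checkpoint in progress at a time
// Keep the externalized checkpoint when the job is cancelled
env.getCheckpointConfig().enableExternalizedCheckpoints(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().setFailOnCheckpointingErrors(true); // Treat checkpoint errors as failures
env.getConfig().setUseSnapshotCompression(true); // Compress state snapshots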
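
The checkpoint interval and the minimum pause interact. With at most one checkpoint in flight, the next one cannot start before the next periodic trigger, and it also cannot start before the minimum pause has elapsed after the previous one finished. The sketch below captures that bound as plain arithmetic. CheckpointCadence is a hypothetical, simplified model. It ignores how Flink's checkpoint coordinator actually queues or skips triggers and only approximates the earliest possible start time.

```java
// Hypothetical, simplified model of how interval, min pause and timeout bound
// checkpoint timing; it assumes at most one checkpoint in flight and ignores
// how Flink actually queues or skips periodic triggers.
final class CheckpointCadence {

    // Earliest time (ms) at which the next checkpoint may start.
    static long earliestNextStart(long prevStartMs, long prevEndMs,
                                  long intervalMs, long minPauseMs) {
        long byInterval = prevStartMs + intervalMs;  // next periodic trigger
        long byMinPause = prevEndMs + minPauseMs;    // required pause after completion
        return Math.max(byInterval, byMinPause);
    }

    // A checkpoint running longer than the timeout is discarded.
    static boolean exceedsTimeout(long durationMs, long timeoutMs) {
        return durationMs > timeoutMs;
    }
}
```

Consider the settings above (interval and minimum pause both 30 s) and a checkpoint that starts at 0 s and finishes at 10 s. The next one can start no earlier than 40 s. With equal interval and pause, the pause therefore dominates whenever checkpoints take any time at all.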

With FsStateBackend and RocksDBStateBackend, each completed checkpoint is written to a directory of the form:

hdfs:///your/checkpoint/path/{JOB_ID}/chk-{CHECKPOINT_ID}/
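
When a retained checkpoint has to be located, for example to resume a job from it with flink run -s, the directory can be derived from the configured checkpoint path, the job ID and the checkpoint ID. The helper CheckpointPaths.checkpointDir below is hypothetical and only reproduces the layout shown above. The job ID used in its usage is a made-up placeholder.

```java
// Hypothetical helper producing <checkpoint base>/<job id>/chk-<checkpoint id>/
final class CheckpointPaths {

    static String checkpointDir(String baseDir, String jobId, long checkpointId) {
        // Drop a trailing slash on the base so the result has no "//" in the middle.
        String base = baseDir.endsWith("/")
                ? baseDir.substring(0, baseDir.length() - 1)
                : baseDir;
        return base + "/" + jobId + "/chk-" + checkpointId + "/";
    }
}
```

For instance, checkpointDir("hdfs:///your/checkpoint/path", "a1b2c3", 7) produces hdfs:///your/checkpoint/path/a1b2c3/chk-7/.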

Use CheckpointedFunction

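import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// KeyValue is a user-defined POJO with a String getValue() accessor (not shown).
public class StatefulProcess extends KeyedProcessFunction<String, KeyValue, KeyValue>
    implements CheckpointedFunction {
    // Managed keyed state: Flink snapshots and restores it automatically.
    private transient ValueState<Integer> processedInt;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }

    @Override
    public void processElement(KeyValue keyValue, Context context,
        Collector<KeyValue> collector) throws Exception {
        try {
            Integer a = Integer.parseInt(keyValue.getValue());
            // Store the parsed value for the current key
            processedInt.update(a);
            collector.collect(keyValue);
        } catch (NumberFormatException e) {
            // Skip records whose value is not an integer
            e.printStackTrace();
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext
        functionInitializationContext) throws Exception {
        // Called on first start and on recovery; registering the descriptor
        // returns the restored state when there is one.
        processedInt = functionInitializationContext.getKeyedStateStore()
            .getState(new ValueStateDescriptor<>("processedInt", Integer.class));
        if (functionInitializationContext.isRestored()) {
            // Keyed state is already restored at this point; add any extra restore logic here
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext)
        throws Exception {
        // Nothing to do for keyed state: Flink includes it in the checkpoint automatically.
        // Do not call processedInt.clear() here: there is no meaningful current key in
        // this method, and clearing would remove state before it is written to the checkpoint.
    }
}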