State Storage

An important feature of Flink is stateful processing. Flink provides easy-to-use APIs to store and retrieve state, but understanding the principles behind these APIs helps you use them better.

State Storage Methods

Flink provides three out-of-the-box state backends:

  • Memory State Backend
  • File System (FS) State Backend
  • RocksDB State Backend

MemoryStateBackend

MemoryStateBackend stores working state data in the TaskManager’s Java heap. Key/value state and window operators use hash tables to hold values and triggers. During a snapshot (checkpointing), the generated snapshot data is sent to the JobManager along with the checkpoint ACK message, and the JobManager keeps all received snapshot data in its own Java heap.

MemoryStateBackend now takes snapshots asynchronously by default, which avoids blocking the main thread’s pipeline processing. State access in MemoryStateBackend is very fast, but it is not suitable for production use due to these limitations:

  • Each state is limited to 5 MB by default (configurable via the MemoryStateBackend constructor)
  • All state data for a Task (a Task may contain multiple chained operators in a pipeline) cannot exceed the RPC system’s frame size (akka.framesize, default 10 MB)
  • The total state data received by the JobManager cannot exceed the JobManager’s memory

Suitable scenarios for MemoryStateBackend:

  • Local development and debugging
  • Jobs with very small state
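For local development, the per-state limit mentioned above can be raised through the MemoryStateBackend constructor. A minimal sketch (the 10 MB value is illustrative; the constructor arguments are max state size in bytes and the asynchronous-snapshot flag):

```java
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MemoryBackendExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // raise the per-state limit from the 5 MB default to 10 MB,
        // and keep asynchronous snapshots enabled
        env.setStateBackend(new MemoryStateBackend(10 * 1024 * 1024, true));
    }
}
```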

FsStateBackend

FsStateBackend requires configuring a checkpoint path, e.g., hdfs://xxxxxxx or file:///xxxxx. An HDFS directory is usually used.

FsStateBackend also stores working state data in the TaskManager’s Java heap. During a snapshot, it writes the snapshot data to the configured path and then tells the JobManager the file path it wrote. The JobManager stores only the metadata for all states (in HA mode, the metadata is written to the checkpoint directory).

FsStateBackend uses asynchronous snapshots by default to prevent blocking the main thread’s pipeline processing. This mode can be disabled via the FsStateBackend constructor:

new FsStateBackend(path, false)

Suitable scenarios for FsStateBackend:

  • Jobs with large state, long windows, large key-value state
  • Suitable for high availability solutions
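A minimal sketch of wiring FsStateBackend into a job (the HDFS path and checkpoint interval are placeholders):

```java
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FsBackendExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // snapshots go to the configured directory; only metadata reaches the JobManager
        env.setStateBackend(new FsStateBackend("hdfs://namenode:8020/flink/checkpoints"));
        env.enableCheckpointing(60_000); // checkpoint every 60 s
    }
}
```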

RocksDBStateBackend

RocksDBStateBackend also requires configuring a checkpoint path, e.g., hdfs://xxx or file:///xxx, usually an HDFS path.

RocksDB is an embeddable, persistent key-value storage engine with transaction support. Developed by Facebook on the basis of LevelDB, it uses an LSM-tree storage engine and stores data in a hybrid of memory and disk.

RocksDBStateBackend stores working state in a RocksDB database on the TaskManager. During a checkpoint, all data in RocksDB is transferred to the configured file directory, and only a small amount of metadata is kept in JobManager memory (in HA mode, it is stored in the checkpoint directory).

RocksDBStateBackend uses asynchronous snapshots. Limitations of RocksDBStateBackend:

  • Since RocksDB’s JNI bridge API is based on byte[], the maximum size of each key or value supported by RocksDBStateBackend is 2^31 bytes (2 GB)
  • Note: state that uses merge operations (e.g., ListState) can grow past 2^31 bytes at runtime, causing the program to fail

Suitable scenarios for RocksDBStateBackend:

  • Jobs with ultra-large state, long windows (days), large key-value state
  • Suitable for high availability mode

When using RocksDBStateBackend, state size is limited only by the TaskManager’s disk space (whereas with FsStateBackend, state size is limited by TaskManager memory). This also makes RocksDBStateBackend’s throughput lower than the other two backends, because every state read and write in RocksDB goes through serialization/deserialization.

RocksDBStateBackend is currently the only one of the three that supports incremental checkpoints.
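A minimal sketch of enabling incremental checkpoints through the RocksDBStateBackend constructor (the path is a placeholder; the flink-statebackend-rocksdb dependency is required):

```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDbBackendExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // true = incremental checkpoints: only changes since the last
        // checkpoint are uploaded, instead of the full state
        env.setStateBackend(
            new RocksDBStateBackend("hdfs://namenode:8020/flink/checkpoints", true));
    }
}
```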

Throughput Comparison

MemoryStateBackend > FsStateBackend > RocksDBStateBackend

KeyedState and Operator State

State Classification

Operator State

(or non-keyed state): Each Operator State is bound to one parallel operator instance. The Kafka connector is a typical user of Operator State: each parallel Kafka consumer instance maintains a mapping from Kafka topic partitions to their offsets and saves this mapping as Operator State. When the operator’s parallelism changes, Operator State is redistributed across the instances.
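The pattern described above can be sketched with the CheckpointedFunction interface: a sink that buffers elements and stores the buffer as ListState, simplified from the pattern used by connectors such as the Kafka consumer (class and state names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class BufferingSink implements SinkFunction<String>, CheckpointedFunction {
    private transient ListState<String> checkpointedState;
    private final List<String> buffer = new ArrayList<>();

    @Override
    public void invoke(String value, Context context) {
        buffer.add(value); // a real sink would flush once the buffer is full
    }

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        // copy the in-memory buffer into operator state on every checkpoint
        checkpointedState.clear();
        for (String element : buffer) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext ctx) throws Exception {
        ListStateDescriptor<String> descriptor =
            new ListStateDescriptor<>("buffered-elements", String.class);
        checkpointedState = ctx.getOperatorStateStore().getListState(descriptor);
        if (ctx.isRestored()) { // restore after a failure or a parallelism change
            for (String element : checkpointedState.get()) {
                buffer.add(element);
            }
        }
    }
}
```

When the parallelism changes, the elements in this ListState are redistributed among the new operator instances.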

KeyedState

This state only exists in functions and operators on a KeyedStream, such as keyed UDFs (e.g., KeyedProcessFunction) and window state. KeyedState can be thought of as partitioned Operator State: each KeyedState is logically bound to a <parallel-operator-instance, key> combination. Since a key lives in exactly one operator instance, we can simply consider that each <operator, key> pair corresponds to one KeyedState.

Each KeyedState is also logically assigned to a KeyGroup. The assignment is:

MathUtils.murmurHash(key.hashCode()) % maxParallelism;

Here maxParallelism is the maximum parallelism of the Flink program. We usually don’t set it manually and use the default value (128). Note that maxParallelism is different from the operator parallelism specified when running the program: parallelism cannot be greater than maxParallelism; at most, they can be equal.

Why does the KeyGroup concept exist?

When we write a program, we specify a parallelism for each operator. After running for a while, some state accumulates. Then the data volume grows and we need to increase the parallelism. After modifying the parallelism and resubmitting, how should the existing state be distributed across the operator instances? This is what the maxParallelism and KeyGroup concepts are for.

The formula above also shows that the number of KeyGroups is at most maxParallelism. After the parallelism changes, the operator instance a key group is assigned to is computed as:

keyGroupId * parallelism / maxParallelism;

As you can see, each KeyGroupId maps to exactly one operator instance. When the parallelism changes, each new operator instance pulls the KeyedState of the KeyGroups assigned to it, so KeyedState is distributed as evenly as possible across all operator instances.
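The two formulas can be put together in a self-contained sketch. Flink’s real implementation lives in KeyGroupRangeAssignment and uses MathUtils.murmurHash; the hash below is a simplified stand-in to keep the example dependency-free:

```java
public class KeyGroupSketch {
    // which key group a key falls into (fixed for the life of the job)
    static int keyGroupFor(Object key, int maxParallelism) {
        int h = key.hashCode();
        h ^= (h >>> 16); // bit mixing, standing in for murmurHash
        return (h & 0x7FFFFFFF) % maxParallelism;
    }

    // which operator instance owns a given key group at the current parallelism
    static int operatorFor(int keyGroupId, int parallelism, int maxParallelism) {
        return keyGroupId * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int kg = keyGroupFor("user-42", maxParallelism);
        // rescaling from parallelism 2 to 4 only moves whole key groups
        System.out.println("keyGroup=" + kg
            + " operator@p2=" + operatorFor(kg, 2, maxParallelism)
            + " operator@p4=" + operatorFor(kg, 4, maxParallelism));
    }
}
```

Because keys are assigned to key groups rather than directly to operators, a rescale only reassigns whole key groups, so state can be fetched in contiguous ranges instead of key by key.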

Based on whether State data is managed by Flink, Flink classifies State as:

  • Managed State: managed by Flink and stored in internal hash tables or in RocksDB. During a checkpoint, Flink serializes the state. Examples: ValueState, ListState
  • Raw State: the data structure is managed by the operator itself. During a checkpoint, it can only be written into the checkpoint as raw bytes

Managed State is recommended. With Managed State, Flink can redistribute the state when the parallelism changes and can optimize memory usage.

Use Managed KeyedState

How to create?

As mentioned, KeyedState can only be used on a KeyedStream, which is created via DataStream.keyBy. The following state types can be created:

  • ValueState<T>
  • ListState<T>
  • ReducingState<T>
  • AggregatingState<IN,OUT>
  • MapState<UK,UV>
  • FoldingState<T,ACC>

Each state type has a corresponding descriptor, and the state is obtained from the RuntimeContext via that descriptor. Only a RichFunction can access the RuntimeContext, so to use KeyedState, the user’s class must extend RichFunction or one of its subclasses.

  • ValueState getState(ValueStateDescriptor)
  • ReducingState getReducingState(ReducingStateDescriptor)
  • ListState getListState(ListStateDescriptor)
  • AggregatingState<IN,OUT> getAggregatingState(AggregatingStateDescriptor<IN,ACC,OUT>)
  • FoldingState<T,ACC> getFoldingState(FoldingStateDescriptor<T,ACC>)
  • MapState<UK,UV> getMapState(MapStateDescriptor<UK,UV>)
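Putting the pieces together, a minimal sketch of Managed KeyedState: a RichFlatMapFunction that keeps a per-key running count in ValueState, obtained from the RuntimeContext via its descriptor in open() (class and state names are illustrative):

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CountPerKey
        extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {
    private transient ValueState<Long> countState;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Long> descriptor =
            new ValueStateDescriptor<>("count", Long.class);
        countState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<String, Long> input,
                        Collector<Tuple2<String, Long>> out) throws Exception {
        Long count = countState.value(); // scoped to the current key automatically
        long next = (count == null ? 0L : count) + 1;
        countState.update(next);
        out.collect(Tuple2.of(input.f0, next));
    }
}
```

It would be applied on a keyed stream, e.g. stream.keyBy(t -> t.f0).flatMap(new CountPerKey()).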

Set TTL for KeyedState

Since Flink 1.6.0, a TTL (time-to-live) can also be set for KeyedState. When a key’s state data expires, the state backend tries to delete it.

Official example:

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1)) // state TTL: 1 second
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // refresh the TTL when state is created or written
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // never return expired state
    .build();
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("textstate", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);

When is State TTL updated?

This is configurable; by default the TTL is refreshed only when the key’s state is modified (created or updated):

  • StateTtlConfig.UpdateType.OnCreateAndWrite: Only update TTL when key’s state is created and written (default)
  • StateTtlConfig.UpdateType.OnReadAndWrite: Also update the TTL when the state is read

When State expires but hasn’t been deleted, is this state still visible?

This is configurable; the default is invisible:

  • StateTtlConfig.StateVisibility.NeverReturnExpired: Invisible (default)
  • StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp: Visible

Notes:

  1. State storage overhead

    • Enabling TTL feature increases state storage size, specific impact depends on State Backend type:
      • Heap State Backend: Stores an additional Java object containing a reference to the user state and a timestamp
      • RocksDB State Backend: Adds 8 bytes timestamp overhead per state value (e.g., each element in List or Map after serialization)
  2. Time semantics limitation

    • Currently only supports TTL based on Processing Time, not Event Time or Ingestion Time
    • Suitable for scenarios requiring periodic cleanup of expired data, like session timeout, cache invalidation
  3. Checkpoint/Savepoint recovery requirements

    • When recovering from a Checkpoint/Savepoint, the TTL setting must match what was saved: state written with TTL enabled must be restored with TTL enabled (and vice versa), otherwise a StateMigrationException is thrown
    • During job upgrade or migration, ensure TTL configuration consistency
  4. TTL configuration temporariness

    • TTL configuration (like expiration time) is not persisted to Checkpoint/Savepoint, only effective for current job
  5. NULL value support for MapState

    • MapState with TTL only allows storing null values when the user-defined serializer supports null
    • If the serializer doesn’t support null, it can be wrapped with NullableSerializer:
MapStateDescriptor<String, String> descriptor = new MapStateDescriptor<>(
    "myMapState",
    StringSerializer.INSTANCE,                         // key serializer
    NullableSerializer.wrap(StringSerializer.INSTANCE) // value serializer that tolerates null
);