This is article 95 in the Big Data series, introducing Flink stateful computation and fault tolerance mechanisms.

Stateful and Stateless Computation

Flink divides computation into two types based on whether intermediate results need to be saved: stateful and stateless computation.

1. Stateful Computation

Characteristics: Intermediate results (state) must be kept between records, because the result for one event depends on other events in the stream, typically earlier ones

State Types:

  • Keyed State: State associated with specific keys
  • Operator State: State bound to operator instances

Typical Application Scenarios:

  • Window aggregation (e.g., calculate average temperature within 5 minutes)
  • Pattern detection (e.g., detect abnormal login sequences)
  • Data deduplication (e.g., e-commerce order deduplication)

State Backend Support:

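  • MemoryStateBackend (JVM heap memory)
  • FsStateBackend (working state on the JVM heap, checkpoints on a file system)
  • RocksDBStateBackend (embedded RocksDB database on local disk)

These are the legacy class names; since Flink 1.13 they are deprecated in favor of HashMapStateBackend and EmbeddedRocksDBStateBackend, with checkpoint storage configured separately.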

2. Stateless Computation

Characteristics: Each data record is processed independently, no need to save intermediate state

Typical Operators:

  • Map (one-to-one transformation)
  • Filter (data filtering)
  • FlatMap (one-to-many transformation)

Application Scenarios:

  • Simple data format transformation
  • Data filtering (e.g., filter debug info from logs)
  • Data splitting (e.g., split CSV lines into multiple fields)

Advantages: High execution efficiency, low resource consumption, linearly scalable
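
The distinction can be seen without any Flink API. The following is a minimal plain-Java sketch (the class and method names are made up for illustration): toFahrenheit is stateless because its result depends only on the current record, while RunningAverage is stateful because each result depends on every record seen so far.

```java
// Illustrative only: plain Java, no Flink dependency; all names are hypothetical.
class StatelessVsStateful {

    // Stateless: the output depends only on the current input record.
    static double toFahrenheit(double celsius) {
        return celsius * 9.0 / 5.0 + 32.0;
    }

    // Stateful: the output depends on the records seen so far,
    // so the count and sum must be kept between calls.
    static class RunningAverage {
        private long count = 0;
        private double sum = 0.0;

        double add(double value) {
            count++;
            sum += value;
            return sum / count;
        }
    }

    public static void main(String[] args) {
        System.out.println(toFahrenheit(100.0)); // same input always gives the same output
        RunningAverage avg = new RunningAverage();
        System.out.println(avg.add(10.0));
        System.out.println(avg.add(20.0));       // result depends on the earlier record
    }
}
```

In a Flink job the count and sum would live in managed state rather than in plain fields, so that they survive failures and rescaling.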


State Types

Flink defines several state primitives, distinguished by the data structure they hold:

  • ValueState<T>: A single value of type T bound to the current key; the simplest state. Write it with update(T) and read it with value()
  • ListState<T>: A list of values bound to the current key. Append with add(T); get() returns an Iterable<T> over the stored values
  • ReducingState<T>: Keeps a single value. Each add(T) call applies the user-provided ReduceFunction to merge the new element into the stored value
  • FoldingState<T, ACC>: Similar to ReducingState, but the stored value type may differ from the element type passed to add. It is deprecated (AggregatingState is the replacement) and slated for removal
  • MapState<UK, UV>: A map bound to the current key. Add entries with put and putAll
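
To make the add semantics of ListState and ReducingState concrete, here is a plain-Java model of the state held for a single key. It is not Flink's implementation: ListStateModel and ReducingStateModel are hypothetical names, and BinaryOperator stands in for Flink's ReduceFunction.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Illustrative models of the state for ONE key; these are not Flink classes.
class StateSemantics {

    // ListState-like: add() appends, get() returns everything added so far.
    static class ListStateModel<T> {
        private final List<T> values = new ArrayList<>();

        void add(T value) { values.add(value); }

        Iterable<T> get() { return values; }
    }

    // ReducingState-like: add() merges the new element into one stored value.
    static class ReducingStateModel<T> {
        private final BinaryOperator<T> reducer; // stands in for ReduceFunction
        private T current;                       // null until the first add()

        ReducingStateModel(BinaryOperator<T> reducer) { this.reducer = reducer; }

        void add(T value) {
            current = (current == null) ? value : reducer.apply(current, value);
        }

        T get() { return current; }
    }
}
```

The practical difference is storage cost: the list model grows with every element, while the reducing model always holds exactly one value.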

State is also classified by whether it is scoped to a key:

  • KeyedState
  • OperatorState

Case: Using State to Calculate Average

Implementation Approach

  1. Read data source
  2. Group data source by Key
  3. For keyed stream data, call stateful processing: instantiate a state instance, update state as stream data arrives, output result

Write Code

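package icu.wzk;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;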

public class FlinkStateTest01 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Tuple2<Long, Long>> data = env
                .fromElements(
                        Tuple2.of(1L, 3L),
                        Tuple2.of(1L, 5L),
                        Tuple2.of(1L, 7L),
                        Tuple2.of(1L, 4L),
                        Tuple2.of(1L, 2L)
                );
        KeyedStream<Tuple2<Long, Long>, Long> keyed = data
                .keyBy(new KeySelector<Tuple2<Long, Long>, Long>() {
                    @Override
                    public Long getKey(Tuple2<Long, Long> value) throws Exception {
                        return value.f0;
                    }
                });
        SingleOutputStreamOperator<Tuple2<Long, Long>> flatMapped = keyed
                .flatMap(new RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                    private transient ValueState<Tuple2<Long, Long>> sum;

                    @Override
                    public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
                        Tuple2<Long, Long> currentSum = sum.value();
                        if (currentSum == null) {
                            currentSum = Tuple2.of(0L, 0L);
                        }
                        // Update
                        currentSum.f0 += 1L;
                        currentSum.f1 += value.f1;
                        System.out.println("currentValue: " + currentSum);
                        // Update state value
                        sum.update(currentSum);
                        // Once 5 values have been seen, emit the average (Long division truncates) and reset the state
                        if (currentSum.f0 >= 5) {
                            out.collect(new Tuple2<>(value.f0, currentSum.f1 / currentSum.f0));
                            sum.clear();
                        }
                    }

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>(
                                "average",
                                TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})
                        );
                        sum = getRuntimeContext().getState(descriptor);
                    }
                });
        flatMapped.print();
        env.execute("Flink State Test");
    }
}

Run Results

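The program prints the running (count, sum) pair after each record. Once the count reaches 5, it emits the average for the key and clears the state. With the five sample records the printed pairs are (1,3), (2,8), (3,15), (4,19), and (5,21), and the emitted record is (1,4): the sum is 21, and the average is 4 rather than 4.2 because both fields are Long and the division truncates.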
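
The same logic can be traced without a Flink cluster. The sketch below is a plain-Java model, not the job itself: a HashMap keyed by the record key stands in for the keyed ValueState, and the AverageTrace class and its run method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java trace of the flatMap logic above. A HashMap keyed by the record key
// stands in for keyed ValueState; this is a model, not how Flink stores state.
class AverageTrace {

    // Each input is {key, value}; each output is {key, average}.
    static List<long[]> run(long[][] input) {
        Map<Long, long[]> state = new HashMap<>(); // key -> {count, sum}
        List<long[]> out = new ArrayList<>();
        for (long[] record : input) {
            long key = record[0];
            long[] acc = state.getOrDefault(key, new long[] {0L, 0L});
            acc[0] += 1;         // count
            acc[1] += record[1]; // sum
            state.put(key, acc);
            if (acc[0] >= 5) {
                out.add(new long[] {key, acc[1] / acc[0]}); // long division truncates
                state.remove(key);                          // same role as sum.clear()
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[][] input = {{1, 3}, {1, 5}, {1, 7}, {1, 4}, {1, 2}};
        for (long[] r : run(input)) {
            System.out.println("(" + r[0] + "," + r[1] + ")");
        }
    }
}
```

Because the state is removed after the fifth record, a sixth record for the same key would start a new count from 1.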


Keyed State

Keyed State is state that Flink associates with keys. It can only be used in functions and operators applied to a KeyedStream. KeyedState can be viewed as a special case of OperatorState: the dataset is first partitioned by key, so each KeyedState instance belongs to exactly one combination of operator and key.

Core Features

  1. Key Association: Each state is bound to a specific key; records with the same key access the same state instance
  2. Automatic Partitioning: State is partitioned by key, and records with the same key are always routed to the same parallel task instance
  3. Efficient Access: State is looked up directly by the current key, so no scan over other keys' state is needed

Management Mechanism

KeyedState is managed through KeyGroups mechanism, serving these scenarios:

  • Dynamic Scaling: When operator parallelism changes, system automatically redistributes KeyedState data
  • Fault Recovery: Ensures state is correctly assigned to new task instances during recovery

KeyGroups Working Principle

  1. Grouping Strategy: Each key is hashed into one of a fixed number of KeyGroups; that number equals the operator's maximum parallelism
  2. Dynamic Allocation: At runtime, each keyed operator instance is responsible for the keys of one or more KeyGroups
  3. Rebalancing: When parallelism changes, the system recomputes the mapping from KeyGroups to task instances; keys never move between KeyGroups
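
The three steps above can be sketched in plain Java. This is a simplified model: Flink applies a murmur hash to key.hashCode() before taking the modulo, which is replaced here by Math.floorMod as a labeled assumption, and KeyGroupSketch is a hypothetical class name. The operator-index formula follows the one Flink uses to give each instance a contiguous range of KeyGroups.

```java
// Sketch of the key -> key group -> operator instance mapping.
// Assumption: Math.floorMod on hashCode() replaces the murmur hash Flink applies.
class KeyGroupSketch {

    // The number of key groups equals the max parallelism and never changes.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Each operator instance owns a contiguous range of key groups.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int keyGroup = keyGroupFor("user-42", maxParallelism);
        // The key group is stable; only its owner changes when the job is rescaled.
        System.out.println("key group: " + keyGroup);
        System.out.println("owner at parallelism 2: " + operatorIndexFor(keyGroup, maxParallelism, 2));
        System.out.println("owner at parallelism 4: " + operatorIndexFor(keyGroup, maxParallelism, 4));
    }
}
```

Rescaling therefore moves whole KeyGroups between instances instead of rehashing individual keys, which is what keeps state redistribution cheap.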

Typical Application Scenarios

  1. Real-time aggregation calculation: e.g., calculate each user’s click count
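// Example: Use ValueState to count clicks per user.
// ClickEvent is an application-defined event type (not shown); the stream is assumed to be keyed by a String user ID.
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class UserClickCounter extends KeyedProcessFunction<String, ClickEvent, Tuple2<String, Integer>> {
    // Per-key click count, managed by Flink
    private transient ValueState<Integer> countState;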

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Integer> descriptor =
            new ValueStateDescriptor<>("clickCount", Integer.class);
        countState = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(ClickEvent event, Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
        Integer currentCount = countState.value();
        if (currentCount == null) {
            currentCount = 0;
        }
        currentCount++;
        countState.update(currentCount);
        out.collect(new Tuple2<>(ctx.getCurrentKey(), currentCount));
    }
}
  2. Pattern detection: e.g., detect user abnormal login behavior
  3. Session window processing: Track user session activity

Keyed State Types

Flink provides multiple KeyedState implementations:

  1. ValueState: Store single value
  2. ListState: Store list of values
  3. MapState: Store key-value mapping
  4. ReducingState: Store aggregation result
  5. AggregatingState: Support more complex aggregation

Performance Considerations

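  1. State backend selection: Choose MemoryStateBackend, FsStateBackend, or RocksDBStateBackend according to state size and latency requirements
  2. State serialization: RocksDB serializes state on every access, and heap-based backends serialize it on every checkpoint, so prefer types Flink can serialize natively (tuples, POJOs) over types that fall back to Kryo
  3. State cleanup: Set a state TTL so that state for inactive keys expires instead of growing without bound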
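
What a state TTL does can be modeled in a few lines of plain Java. The sketch below is conceptual and is not Flink code: TtlStateModel is a hypothetical class, the caller passes the current time explicitly, and the policy shown (timestamp refreshed on write, expired values never returned) corresponds to the OnCreateAndWrite and NeverReturnExpired options of Flink's StateTtlConfig.

```java
import java.util.HashMap;
import java.util.Map;

// Conceptual model of per-key state with a time-to-live; not Flink code.
// The caller passes the current time explicitly so the behavior is easy to follow.
class TtlStateModel<K, V> {

    private static final class Entry<V> {
        final V value;
        final long lastWriteMillis;

        Entry(V value, long lastWriteMillis) {
            this.value = value;
            this.lastWriteMillis = lastWriteMillis;
        }
    }

    private final long ttlMillis;
    private final Map<K, Entry<V>> entries = new HashMap<>();

    TtlStateModel(long ttlMillis) { this.ttlMillis = ttlMillis; }

    // Every write refreshes the timestamp of the key's entry.
    void update(K key, V value, long nowMillis) {
        entries.put(key, new Entry<>(value, nowMillis));
    }

    // An expired entry is dropped on access and reported as absent.
    V value(K key, long nowMillis) {
        Entry<V> e = entries.get(key);
        if (e == null) return null;
        if (nowMillis - e.lastWriteMillis >= ttlMillis) {
            entries.remove(key);
            return null;
        }
        return e.value;
    }

    int size() { return entries.size(); }
}
```

Note that this model only removes an entry when it is read; keys that are never read again would need a background cleanup to be reclaimed.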

Operator State

Operator State is an important state type in Flink, fundamentally different from Keyed State. Operator State is directly bound to specific operator instances, not dependent on keys in data.

Characteristics

  1. State allocation mechanism:

    • Each operator instance maintains state for data it processes
    • State data is partitioned and stored across operator instances
    • When parallelism changes, Flink automatically redistributes state data
  2. Parallelism adaptability:

    • Supports dynamic scaling up/down scenarios
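    • Provides two built-in redistribution strategies for list-style state, plus a hook for custom handling:
      • Even-split redistribution: The state elements of all instances are concatenated and divided evenly among the new operator instances (getListState)
      • Union redistribution: Every new instance receives the complete list of state elements (getUnionListState)
      • Custom handling: Implement CheckpointedFunction and choose the strategy in initializeState; the older ListCheckpointed interface is deprecated and supports only even-split redistribution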
  3. Typical application scenarios:

    • Source operators (e.g., Kafka Consumer) record consumption offsets
    • Window operators maintain window trigger state
    • Custom aggregate operators save intermediate results
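
The difference between even-split and union redistribution can be modeled in plain Java. This is a sketch, not Flink code: RedistributionSketch is a hypothetical class, and the round-robin order used for the even split is an assumption, since only the even division itself is part of the contract.

```java
import java.util.ArrayList;
import java.util.List;

// Model of how list-style operator state is handed out after rescaling; not Flink code.
class RedistributionSketch {

    // Flatten the per-instance lists saved in the checkpoint into one logical list.
    static <T> List<T> concat(List<List<T>> perInstance) {
        List<T> all = new ArrayList<>();
        for (List<T> part : perInstance) all.addAll(part);
        return all;
    }

    // Even-split: every element goes to exactly one new instance.
    // Assumption: round-robin order; the real assignment order may differ.
    static <T> List<List<T>> evenSplit(List<List<T>> perInstance, int newParallelism) {
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) result.add(new ArrayList<>());
        List<T> all = concat(perInstance);
        for (int i = 0; i < all.size(); i++) result.get(i % newParallelism).add(all.get(i));
        return result;
    }

    // Union: every new instance receives the complete list and picks what it needs.
    static <T> List<List<T>> union(List<List<T>> perInstance, int newParallelism) {
        List<List<T>> result = new ArrayList<>();
        List<T> all = concat(perInstance);
        for (int i = 0; i < newParallelism; i++) result.add(new ArrayList<>(all));
        return result;
    }
}
```

Union redistribution multiplies the restored state by the new parallelism, so it suits small state such as source offsets where each instance must see everything before deciding what it owns.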

State Management Forms

In Flink, both KeyedState and OperatorState support two management forms:

Managed State

This is Flink’s recommended usage, with these characteristics:

  • Runtime management: Unified management by Flink runtime environment
  • Storage optimization:
    • Automatically convert to efficient in-memory data structures (e.g., HashTables)
    • Optional RocksDB as state backend for large state storage
  • Persistence mechanism:
    • Periodically persist state through Checkpoint mechanism
    • Provide exactly-once state guarantee
    • Serialize state with Flink's own type serializers, falling back to Kryo only for types Flink cannot analyze
  • Recovery process:
    • Automatically recover from last successful Checkpoint on task failure
    • Support incremental Checkpoints (with the RocksDB backend) to reduce checkpoint size and duration

Raw State

This form provides greater flexibility but requires developers to take more responsibility:

  • Self-management: Operator needs to maintain state data structure
  • Serialization requirements:
    • Developer needs to implement state serialization logic
    • Need to handle version compatibility issues
  • Checkpoint handling:
    • Need explicit state snapshot when Checkpoint triggers
    • Need to deserialize state data manually during recovery
  • Typical use cases:
    • Scenarios requiring special data structure optimization
    • Integrating third-party libraries requiring specific state format
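
The serialization and versioning duties listed above can be illustrated with plain Java I/O. The sketch below is an assumption-laden model rather than Flink's raw state API: RawStateSketch is a hypothetical class and the byte layout (version header, entry count, entries) is invented for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of operator-owned snapshot logic; the byte layout is invented for illustration.
class RawStateSketch {
    private static final int FORMAT_VERSION = 1;

    // Snapshot: write a version header, the entry count, then each entry.
    static byte[] snapshot(Map<String, Long> counters) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(FORMAT_VERSION);
            out.writeInt(counters.size());
            for (Map.Entry<String, Long> e : counters.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeLong(e.getValue());
            }
        }
        return bytes.toByteArray();
    }

    // Restore: check the version first so an incompatible layout fails loudly.
    static Map<String, Long> restore(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int version = in.readInt();
            if (version != FORMAT_VERSION) {
                throw new IOException("Unsupported snapshot version: " + version);
            }
            int size = in.readInt();
            Map<String, Long> counters = new LinkedHashMap<>();
            for (int i = 0; i < size; i++) {
                counters.put(in.readUTF(), in.readLong());
            }
            return counters;
        }
    }
}
```

With Managed State none of this code is needed, which is the main reason Raw State is reserved for special cases.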

State Backend Comparison

For managed state, Flink provides multiple state backend implementations:

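Feature | MemoryStateBackend | FsStateBackend | RocksDBStateBackend
Storage Medium | JVM heap memory | JVM heap (working state); file system (checkpoints) | RocksDB on local disk
State Size Limit | Small (5 MB per state by default) | Limited by TaskManager heap memory | Limited only by disk capacity
Access Speed | Fastest | Fast (heap access) | Relatively slow (serialization on every access)
Use Case | Testing / small state | Regular production | Ultra-large state scenarios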

Note: The DataStream API supports both Managed State and Raw State. Managed State is recommended, mainly because Flink can redistribute it when parallelism changes and can manage its memory more effectively.


State Descriptor

Since State is exposed to users, there are some attributes to specify:

  • State name
  • Value Serializer
  • State Type Info

Each StateBackend reads these attributes from the StateDescriptor when it creates the corresponding state object.

Flink defines a state through StateDescriptor, an abstract class that holds basic information such as the state name, type, and serializer. Each state type above has a matching subclass (ValueStateDescriptor, ListStateDescriptor, and so on), which is passed to the corresponding RuntimeContext method:

  • ValueState<T> getState(ValueStateDescriptor<T>)
  • ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
  • ListState<T> getListState(ListStateDescriptor<T>)
  • FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)
  • MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)