Rich Parallel Source

Basic Overview

In Apache Flink's stream processing framework, RichSourceFunction is a source function that adds lifecycle hooks and access to the runtime context on top of the plain SourceFunction interface. It is used to implement custom data sources that need initialization, cleanup, or information about the running task. Its parallel counterpart, RichParallelSourceFunction, offers the same features for sources that run with more than one instance.

Core Features

  1. Lifecycle methods: RichSourceFunction exposes the full task lifecycle:

    • open(): Runs once for initialization when the task starts, before run()
    • close(): Runs cleanup when the task ends
    • run(): The main method, containing the data generation logic
    • cancel(): Signals run() to stop generating data
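  2. State management: Rich functions integrate with Flink's state mechanisms:

    • Keyed State: exposed through getRuntimeContext(), but only for functions that run on a keyed stream, which a source does not
    • Operator State: the kind of state a source typically uses, by additionally implementing CheckpointedFunction
    • State Backend: storage of the state is handled by the configured state backend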
  3. Runtime context: The getRuntimeContext() method provides:

    • Information about the task execution environment
    • The parallelism of the operator
    • The index of the current subtask
    • Access to the distributed cache
  4. Configuration access: open() receives a Configuration object, and job-level parameters can be read through the runtime context, which keeps settings out of the source code.

Inheritance Hierarchy

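RichSourceFunction combines two parents:

  • SourceFunction: the interface that defines run() and cancel()
  • AbstractRichFunction: the abstract class that supplies open(), close(), and getRuntimeContext()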

Typical Application Scenarios

  1. Sources that need state management: for example, consuming from a message queue and resuming from a checkpointed position after a restart
  2. Sources that need distributed cache access: for example, looking up external reference data while generating records
  3. Sources that need complex initialization: for example, establishing database connections or loading configuration files
  4. Sources that need precise lifecycle control: for example, data sources that hold expensive resources

Example Code

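import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class CustomRichSource extends RichSourceFunction<String> {
    // Placeholder: the actual SQL query is elided in this example
    private static final String QUERY = "...";

    private transient Connection dbConnection;
    // volatile because cancel() is called from a different thread than run()
    private volatile boolean isRunning = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Initialize database connection
        dbConnection = DriverManager.getConnection("jdbc:mysql://...");
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // Read data from database and emit until cancel() is called
        while (isRunning) {
            try (Statement stmt = dbConnection.createStatement();
                 ResultSet rs = stmt.executeQuery(QUERY)) {
                while (isRunning && rs.next()) {
                    ctx.collect(rs.getString(1));
                }
            }
        }
    }

    @Override
    public void cancel() {
        // Set cancel flag so the loop in run() exits
        isRunning = false;
    }

    @Override
    public void close() throws Exception {
        // Close database connection
        if (dbConnection != null) {
            dbConnection.close();
        }
    }
}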
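The run()/cancel() contract in the example above depends on one detail that is easy to miss: Flink calls cancel() from a different thread than the one executing run(), so the flag must be visible across threads. The following is a minimal, Flink-free sketch of that pattern. CancellableLoop and stopsAfterCancel are hypothetical names used only for illustration, and the sleep intervals are arbitrary.

```java
import java.util.concurrent.atomic.AtomicLong;

// Flink-free model of the run()/cancel() contract; all names here are hypothetical.
class CancellableLoop {
    // volatile so the write in cancel() is visible to the thread executing run()
    private volatile boolean running = true;
    private final AtomicLong emitted = new AtomicLong();

    // Stands in for SourceFunction.run(): loops until cancel() flips the flag
    void run() {
        while (running) {
            emitted.incrementAndGet(); // stands in for ctx.collect(...)
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // Stands in for SourceFunction.cancel(): invoked from another thread
    void cancel() {
        running = false;
    }

    // Starts run() on a worker thread, cancels it, and reports whether it stopped
    static boolean stopsAfterCancel(long runMillis) {
        CancellableLoop source = new CancellableLoop();
        Thread worker = new Thread(source::run);
        worker.start();
        try {
            Thread.sleep(runMillis);
            source.cancel();
            worker.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("stopped after cancel: " + stopsAfterCancel(100));
    }
}
```

Without volatile (or another form of synchronization), the Java memory model does not guarantee that the loop ever observes the updated flag.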

Differences from Regular SourceFunction

Feature                    SourceFunction                 RichSourceFunction
Lifecycle management       Limited                        Complete
State access               Not supported                  Supported
Runtime context            Not available                  Available
Initialization/cleanup     Cannot implement               Can implement via open/close
Configuration parameters   Need to pass via constructor   Can read from config

Main Features

  • Lifecycle methods: RichSourceFunction provides open() and close() methods, called when the task starts and ends respectively. This allows initialization before any data is read (such as opening connections or loading configuration) and cleanup when the task ends (such as closing connections and releasing resources).
  • Runtime context access: Through getRuntimeContext(), a RichSourceFunction can reach Flink's runtime context and obtain the parallelism, the task name, metric groups, and state handles.
  • State management: A source can work with Flink's state mechanism to save and restore its state, for example its current read position. After a failure, the restored state lets the source resume from the last checkpoint instead of starting over.
  • Parallel execution: A RichSourceFunction, like a plain SourceFunction, always runs with a parallelism of 1. To read a large data source with several parallel instances, extend RichParallelSourceFunction instead, as in the RichParallelSource example later in this article.
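The fault-recovery behavior described in the state management point can be modeled without Flink. In a real job, a source would typically implement CheckpointedFunction and store its read position in operator state from snapshotState(), then read it back in initializeState(). The sketch below only imitates that idea with plain Java: ReplayableSource, snapshot, and restore are hypothetical names, not Flink API.

```java
import java.util.List;

// Flink-free model of a source that checkpoints its read position.
// ReplayableSource and its method names are hypothetical, not Flink API.
class ReplayableSource {
    private final List<String> records;
    private int offset = 0; // index of the next record to emit

    ReplayableSource(List<String> records) {
        this.records = records;
    }

    boolean hasNext() {
        return offset < records.size();
    }

    String emitNext() {
        return records.get(offset++);
    }

    // Analogous to writing the offset into operator state during a checkpoint
    int snapshot() {
        return offset;
    }

    // Analogous to reading the offset back when the job restarts
    void restore(int checkpointedOffset) {
        offset = checkpointedOffset;
    }

    public static void main(String[] args) {
        ReplayableSource source = new ReplayableSource(List.of("a", "b", "c", "d"));
        source.emitNext();                  // emits a
        source.emitNext();                  // emits b
        int checkpoint = source.snapshot(); // position of the next record, c
        source.emitNext();                  // emits c after the checkpoint
        // Simulated failure: roll back to the checkpointed position
        source.restore(checkpoint);
        System.out.println(source.emitNext()); // c is emitted again
    }
}
```

The record emitted after the checkpoint is replayed after the restore, which is why exactly-once results also depend on how downstream state and sinks are restored, not on the source alone.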

State Management

RichFunction is closely integrated with Flink's state management system, which provides the mechanism for maintaining an operator's intermediate state. Together with checkpointing, this managed state is what allows Flink to offer exactly-once state consistency. The State Backend is responsible for storing and accessing the state, with options that include memory, the file system, and RocksDB.

The four state types below are the most commonly used. They are all keyed state, which is only available to rich functions applied on a keyed stream (after keyBy), so the snippets in this section belong in such a function rather than in a source:

ValueState

Suited to keeping a single value:

  • Typical use: Counters (such as an event count), flags (such as a processing-status marker)
  • Example: Tracking the total transaction amount in a data stream
ValueState<Double> totalAmount = getRuntimeContext()
    .getState(new ValueStateDescriptor<>("total-amount", Types.DOUBLE));

ListState

Suited to keeping a list of values:

  • Typical use: Intermediate results in window calculations, historical records
  • Example: Storing the N most recent failed-login IP addresses
ListState<String> failedIPs = getRuntimeContext()
    .getListState(new ListStateDescriptor<>("failed-ips", Types.STRING));

MapState

Suited to maintaining key-value pairs:

  • Typical use: Complex data association, user profile features
  • Example: Maintaining a mapping from user ID to that user's behavior features
MapState<String, UserBehavior> userBehaviors = getRuntimeContext()
    .getMapState(new MapStateDescriptor<>("user-behaviors", Types.STRING, Types.POJO(UserBehavior.class)));

ReducingState

Suited to continuously aggregating incoming values into one result:

  • Typical use: Real-time calculation of aggregated metrics such as an average or a maximum
  • Example: Continuously calculating a moving average temperature
// AverageReducer is a user-defined ReduceFunction<Double> (not shown here)
ReducingState<Double> avgTemperature = getRuntimeContext()
    .getReducingState(new ReducingStateDescriptor<>("avg-temp", new AverageReducer(), Types.DOUBLE));
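
One caveat about averages: a reduce function takes two values of one type and returns a value of that same type, so repeatedly averaging pairs of Double values does not give the true mean. For the readings 10, 20, and 60, pairwise averaging yields 37.5, while the real average is 30. A common workaround is to reduce a (sum, count) pair and divide at the end; Flink's AggregatingState, with a separate accumulator type, is another option. The sketch below shows the accumulator idea in plain Java. RunningAverage and SumCount are hypothetical names, not Flink classes.

```java
import java.util.List;
import java.util.function.BinaryOperator;

// Flink-free sketch of averaging with a (sum, count) accumulator.
// RunningAverage and SumCount are hypothetical names, not Flink classes.
class RunningAverage {
    // Accumulator carrying enough information to derive an exact average
    static final class SumCount {
        final double sum;
        final long count;

        SumCount(double sum, long count) {
            this.sum = sum;
            this.count = count;
        }

        double average() {
            return count == 0 ? 0.0 : sum / count;
        }
    }

    // Same shape as a reduce function: two values of one type in, one value of that type out
    static final BinaryOperator<SumCount> MERGE =
            (a, b) -> new SumCount(a.sum + b.sum, a.count + b.count);

    static double averageOf(List<Double> readings) {
        SumCount acc = new SumCount(0.0, 0);
        for (double reading : readings) {
            acc = MERGE.apply(acc, new SumCount(reading, 1));
        }
        return acc.average();
    }

    public static void main(String[] args) {
        System.out.println(averageOf(List.of(10.0, 20.0, 60.0))); // prints 30.0
    }
}
```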

RichParallelSource Example

package icu.wzk;

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

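public class RichParallelSourceRich extends RichParallelSourceFunction<String> {

    private long count = 1L;
    // volatile because cancel() is called from a different thread than run()
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // Each parallel instance emits an incrementing counter once per second
        while (running) {
            count++;
            ctx.collect(String.valueOf(count));
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        // Signal the loop in run() to exit
        running = false;
    }
}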
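
In the example above, every parallel instance keeps its own counter and emits the same sequence, so each value appears once per instance. When the instances should divide the work instead, a common approach is to stripe the values by subtask index. In Flink 1.x the two inputs would come from getRuntimeContext().getIndexOfThisSubtask() and getRuntimeContext().getNumberOfParallelSubtasks(); the sketch below takes them as plain parameters so that it runs without Flink. SubtaskStriping and valuesFor are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Flink-free sketch of dividing work among parallel source instances.
// SubtaskStriping and valuesFor are hypothetical names, not Flink API.
class SubtaskStriping {
    // Returns the values in [0, total) owned by the given subtask.
    // In a real source, subtaskIndex and parallelism come from the runtime context.
    static List<Long> valuesFor(int subtaskIndex, int parallelism, long total) {
        List<Long> owned = new ArrayList<>();
        for (long v = subtaskIndex; v < total; v += parallelism) {
            owned.add(v);
        }
        return owned;
    }

    public static void main(String[] args) {
        int parallelism = 3;
        for (int i = 0; i < parallelism; i++) {
            System.out.println("subtask " + i + " -> " + valuesFor(i, parallelism, 8));
        }
    }
}
```

With a parallelism of 3 and 8 values, subtask 0 owns 0, 3, 6; subtask 1 owns 1, 4, 7; and subtask 2 owns 2, 5, so every value is emitted exactly once across the instances.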

Why Rich Classes Are Widely Used

  • Lifecycle management: Rich classes provide open() and close() methods, so developers can run initialization and cleanup when a task starts and ends. This is useful for anything that sets up resources, such as database connections, file I/O, or connections to external services.
  • Runtime context access: Through getRuntimeContext(), Rich classes can read the parallelism, the task name, and the subtask index, and can obtain state handles. This matters when behavior has to depend on the task context, for example when work must be divided among parallel instances.
  • State management: A RichFunction combines easily with Flink's state management. Stream processing applications that keep intermediate results, counters, or caches benefit from this directly.
  • Performance monitoring: Rich classes can register Flink metrics in the open() method, which helps with monitoring and tuning job performance.

When Not to Use Rich Classes

  • Simple operations: If you only need a simple transformation or filter, with no complex initialization, state, or cleanup, the extra features of a Rich class are probably unnecessary.
  • High performance requirements: In scenarios with very strict performance requirements, keep per-record work and context access to a minimum. Using a lightweight MapFunction, FilterFunction, or similar directly may perform better.