Rich Parallel Source
Basic Overview
In Apache Flink’s stream processing framework, RichSourceFunction is a source function that adds the RichFunction capabilities (lifecycle hooks and access to the runtime context) on top of the plain SourceFunction interface. It is used to implement custom data sources whose logic needs initialization, cleanup, or information about the running task, which covers more complex business scenarios and data processing requirements.
Core Features
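- Lifecycle methods: RichSourceFunction combines the RichFunction lifecycle hooks with the SourceFunction methods:
  - open(): runs initialization once per parallel task instance, before run()
  - run(): main method that generates and emits data
  - cancel(): gracefully stops data generation, usually by clearing a flag that run() checks
  - close(): runs cleanup when the task ends
- State management: a source has no key context, so keyed state is not available inside it. To save progress such as a read offset, the source also implements CheckpointedFunction and registers operator state, which the configured state backend stores and checkpoints.
- Runtime context: getRuntimeContext() provides:
  - task execution information such as the task name
  - the parallelism (number of parallel subtasks)
  - the index of this subtask
  - access to the distributed cache
- Configuration access: parameters can be read at runtime, for example the global job parameters via getRuntimeContext().getExecutionConfig().getGlobalJobParameters(), instead of being passed only through the constructor.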
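The following minimal sketch shows the lifecycle hooks and runtime-context calls above in one class. It assumes the Flink 1.x DataStream API, where open(Configuration) is the signature used before Flink 1.19. The class name LifecycleDemoSource and the describe helper are hypothetical. The RuntimeContext getters are Flink's own.

```java
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

/** Hypothetical demo source: reads task info in open(), emits a few tagged records. */
public class LifecycleDemoSource extends RichSourceFunction<String> {

    // volatile: cancel() is called from a different thread than run()
    private volatile boolean running = true;
    // Built in open() on the task manager, so it is not serialized with the job
    private transient String tag;

    @Override
    public void open(Configuration parameters) throws Exception {
        RuntimeContext rc = getRuntimeContext();
        // Task name, subtask index and parallelism all come from the runtime context
        tag = describe(rc.getTaskName(), rc.getIndexOfThisSubtask(), rc.getNumberOfParallelSubtasks());
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        int i = 0;
        while (running && i < 5) {
            // Emit under the checkpoint lock so emission and checkpoints do not interleave
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(tag + " record " + i);
            }
            i++;
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void close() throws Exception {
        // Release anything acquired in open(); here there is only the tag string
        tag = null;
    }

    /** Formats e.g. "Source: demo [1/2]" for subtask index 0 of 2. */
    static String describe(String taskName, int index, int parallelism) {
        return taskName + " [" + (index + 1) + "/" + parallelism + "]";
    }
}
```

The formatting is kept in a static helper so that it can be checked without starting a job.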
Inheritance Hierarchy
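RichSourceFunction extends AbstractRichFunction (which implements RichFunction) and implements SourceFunction:
RichFunction (interface)
↑
AbstractRichFunction
↑
RichSourceFunction (also implements SourceFunction)
RichParallelSourceFunction has the same structure but implements ParallelSourceFunction, a subinterface of SourceFunction.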
Typical Application Scenarios
- Sources requiring state management: for example, consuming from a message queue and resuming from the last checkpointed offset after a failure
- Sources requiring distributed cache access: for example, reading reference files shipped to every task while generating data
- Sources requiring complex initialization: for example, establishing database connections or loading configuration files
- Sources requiring precise lifecycle control: for example, sources that hold expensive resources (connection pools, background threads) that must be released when the task stops
Example Code
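```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class CustomRichSource extends RichSourceFunction<String> {
    // transient: created in open() on the task manager, never serialized with the job
    private transient Connection dbConnection;
    // volatile: cancel() is called from a different thread than run()
    private volatile boolean isRunning = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Initialize database connection (URL elided)
        dbConnection = DriverManager.getConnection("jdbc:mysql://...");
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // Read data from database and emit (query elided)
        while (isRunning) {
            try (Statement stmt = dbConnection.createStatement();
                 ResultSet rs = stmt.executeQuery("...")) {
                while (isRunning && rs.next()) {
                    // Emit under the checkpoint lock so emission and checkpoints do not interleave
                    synchronized (ctx.getCheckpointLock()) {
                        ctx.collect(rs.getString(1));
                    }
                }
            }
            // Pause before polling again (assumed 1 s) instead of re-querying in a tight loop
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        // Set cancel flag; run() exits at its next loop check
        isRunning = false;
    }

    @Override
    public void close() throws Exception {
        // Close database connection
        if (dbConnection != null) {
            dbConnection.close();
        }
    }
}
```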
Differences from Regular SourceFunction
| Feature | SourceFunction | RichSourceFunction |
|---|---|---|
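| Lifecycle hooks | run() and cancel() only | Adds open() and close() |
| State access | Operator state via CheckpointedFunction | Same (keyed state is not available in sources) |
| Runtime context | Not available | Available via getRuntimeContext() |
| Initialization/cleanup | Only in the constructor (runs on the client) or inside run() | Dedicated open()/close() hooks |
| Configuration parameters | Passed via the constructor (must be serializable) | Constructor, or read at runtime from global job parameters |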
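The configuration row can be illustrated with global job parameters. The job registers a ParameterTool with the ExecutionConfig, and the source reads it back in open() on each task. This is a sketch under the same Flink 1.x assumption. The class names and the prefix and interval-ms keys are hypothetical. If no ParameterTool is registered, the cast in open() fails.

```java
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class ConfiguredSourceJob {

    /** Hypothetical source whose settings come from global job parameters, not its constructor. */
    public static class ConfiguredSource extends RichSourceFunction<String> {
        private volatile boolean running = true;
        private transient String prefix;
        private transient long intervalMs;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Assumes main() registered a ParameterTool as the global job parameters
            ParameterTool params = (ParameterTool)
                    getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
            prefix = params.get("prefix", "event");            // hypothetical key
            intervalMs = params.getLong("interval-ms", 1000L);  // hypothetical key
        }

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            long n = 0;
            while (running) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(prefix + "-" + n);
                }
                n++;
                Thread.sleep(intervalMs);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    public static void main(String[] args) throws Exception {
        // e.g. --prefix order --interval-ms 500
        ParameterTool params = ParameterTool.fromArgs(args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        env.addSource(new ConfiguredSource()).print();
        env.execute("configured-source");
    }
}
```

With this approach, changing a setting only requires new program arguments. The source class and its constructor stay the same.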
Main Features
- Lifecycle methods: RichSourceFunction provides open() and close(), called once per parallel task instance before run() starts and after it finishes. This allows initialization before data is read (opening connections, loading configuration) and cleanup when the task ends (closing connections, releasing resources).
- Access to the runtime context: through getRuntimeContext(), RichSourceFunction can obtain parallelism information, the task name, the subtask index, the metric group, and accumulators.
- State management: by also implementing CheckpointedFunction, a RichSourceFunction can save and restore operator state. This is useful for sources that must track their progress. After a failure, Flink restores the state from the last completed checkpoint, and the source resumes from there instead of starting over.
- Parallel execution: like SourceFunction, RichSourceFunction is non-parallel and always runs with parallelism 1. Setting a higher parallelism on it fails. To read in parallel, extend RichParallelSourceFunction (or implement ParallelSourceFunction), which runs one instance per subtask and can handle large-scale data sources.
State Management
Rich functions integrate closely with Flink’s state management system, which maintains and manages operator intermediate state. Checkpointing this state is the basis of Flink’s exactly-once state consistency. The State Backend is responsible for storing and accessing state, and it supports several storage options, including memory, file system and RocksDB.
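A source has no key context, so the usual way to make it fault tolerant is to implement CheckpointedFunction and keep its progress in operator state. The following is a minimal sketch, assuming Flink 1.x. The class name CheckpointedCounterSource and the state name "offset" are hypothetical. It deliberately uses a non-parallel RichSourceFunction, so a single offset value never has to be redistributed when the job is rescaled.

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

/** Hypothetical non-parallel counter source whose position survives failures. */
public class CheckpointedCounterSource extends RichSourceFunction<Long>
        implements CheckpointedFunction {

    private volatile boolean running = true;
    private long offset = 0L;                       // next value to emit
    private transient ListState<Long> offsetState;  // operator (non-keyed) state

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        offsetState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("offset", Types.LONG));
        if (context.isRestored()) {
            // Restore the position recorded by the last completed checkpoint
            for (Long restored : offsetState.get()) {
                offset = restored;
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Replace the stored value with the current position
        offsetState.clear();
        offsetState.add(offset);
    }

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (running) {
            // Emitting and advancing under the checkpoint lock keeps the snapshot
            // consistent with the records already emitted
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(offset);
                offset++;
            }
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```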
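Flink’s keyed state comes in several types. The four most commonly used are described below, and AggregatingState is another. Keyed state is only available in rich functions applied to a KeyedStream (that is, after keyBy()). The snippets below therefore belong in operators such as a RichFlatMapFunction or KeyedProcessFunction, not in a source function.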
ValueState
Suitable for scenarios needing to save a single value:
- Typical use: Counters (like counting event numbers), flags (like processing status markers)
- Example: Counting total transaction amount in data stream processing
```java
ValueState<Double> totalAmount = getRuntimeContext()
        .getState(new ValueStateDescriptor<>("total-amount", Types.DOUBLE));
```
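Keyed ValueState is obtained in open() of a rich function that runs after keyBy(). The following sketch implements the transaction-total example. It assumes transactions arrive as Tuple2 records of (accountId, amount); RunningTotal and its add helper are hypothetical names.

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/** Hypothetical running total per key; must be applied after keyBy(accountId). */
public class RunningTotal
        extends RichFlatMapFunction<Tuple2<String, Double>, Tuple2<String, Double>> {

    private transient ValueState<Double> totalAmount;

    @Override
    public void open(Configuration parameters) {
        totalAmount = getRuntimeContext()
                .getState(new ValueStateDescriptor<>("total-amount", Types.DOUBLE));
    }

    @Override
    public void flatMap(Tuple2<String, Double> txn, Collector<Tuple2<String, Double>> out)
            throws Exception {
        // value() returns null for the first record of a key
        double updated = add(totalAmount.value(), txn.f1);
        totalAmount.update(updated);
        out.collect(Tuple2.of(txn.f0, updated));
    }

    static double add(Double current, double amount) {
        return (current == null ? 0.0 : current) + amount;
    }
}
```

You would apply it as transactions.keyBy(t -> t.f0).flatMap(new RunningTotal()), so that each account keeps its own total.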
ListState
Suitable for scenarios needing to save multiple values:
- Typical use: Intermediate results in window calculations, historical record storage
- Example: Storing the N most recent failed-login IP addresses
```java
ListState<String> failedIPs = getRuntimeContext()
        .getListState(new ListStateDescriptor<>("failed-ips", Types.STRING));
```
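To bound the failed-login list, read the current ListState contents, append the new entry, drop the oldest entries, and write the result back with update(). The following is a sketch. It assumes events are Tuple2 records of (userId, ip) keyed by user, and a hypothetical limit of 5 addresses. RecentFailedIps and appendBounded are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/** Hypothetical: keeps the last MAX_IPS failed-login IPs per user; apply after keyBy(userId). */
public class RecentFailedIps extends RichFlatMapFunction<Tuple2<String, String>, List<String>> {

    private static final int MAX_IPS = 5; // hypothetical N

    private transient ListState<String> failedIPs;

    @Override
    public void open(Configuration parameters) {
        failedIPs = getRuntimeContext()
                .getListState(new ListStateDescriptor<>("failed-ips", Types.STRING));
    }

    @Override
    public void flatMap(Tuple2<String, String> event, Collector<List<String>> out) throws Exception {
        List<String> current = new ArrayList<>();
        Iterable<String> stored = failedIPs.get();
        if (stored != null) { // defensive: treat missing state as empty
            for (String ip : stored) {
                current.add(ip);
            }
        }
        List<String> updated = appendBounded(current, event.f1, MAX_IPS);
        failedIPs.update(updated);
        out.collect(new ArrayList<>(updated));
    }

    /** Appends ip, then drops the oldest entries until at most max remain. */
    static List<String> appendBounded(List<String> current, String ip, int max) {
        List<String> result = new ArrayList<>(current);
        result.add(ip);
        while (result.size() > max) {
            result.remove(0);
        }
        return result;
    }
}
```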
MapState
Suitable for scenarios needing to maintain key-value pairs:
- Typical use: Complex data association, user profile feature storage
- Example: Maintaining a mapping from user ID to behavior features
```java
MapState<String, UserBehavior> userBehaviors = getRuntimeContext()
        .getMapState(new MapStateDescriptor<>("user-behaviors", Types.STRING, Types.POJO(UserBehavior.class)));
```
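MapState behaves like a map scoped to the current key. The sketch below is a simplified variant of the user-behavior example. Instead of the UserBehavior POJO, it counts how often each action occurs per user. It assumes events are Tuple2 records of (userId, action) keyed by user; ActionCounter and increment are hypothetical names.

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/** Hypothetical: counts each action type per user; apply after keyBy(userId). */
public class ActionCounter
        extends RichFlatMapFunction<Tuple2<String, String>, Tuple3<String, String, Long>> {

    private transient MapState<String, Long> actionCounts;

    @Override
    public void open(Configuration parameters) {
        actionCounts = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("action-counts", Types.STRING, Types.LONG));
    }

    @Override
    public void flatMap(Tuple2<String, String> event, Collector<Tuple3<String, String, Long>> out)
            throws Exception {
        String action = event.f1;
        // get() returns null for an action this user has not performed yet
        long updated = increment(actionCounts.get(action));
        actionCounts.put(action, updated);
        out.collect(Tuple3.of(event.f0, action, updated));
    }

    static long increment(Long current) {
        return current == null ? 1L : current + 1L;
    }
}
```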
ReducingState
Suitable for scenarios needing to continuously aggregate data:
- Typical use: Real-time aggregates such as sums or maxima (an average needs AggregatingState, because it must track both a sum and a count)
- Example: Continuously tracking the maximum temperature
```java
ReducingState<Double> maxTemperature = getRuntimeContext()
        .getReducingState(new ReducingStateDescriptor<>("max-temp", (a, b) -> Math.max(a, b), Types.DOUBLE));
```
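A ReduceFunction over single values cannot compute an average, because an average needs both a sum and a count. AggregatingState solves this with an AggregateFunction whose accumulator holds (sum, count). The following is a sketch. It assumes readings arrive as Tuple2 records of (sensorId, temperature) keyed by sensor. It computes the cumulative average since the first reading, not a windowed moving average. AverageTemperature and AverageAggregate are hypothetical names.

```java
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/** Hypothetical: emits the running average temperature per sensor; apply after keyBy(sensorId). */
public class AverageTemperature
        extends RichFlatMapFunction<Tuple2<String, Double>, Tuple2<String, Double>> {

    /** Accumulator is (sum, count); the result is sum / count. */
    public static class AverageAggregate
            implements AggregateFunction<Double, Tuple2<Double, Long>, Double> {

        @Override
        public Tuple2<Double, Long> createAccumulator() {
            return Tuple2.of(0.0, 0L);
        }

        @Override
        public Tuple2<Double, Long> add(Double value, Tuple2<Double, Long> acc) {
            return Tuple2.of(acc.f0 + value, acc.f1 + 1L);
        }

        @Override
        public Double getResult(Tuple2<Double, Long> acc) {
            return acc.f1 == 0L ? 0.0 : acc.f0 / acc.f1;
        }

        @Override
        public Tuple2<Double, Long> merge(Tuple2<Double, Long> a, Tuple2<Double, Long> b) {
            return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
        }
    }

    private transient AggregatingState<Double, Double> avgTemperature;

    @Override
    public void open(Configuration parameters) {
        TypeInformation<Tuple2<Double, Long>> accType = Types.TUPLE(Types.DOUBLE, Types.LONG);
        avgTemperature = getRuntimeContext().getAggregatingState(
                new AggregatingStateDescriptor<>("avg-temp", new AverageAggregate(), accType));
    }

    @Override
    public void flatMap(Tuple2<String, Double> reading, Collector<Tuple2<String, Double>> out)
            throws Exception {
        avgTemperature.add(reading.f1);
        out.collect(Tuple2.of(reading.f0, avgTemperature.get()));
    }
}
```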
RichParallelSource Example
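```java
package icu.wzk;

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

// Every parallel instance runs its own copy of run(), so each one emits the same sequence 1, 2, 3, ...
public class RichParallelSourceRich extends RichParallelSourceFunction<String> {
    private long count = 1L;
    // volatile: cancel() is called from a different thread than run()
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            // Emit the current value, then advance, under the checkpoint lock
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(String.valueOf(count));
                count++;
            }
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
```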
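Every parallel instance of the example above emits the same sequence. When the work should be split across instances instead, each instance can read its subtask index in open(). This is a sketch assuming Flink 1.x. The class names and the nthValue helper are hypothetical. With parallelism 2, subtask 0 emits 0, 2, 4, ... and subtask 1 emits 1, 3, 5, ...

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class PartitionedSequenceJob {

    /** Hypothetical source: each subtask emits only the numbers congruent to its index. */
    public static class PartitionedSequence extends RichParallelSourceFunction<Long> {
        private volatile boolean running = true;
        private transient int index;
        private transient int parallelism;

        @Override
        public void open(Configuration parameters) throws Exception {
            index = getRuntimeContext().getIndexOfThisSubtask();
            parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
        }

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            long n = 0;
            while (running) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(nthValue(index, parallelism, n));
                }
                n++;
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }

        /** The n-th value emitted by a subtask: index, index + parallelism, ... */
        static long nthValue(int subtaskIndex, int parallelism, long n) {
            return subtaskIndex + n * parallelism;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(new PartitionedSequence()).setParallelism(2).print();
        env.execute("partitioned-sequence");
    }
}
```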
Why Rich Classes Are Widely Used
- Lifecycle management: Rich classes provide open() and close() methods, allowing developers to perform initialization and cleanup operations when tasks start and end. This is very useful for operations that need to set up resources (like database connections, file I/O, external service connections).
- Runtime context access: Through getRuntimeContext(), Rich classes can access the task’s parallelism, task name, subtask index, state, and more. This information matters when behavior depends on the task context, for example when each parallel instance must handle its own share of the work. Each parallel instance keeps its own state; state is not shared between instances.
- State management: RichFunction combines easily with Flink’s state management, which makes Rich classes a natural fit for stateful stream processing such as maintaining intermediate results, counters, or caches.
- Performance monitoring: Rich classes allow developers to register Flink’s Metrics in open() method, helping monitor and optimize job performance.
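Registering a metric in open() looks like the following minimal sketch. CountingMapper and the processedRecords metric name are hypothetical. getMetricGroup().counter(...) is part of Flink's metrics API.

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

/** Hypothetical mapper that counts the records it processes. */
public class CountingMapper extends RichMapFunction<String, String> {

    private transient Counter processed;

    @Override
    public void open(Configuration parameters) {
        // Registered once per parallel instance; reported through the configured metric reporters
        processed = getRuntimeContext().getMetricGroup().counter("processedRecords");
    }

    @Override
    public String map(String value) {
        processed.inc();
        return value.trim();
    }
}
```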
When Not to Use Rich Classes
- Simple operations: If you only need to perform simple transformation or filtering operations without complex initialization, state management or cleanup requirements, then Rich class’s additional features may not be necessary.
- High performance requirements: Per-record cost comes mainly from the work done inside the function, such as state access and context lookups, so keep such calls out of hot paths. When none of the Rich features are needed, a lightweight MapFunction or FilterFunction keeps the code simpler.