Basic Concepts of Parallelism
In Apache Flink, parallelism refers to the number of parallel task instances an operator runs with during execution. A Flink program's dataflow graph consists of multiple operators (Source, Transformation, and Sink), and each operator's parallelism can be set independently.
Importance of Parallelism
Parallelism is one of the key parameters for Flink performance tuning:
- Determines resource usage of tasks in the cluster
- Directly affects data processing throughput
- Affects job fault tolerance and recovery speed
Parallelism Composition Structure
A Flink program's execution model can be decomposed into:
- Job: Complete Flink program
- Operator: Basic unit of data processing (e.g., map, filter, join, etc.)
- Task: Chain of one or more operators scheduled together as a unit
- Subtask: One parallel instance of a task, the actual execution unit (thread)
Parallelism Setting Methods
1. Global Parallelism Setting
// Set in ExecutionConfig
env.setParallelism(4);
- Sets the default parallelism for all operators in the job
- Usually set at the job entry point
2. Operator-level Parallelism
DataStream<String> data = env.addSource(new CustomSource())
.map(new MyMapper()).setParallelism(2)
.filter(new MyFilter()).setParallelism(3);
- Overrides the global default for a specific operator
- Suitable for operators with special needs, such as compute-intensive or I/O-intensive ones
3. Configuration File Setting
In flink-conf.yaml:
parallelism.default: 3
- Acts as the cluster's default parallelism
- Used when parallelism is not set explicitly in code or at submission
4. Client Submission Parameters
./bin/flink run -p 5 -c com.example.MyJob myJob.jar
- Specified when submitting the job via the CLI
- Overrides the default from flink-conf.yaml, but not a parallelism set explicitly in code
Best Practices for Parallelism Setting
Considerations
- Data volume: Large data usually requires higher parallelism
- Operator characteristics: Stateful operators like window may need special consideration
- Cluster resources: Should not exceed total available TaskManager slots
- Data skew: May need to adjust parallelism to balance load
Typical Scenario Examples
- ETL pipeline: Source and Sink parallelism usually matches external system partition count
- Stream analysis: Key operators may need parallelism 2-4 times CPU core count
- Batch processing: Can set higher parallelism to speed up processing
Parallelism Levels
Global Parallelism
Global parallelism is the default applied to all jobs and operators in the Flink cluster, set in the configuration file flink-conf.yaml:
parallelism.default: 4
Job-level Parallelism
When submitting a Flink job, set it via the -p parameter:
flink run -p 10 your-job.jar
Operator-level Parallelism
DataStream<String> stream = env.readTextFile("input.txt")
.map(new MyMapper())
.setParallelism(5);
Slot-level Parallelism
Each TaskManager's slot count is configured via:
taskmanager.numberOfTaskSlots: 4
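Under Flink's default slot sharing, subtasks of different operators from the same job can share a slot, so a job needs roughly as many slots as its highest operator parallelism. A minimal sketch of that arithmetic (class and operator names are illustrative, and slot-sharing groups are ignored):

```java
import java.util.Map;

public class SlotBudget {
    // With Flink's default slot sharing, one parallel pipeline of subtasks
    // fits in a single slot, so the job needs as many slots as its
    // highest operator parallelism.
    static int requiredSlots(Map<String, Integer> operatorParallelism) {
        return operatorParallelism.values().stream()
                .max(Integer::compare)
                .orElse(0);
    }

    public static void main(String[] args) {
        // Per-operator parallelism, matching the example job later in this doc
        Map<String, Integer> job = Map.of("source", 6, "map", 12, "sink", 4);
        int slots = requiredSlots(job);                  // 12
        int slotsPerTaskManager = 4;                     // taskmanager.numberOfTaskSlots: 4
        int taskManagers = (int) Math.ceil((double) slots / slotsPerTaskManager);
        System.out.println(slots + " slots -> " + taskManagers + " TaskManagers");
        // prints: 12 slots -> 3 TaskManagers
    }
}
```

With slot sharing disabled or custom sharing groups, the demand would instead approach the sum of operator parallelisms, which is why the default sharing behavior matters for capacity planning.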
Parallelism Optimization Strategies
Set Reasonable Parallelism Based on Data Volume
For jobs with large data volumes, increasing parallelism can improve processing speed. However, higher is not always better: excessive parallelism wastes resources and adds task-scheduling overhead. As a rule of thumb, job parallelism should not exceed the total number of available TaskManager slots.
Reasonably Allocate Operator Parallelism
For operators after keyBy(), such as reduce or aggregate, effective parallelism is limited by the number of distinct keys; setting parallelism higher than the key count does not improve performance.
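A small simulation makes that limit concrete. It stands in for Flink's key-group assignment with a plain hash-modulo (Flink actually uses murmur hashing over key groups, so this is only an approximation):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class KeyedParallelismDemo {
    // Simplified stand-in for Flink's keyed routing: each key hashes to
    // exactly one subtask, so at most |distinct keys| subtasks ever
    // receive data, no matter how high the parallelism is set.
    static int busySubtasks(List<String> keys, int parallelism) {
        Set<Integer> used = new HashSet<>();
        for (String key : keys) {
            used.add(Math.floorMod(key.hashCode(), parallelism));
        }
        return used.size();
    }

    public static void main(String[] args) {
        List<String> keys = List.of("EUR", "USD", "GBP"); // 3 distinct keys
        System.out.println(busySubtasks(keys, 8) + " of 8 subtasks receive data");
        // At most 3 subtasks can be busy; the remaining ones stay idle,
        // so raising parallelism beyond the key count buys nothing.
    }
}
```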
Use Resource Monitoring for Dynamic Tuning
At runtime, use the Flink Web UI to monitor job status. If a particular operator processes data slowly or under-utilizes its resources, consider adjusting that operator's parallelism.
Consider Network and I/O Limitations
Flink job performance depends not only on CPU and memory but also on network bandwidth and I/O speed. When a job performs frequent network transfers or I/O operations over large data volumes, avoid setting parallelism so high that the network or disk I/O becomes the bottleneck.
Parallelism and Fault Tolerance
Flink supports fault tolerance: when tasks fail, Flink recovers from checkpoints (or savepoints). Jobs with high parallelism typically generate more checkpoint state, which in some cases increases recovery overhead.
Code Example
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class FlinkParallelismExample {
public static void main(String[] args) throws Exception {
// 1. Create stream processing environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Set global parallelism (default parallelism)
env.setParallelism(8); // Global default parallelism is 8
// 2. Configure Kafka consumer
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
kafkaProps.setProperty("group.id", "transaction-group");
FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(
"transaction-topic", new SimpleStringSchema(), kafkaProps);
// Set Kafka source parallelism
DataStream<String> transactionStream = env
.addSource(kafkaSource)
.setParallelism(6); // Parallelism when reading from Kafka is 6
// 3. Data transformation operation
DataStream<String> cleanedData = transactionStream
.map(value -> cleanTransactionData(value))
.setParallelism(12); // Data cleaning parallelism is 12
// 4. Write cleaned data to HDFS
cleanedData
.writeAsText("hdfs://namenode:8020/flink/cleaned_transactions/")
.setParallelism(4); // Write to HDFS parallelism is 4
// 5. Start task
env.execute("Flink Parallelism Example");
}
// Data cleaning logic
public static String cleanTransactionData(String transaction) {
return transaction.trim(); // Simple cleaning logic example
}
}
Code Explanation
- Global parallelism: set via env.setParallelism(8). Unless explicitly overridden, every operator defaults to 8 parallel instances
- Kafka consumption parallelism: set via setParallelism(6) on the source. This should generally match the Kafka partition count; with 6 partitions, a parallelism of 6 is reasonable
- Data transformation parallelism: set to 12 via setParallelism(12), so the cleaning step runs 12 parallel instances. This improves throughput, provided the cluster has enough resources to support it
- HDFS write parallelism: set to 4 via setParallelism(4). Since HDFS writes involve disk I/O, a lower parallelism helps avoid I/O contention
Flink Parallelism Setting Guide
Parallelism Priority Rules
Flink task parallelism setting follows clear priority order:
1. Operator level: set directly on a specific operator in code (highest priority)
2. Environment level: set via env.setParallelism()
3. Client level: specified via submission parameters such as -p
4. System default level: the parallelism.default entry in flink-conf.yaml (lowest priority)
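The precedence above can be sketched as a resolver that picks the first level at which parallelism was actually set (the class and method names here are illustrative, not Flink API):

```java
import java.util.Arrays;
import java.util.Objects;

public class ParallelismResolver {
    // Illustrative only: returns the effective parallelism following
    // Flink's precedence (operator > environment > client -p > config
    // default). A null argument means "not set at that level".
    static int effectiveParallelism(Integer operator, Integer environment,
                                    Integer client, int configDefault) {
        return Arrays.asList(operator, environment, client).stream()
                .filter(Objects::nonNull)   // keep only levels that were set
                .findFirst()                // highest-priority one wins
                .orElse(configDefault);     // fall back to parallelism.default
    }

    public static void main(String[] args) {
        // Operator-level setting wins over everything else
        System.out.println(effectiveParallelism(5, 8, 10, 4));       // prints 5
        // Nothing set in code: the client's -p value applies
        System.out.println(effectiveParallelism(null, null, 10, 4)); // prints 10
        // Nothing set anywhere: fall back to parallelism.default
        System.out.println(effectiveParallelism(null, null, null, 4)); // prints 4
    }
}
```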
Parallelism Limitations
- Source parallelism limit: if the data source does not support parallel reading (e.g., a single Kafka partition or a non-splittable file), setting a higher parallelism has no effect
- Example: reading a single non-splittable file (such as a gzip-compressed file) is effectively limited to parallelism 1
Parallelism Adjustment Notes
- Shuffle impact: changing parallelism re-partitions tasks and may trigger a data shuffle
  - Frequent adjustments increase network overhead
  - May cause data skew
- Recommended approach: prefer specifying parallelism dynamically at submission; in production, control it uniformly via the -p parameter
Core Concept Distinction
- Slot:
  - A static resource concept
  - Represents a TaskManager's actual concurrent execution capacity
  - Configured via taskmanager.numberOfTaskSlots
- Parallelism:
  - A dynamic execution concept
  - Represents the actual concurrency used when the program runs
  - Can be adjusted via code or submission parameters
Best Practice Suggestions
- Batch jobs can use different parallelism at different stages
- Stream jobs should keep parallelism settings stable
- Monitor resource usage after each parallelism adjustment
- Consider the processing capacity of downstream systems (e.g., database connection pool size)
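For the last point, a back-of-envelope check (all numbers hypothetical) of whether a sink's parallelism fits within a database connection pool:

```java
public class SinkCapacityCheck {
    // Rough sanity check: the total connections opened by a JDBC-style
    // sink (one or more per parallel subtask) must not exceed what the
    // database's connection pool allows.
    static boolean fitsConnectionPool(int sinkParallelism,
                                      int connectionsPerSubtask,
                                      int poolSize) {
        return sinkParallelism * connectionsPerSubtask <= poolSize;
    }

    public static void main(String[] args) {
        int poolSize = 20;  // hypothetical database connection limit
        System.out.println(fitsConnectionPool(4, 2, poolSize));  // prints true  (8 <= 20)
        System.out.println(fitsConnectionPool(16, 2, poolSize)); // prints false (32 > 20)
    }
}
```

Scaling a sink from 4 to 16 here would exhaust the pool, so in this scenario the database, not the cluster, caps the useful sink parallelism.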