Basic Concepts of Parallelism

In Apache Flink, parallelism is the number of parallel instances an operator runs with during execution. A Flink program's dataflow graph consists of multiple operators (Sources, Transformations, and Sinks), and each operator can set its parallelism independently.

Importance of Parallelism

Parallelism is one of the key parameters for Flink performance tuning:

  • Determines resource usage of tasks in the cluster
  • Directly affects data processing throughput
  • Affects job fault tolerance and recovery speed

Parallelism Composition Structure

A Flink program's execution model can be decomposed into:

  1. Job: the complete Flink program
  2. Operator: the basic unit of data processing (e.g., map, filter, join)
  3. Task: a unit of parallel execution; one or more chained operators form a task
  4. Subtask: one parallel instance of a task, executed as a thread

Parallelism Setting Methods

1. Global Parallelism Setting

// Set in ExecutionConfig
env.setParallelism(4);
  • Sets the default parallelism for all operators in the job
  • Usually called at the job entry point

2. Operator-level Parallelism

DataStream<String> data = env.addSource(new CustomSource())
    .map(new MyMapper()).setParallelism(2)
    .filter(new MyFilter()).setParallelism(3);
  • Overrides the global default for specific operators
  • Suitable for particularly compute-intensive or I/O-intensive operators

3. Configuration File Setting

In flink-conf.yaml:

parallelism.default: 3
  • Acts as the cluster's default parallelism
  • Used when parallelism is not set explicitly in code or at submission

4. Client Submission Parameters

./bin/flink run -p 5 -c com.example.MyJob myJob.jar
  • Specified when submitting the job via the CLI
  • Overrides the cluster default in flink-conf.yaml, but is itself overridden by parallelism set explicitly in code

Best Practices for Parallelism Setting

Considerations

  1. Data volume: large data volumes usually require higher parallelism
  2. Operator characteristics: stateful operators such as windows may need special consideration
  3. Cluster resources: parallelism should not exceed the total number of available TaskManager slots
  4. Data skew: parallelism (or the key distribution) may need adjusting to balance load
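As a rough illustration of points 1 and 3, a desired parallelism can be capped at the cluster's slot capacity. A minimal plain-Java sketch (the method name and numbers are hypothetical, not a Flink API):

```java
public class ParallelismCap {

    // Cap a desired parallelism at the cluster's total slot capacity:
    // totalSlots = taskManagers * slotsPerTaskManager.
    static int cappedParallelism(int desired, int taskManagers, int slotsPerTaskManager) {
        int totalSlots = taskManagers * slotsPerTaskManager;
        return Math.min(desired, totalSlots);
    }

    public static void main(String[] args) {
        // 3 TaskManagers with 4 slots each => at most 12 usable slots
        System.out.println(cappedParallelism(16, 3, 4)); // prints 12
        System.out.println(cappedParallelism(8, 3, 4));  // prints 8
    }
}
```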

Typical Scenario Examples

  • ETL pipelines: Source and Sink parallelism usually matches the external system's partition count
  • Stream analytics: key operators may need parallelism of 2-4 times the CPU core count
  • Batch processing: higher parallelism can be set to speed up processing

Parallelism Levels

Global Parallelism

Global parallelism is the default parallelism for all jobs and operators in a Flink cluster, set in the configuration file flink-conf.yaml:

parallelism.default: 4

Job-level Parallelism

When submitting a Flink job, set it via the -p parameter:

flink run -p 10 your-job.jar

Operator-level Parallelism

DataStream<String> stream = env.readTextFile("input.txt")
                               .map(new MyMapper())
                               .setParallelism(5);

Slot-level Parallelism

Each TaskManager can be configured with a number of slots:

taskmanager.numberOfTaskSlots: 4
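With slot sharing enabled (Flink's default), one slot can host one parallel slice of the whole pipeline, so the number of slots a job needs equals its maximum operator parallelism. A simplified model of that calculation (plain Java, not a Flink API):

```java
import java.util.Arrays;

public class SlotDemand {

    // With default slot sharing, required slots = max operator parallelism.
    static int requiredSlots(int... operatorParallelisms) {
        return Arrays.stream(operatorParallelisms).max().orElse(0);
    }

    public static void main(String[] args) {
        // Source=6, map=12, sink=4 => the job needs 12 slots
        System.out.println(requiredSlots(6, 12, 4)); // prints 12
    }
}
```

With taskmanager.numberOfTaskSlots: 4, such a job would need at least 3 TaskManagers.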

Parallelism Optimization Strategies

Set Reasonable Parallelism Based on Data Volume

For tasks with large data volumes, increasing parallelism can improve processing speed. However, higher parallelism is not always better: excessive parallelism wastes resources and adds task-scheduling overhead. As a rule of thumb, a job's parallelism should not exceed the total number of available TaskManager slots.

Reasonably Allocate Operator Parallelism

Some operators, such as reduce or aggregate after keyBy(), are limited by the number of distinct keys, so setting their parallelism higher than the key count does not improve performance.
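To see why the key count caps effective parallelism, consider a simplified model of key-to-subtask assignment (Flink actually routes keys through murmur-hashed key groups, but the capping effect is the same): with 3 distinct keys, at most 3 subtasks ever receive data, no matter how high the operator parallelism is set.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class KeyedParallelismDemo {

    // Simplified key-to-subtask routing; Flink really assigns keys to
    // key groups via murmur hash, but the capping effect is identical.
    static int busySubtasks(List<String> keys, int parallelism) {
        Set<Integer> used = new HashSet<>();
        for (String key : keys) {
            used.add(Math.abs(key.hashCode() % parallelism));
        }
        return used.size();
    }

    public static void main(String[] args) {
        List<String> keys = List.of("red", "green", "blue");
        // With only 3 distinct keys, at most 3 subtasks do any work,
        // even when the operator parallelism is 10.
        System.out.println(busySubtasks(keys, 10) <= 3); // prints true
    }
}
```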

Use Resource Monitoring for Dynamic Tuning

While a task is running, use the Flink Web UI to monitor the job's status. If certain operators process data slowly or show low resource utilization, consider adjusting those operators' parallelism.

Consider Network and I/O Limitations

Flink job performance depends not only on CPU and memory but also on network bandwidth and I/O speed. When a job performs frequent network transfers or I/O operations on large data, avoid setting parallelism so high that network or disk I/O becomes the bottleneck.

Parallelism and Fault Tolerance

Flink supports fault tolerance: when tasks fail, it recovers from checkpoints (or manually triggered savepoints). Jobs with high parallelism typically generate more checkpoint state, which in some cases increases recovery overhead.
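For reference, periodic checkpointing can also be enabled cluster-wide in flink-conf.yaml (the 10s interval below is an arbitrary illustration; in recent Flink versions the key is execution.checkpointing.interval):

```yaml
# Enable periodic checkpoints; the interval is an example value.
# Larger state and higher parallelism may call for a longer interval.
execution.checkpointing.interval: 10s
```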

Code Example

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class FlinkParallelismExample {

    public static void main(String[] args) throws Exception {
        // 1. Create stream processing environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Set global parallelism (default parallelism)
        env.setParallelism(8);  // Global default parallelism is 8

        // 2. Configure Kafka consumer
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
        kafkaProps.setProperty("group.id", "transaction-group");

        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(
                "transaction-topic", new SimpleStringSchema(), kafkaProps);

        // Set Kafka source parallelism
        DataStream<String> transactionStream = env
                .addSource(kafkaSource)
                .setParallelism(6); // Parallelism when reading from Kafka is 6

        // 3. Data transformation operation
        DataStream<String> cleanedData = transactionStream
                .map(value -> cleanTransactionData(value))
                .setParallelism(12); // Data cleaning parallelism is 12

        // 4. Write cleaned data to HDFS
        cleanedData
                .writeAsText("hdfs://namenode:8020/flink/cleaned_transactions/")
                .setParallelism(4);  // Write to HDFS parallelism is 4

        // 5. Start task
        env.execute("Flink Parallelism Example");
    }

    // Data cleaning logic
    public static String cleanTransactionData(String transaction) {
        return transaction.trim();  // Simple cleaning logic example
    }
}

Code Explanation

  • Global parallelism: set via env.setParallelism(8). Unless explicitly overridden, every operator uses 8 parallel instances by default
  • Kafka consumption parallelism: setParallelism(6) on the Kafka source. This should be aligned with the topic's partition count; if the topic has 6 partitions, a parallelism of 6 is reasonable (higher values leave consumers idle)
  • Data transformation parallelism: setParallelism(12) means the cleaning step runs 12 parallel instances processing data simultaneously. This can improve throughput, but the cluster must have enough compute resources to support it
  • HDFS write parallelism: setParallelism(4) on the sink. Since HDFS writes are dominated by disk I/O, a lower parallelism helps avoid I/O contention

Parallelism Priority Rules

Flink resolves a task's parallelism in a clear order of priority:

  1. Operator level - set directly on a specific operator in code (highest priority)
  2. Environment level - set via env.setParallelism()
  3. Client level - specified via parameters at submission time (e.g., -p)
  4. System default level - the default in flink-conf.yaml (lowest priority)
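The resolution order above amounts to "the first explicitly set value wins", scanning from operator level down to the system default. A plain-Java sketch of that rule (not a Flink API; 0 stands for "not set"):

```java
public class ParallelismPriority {

    // Return the first explicitly set value (> 0), scanning from
    // highest priority (operator) to lowest (flink-conf.yaml default).
    static int resolve(int operatorLevel, int envLevel, int clientLevel, int systemDefault) {
        if (operatorLevel > 0) return operatorLevel;
        if (envLevel > 0) return envLevel;
        if (clientLevel > 0) return clientLevel;
        return systemDefault;
    }

    public static void main(String[] args) {
        // setParallelism(2) on an operator beats env.setParallelism(8)
        System.out.println(resolve(2, 8, 5, 1)); // prints 2
        // nothing set in code: the -p value applies
        System.out.println(resolve(0, 0, 5, 1)); // prints 5
    }
}
```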

Parallelism Limitations

  • Source parallelism limit: if a data source (such as a single Kafka partition or a single non-splittable file) does not support parallel reading, a higher parallelism setting will not take effect
  • Example: when reading a single CSV file as one input split, the effective read parallelism is 1

Parallelism Adjustment Notes

  1. Shuffle impact: changing parallelism re-partitions tasks and may trigger a data shuffle

    • Frequent adjustments increase network overhead
    • It may introduce data-skew problems
  2. Recommended approach:

    • Prefer specifying parallelism dynamically at submission time (e.g., the -p parameter)
    • In production environments, control it uniformly via the -p parameter

Core Concept Distinction

  • Slot:

    • A static resource concept
    • Represents a TaskManager's fixed concurrent execution capacity
    • Configured via taskmanager.numberOfTaskSlots
  • Parallelism:

    • A dynamic execution concept
    • Represents the actual concurrency used when the program runs
    • Can be adjusted via code or submission parameters

Best Practice Suggestions

  1. For batch jobs, different parallelism can be set at different stages
  2. For streaming jobs, keep parallelism settings stable
  3. Monitor resource usage after adjusting parallelism
  4. Consider the processing capacity of downstream systems (e.g., database connection pool size)