Overview

Apache Flink’s DataSet API is the core programming interface for batch processing in Flink, designed for static, bounded datasets. Unlike the DataStream API, which targets unbounded streams, the DataSet API operates on data collections that exist in full before the job starts, and it scales to datasets in the terabyte or even petabyte range.

Core Features

  1. Batch Optimization: The DataSet API uses lazy execution; the whole job is analyzed by a cost-based optimizer before anything runs
  2. Memory Management: Flink implements its own memory management mechanism, allowing it to process large-scale datasets efficiently
  3. Rich Operators: Provides a rich set of transformations such as map, filter, join, groupBy, and reduce
  4. Fault Tolerance: Failed tasks are re-executed; because batch inputs are bounded and immutable, recomputation is sufficient to guarantee correct results
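
Lazy execution can be observed directly: transformation calls only build a dataflow plan, and nothing is computed until a sink such as collect() or print() triggers the job. A minimal sketch (class and variable names are illustrative):

```java
import java.util.List;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class LazyExecutionDemo {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // These calls only build the execution plan -- no data is processed yet
        DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);
        DataSet<Integer> doubled = numbers.map(n -> n * 2);

        // Only now does the optimizer run and the job actually execute
        List<Integer> result = doubled.collect();
        System.out.println(result);
    }
}
```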

Typical Application Scenarios

  1. ETL (Extract-Transform-Load)

    • Extract data from relational databases
    • Perform data cleaning and transformation
    • Load into data warehouse or analysis system
  2. Batch Data Analysis

    • Execute complex aggregation calculations
    • Generate reports and statistical analysis
    • Large-scale graph computing
  3. Machine Learning Preprocessing

    • Feature engineering
    • Dataset preparation
    • Model evaluation
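
The ETL scenario above can be sketched end to end with the DataSet API. The file paths and the (id, name, amount) CSV schema below are hypothetical:

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;

public class EtlSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Extract: read raw (id, name, amount) records -- path and schema are hypothetical
        DataSet<Tuple3<Integer, String, Double>> raw = env
            .readCsvFile("hdfs:///warehouse/raw/orders.csv")
            .types(Integer.class, String.class, Double.class);

        // Transform: drop invalid rows and normalize names
        DataSet<Tuple3<Integer, String, Double>> cleaned = raw
            .filter(t -> t.f2 > 0)
            .map(t -> Tuple3.of(t.f0, t.f1.trim().toLowerCase(), t.f2))
            // type hint: lambdas returning generic tuples lose type info to erasure
            .returns(Types.TUPLE(Types.INT, Types.STRING, Types.DOUBLE));

        // Load: write the cleaned data out and run the job
        cleaned.writeAsCsv("hdfs:///warehouse/clean/orders.csv");
        env.execute("etl-sketch");
    }
}
```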

Comparison with DataStream API

| Feature | DataSet API | DataStream API |
| --- | --- | --- |
| Data Processing Model | Batch | Stream |
| Data Boundary | Bounded data | Unbounded data |
| Execution Mode | One-time execution | Continuous execution |
| Typical Latency | Minutes to hours | Milliseconds to seconds |
| Main Application | Offline analysis | Real-time processing |

DataSource

1. Collection-based DataSource

// Create test dataset
List<String> testData = Arrays.asList("A", "B", "C", "D");
DataSet<String> dataSet = env.fromCollection(testData);

2. File-based DataSource

// Read text file from HDFS
DataSet<String> hdfsData = env.readTextFile("hdfs://namenode:8020/user/flink/input/data.txt");

DataSet Creation

// Read from local file
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> text = env.readTextFile("path/to/file");

// Read from CSV file
DataSet<Tuple3<Integer, String, Double>> csvData = env.readCsvFile("path/to/file.csv")
    .types(Integer.class, String.class, Double.class);

// Create from collection
List<Tuple2<String, Integer>> data = Arrays.asList(
    new Tuple2<>("Alice", 1),
    new Tuple2<>("Bob", 2)
);
DataSet<Tuple2<String, Integer>> dataSet = env.fromCollection(data);

DataSet Transformation Operations

Map

DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);
DataSet<Integer> squaredNumbers = numbers.map(n -> n * n);

Filter

DataSet<Integer> evenNumbers = numbers.filter(n -> n % 2 == 0);

FlatMap

DataSet<String> lines = env.fromElements("hello world", "flink is great");
DataSet<String> words = lines.flatMap((String line, Collector<String> collector) -> {
    for (String word : line.split(" ")) {
        collector.collect(word);
    }
}).returns(Types.STRING); // type hint needed: lambda output types are lost to erasure
                          // (org.apache.flink.api.common.typeinfo.Types, org.apache.flink.util.Collector)

Reduce

DataSet<Integer> sum = numbers.reduce((n1, n2) -> n1 + n2);

GroupBy and Reduce

DataSet<Tuple2<String, Integer>> wordCounts = words
    .map(word -> new Tuple2<>(word, 1))
    .returns(Types.TUPLE(Types.STRING, Types.INT)) // tuple type hint for the lambda
    .groupBy(0)
    .reduce((t1, t2) -> new Tuple2<>(t1.f0, t1.f1 + t2.f1));

Join

DataSet<Tuple2<Integer, String>> persons = env.fromElements(
    new Tuple2<>(1, "Alice"),
    new Tuple2<>(2, "Bob")
);
DataSet<Tuple2<Integer, String>> cities = env.fromElements(
    new Tuple2<>(1, "Berlin"),
    new Tuple2<>(2, "Paris")
);
DataSet<Tuple2<String, String>> personWithCities = persons.join(cities)
    .where(0)
    .equalTo(0)
    .with((p, c) -> new Tuple2<>(p.f1, c.f1))
    .returns(Types.TUPLE(Types.STRING, Types.STRING)); // tuple type hint for the join lambda

DataSet Output

// Write to file; writes are lazy, so an explicit env.execute() is required
wordCounts.writeAsCsv("output/wordcounts.csv", "\n", ",");
env.execute();

// Print to console; print() is an action and triggers execution by itself
wordCounts.print();

Detailed Batch Optimization Mechanism

DataSet API Optimization Principle

The DataSet API ships with a cost-based optimizer that rewrites jobs before execution, mainly along two lines:

  1. Cost Model Analysis: The optimizer estimates the resource consumption of candidate execution plans, including memory usage, CPU cost, and network transfer, and selects the cheapest plan.

  2. Execution Plan Analysis: By analyzing the job DAG (Directed Acyclic Graph), it identifies rewrite opportunities such as merging adjacent operators and reducing data shuffles.

Optimization Execution Process

Flink’s internal optimization process can be divided into several stages:

  1. Logical Plan Generation: The compiler first converts user-defined transformations (map, filter, join, etc.) into a logical execution plan.

  2. Logical Optimization:

    • Predicate Pushdown: Filter data as early as possible
    • Projection Pushdown: Drop unused fields to reduce the data volume transferred
    • Operator Merging: Fuse consecutive map-style operations where possible
  3. Physical Plan Generation: Converts the optimized logical plan into a physical execution plan for the target execution environment.

  4. Physical Optimization:

    • Local strategy selection (e.g., sort-based vs. hash-based operators)
    • Execution node allocation
    • Parallelism adjustment
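
The plan the optimizer produces can be inspected before running a job: ExecutionEnvironment.getExecutionPlan() returns the dataflow plan as a JSON string. A small illustrative job (the transformations are arbitrary):

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;

public class PlanInspection {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // A small job: keep odd numbers, square them, attach a no-op sink
        DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);
        numbers.filter(n -> n % 2 == 1)
               .map(n -> n * n)
               .output(new DiscardingOutputFormat<>());

        // Prints the dataflow plan as JSON without executing the job
        System.out.println(env.getExecutionPlan());
    }
}
```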

Actual Optimization Example

DataSet<Tuple2<String, Integer>> input = ...;
DataSet<Tuple2<String, Integer>> result = input
    .filter(t -> t.f1 > 100)          // Filter
    .map(t -> new Tuple2<>(t.f0, t.f1 * 2))  // Transform
    .groupBy(0)                       // Group
    .sum(1);                          // Aggregate

The optimizer may perform the following optimizations:

  1. Move filter operation forward as much as possible to reduce data volume for subsequent processing
  2. Merge consecutive map operations (if multiple maps exist)
  3. Select appropriate aggregation strategy for groupBy-sum operation
  4. Decide whether to use memory-based or disk-based aggregation based on data size
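
The optimizer can also be given extra information through semantic annotations. For example, @ForwardedFields declares which fields a user function passes through unchanged, letting the optimizer keep reusing an existing partitioning or sort order on those fields (the function below is illustrative):

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.tuple.Tuple2;

// Declares that field f0 (the key) is forwarded unchanged from input to output,
// so a partitioning or sort on f0 established before this map stays valid after it.
@ForwardedFields("f0")
public class DoubleValue implements MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
    @Override
    public Tuple2<String, Integer> map(Tuple2<String, Integer> t) {
        return new Tuple2<>(t.f0, t.f1 * 2);
    }
}
```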

DataSet API Fault Tolerance Mechanism

Flink’s DataSet API provides a fault tolerance mechanism that re-executes failed tasks. Unlike the DataStream API, it does not rely on checkpointing: because batch inputs are bounded and immutable, a failed task can simply be re-run from the beginning, which is enough to guarantee correct results.
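
Re-execution behavior is configurable through restart strategies, e.g. a fixed number of retries with a delay between attempts (the retry count and delay below are example values):

```java
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.ExecutionEnvironment;

public class RestartConfig {
    public static void main(String[] args) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Retry a failed job up to 3 times, waiting 10 seconds between attempts
        env.setRestartStrategy(
            RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
    }
}
```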

Future of DataSet API

It should be noted that Flink’s official roadmap no longer adds new features to the DataSet API: the API has been formally deprecated (and is removed in Flink 2.0), and batch processing is now implemented on top of the DataStream API.

Therefore, new projects should prefer the DataStream API over the DataSet API where possible. Flink’s Table API and SQL also work uniformly across batch and stream processing; these higher-level APIs offer more concise syntax and stronger optimization.
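
For new batch jobs, the same bounded-data semantics are available by running the DataStream API in batch execution mode:

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BatchOnDataStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // BATCH mode: bounded inputs, staged scheduling, batch-style shuffles --
        // the recommended replacement for DataSet-style jobs
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        env.fromElements(1, 2, 3, 4, 5)
           .map(n -> n * n)
           .print();

        env.execute("batch-on-datastream");
    }
}
```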