Overview
Apache Flink’s DataSet API is the core programming interface for Flink batch processing, designed specifically for static, bounded datasets. Unlike the DataStream API for stream processing, the DataSet API targets data collections that exist in full before the job starts, making it suitable for processing batch data at TB or even PB scale.
Core Features
- Batch Optimization: The DataSet API uses lazy execution; the whole computation is analyzed by an optimizer before anything runs
- Memory Management: Flink implements its own memory management mechanism, so it can efficiently process large-scale datasets
- Rich Operators: Provides a rich set of transformations such as map, filter, join, groupBy, and reduce
- Fault Tolerance: Failed tasks are transparently re-executed, ensuring job reliability
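The lazy-execution model in the first bullet can be sketched as follows (a minimal illustrative example; the class name is arbitrary, and the `flink-java` dependency is assumed to be on the classpath):

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class LazyExecutionSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Building the DataSet only declares the plan; nothing runs yet.
        DataSet<Integer> doubled = env
                .fromElements(1, 2, 3)
                .map(n -> n * 2);

        // An action such as print() (or an explicit env.execute()) hands the
        // whole plan to the optimizer and executes it in one shot.
        doubled.print();
    }
}
```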
Typical Application Scenarios
- ETL (Extract-Transform-Load)
  - Extract data from relational databases
  - Perform data cleaning and transformation
  - Load into a data warehouse or analysis system
- Batch Data Analysis
  - Execute complex aggregation calculations
  - Generate reports and statistical analysis
  - Large-scale graph computation
- Machine Learning Preprocessing
  - Feature engineering
  - Dataset preparation
  - Model evaluation
Comparison with DataStream API
| Feature | DataSet API | DataStream API |
|---|---|---|
| Data Processing Model | Batch | Stream |
| Data Boundary | Bounded data | Unbounded data |
| Execution Mode | One-time execution | Continuous execution |
| Typical Latency | Minutes to hours | Milliseconds to seconds |
| Main Application | Offline analysis | Real-time processing |
DataSource
1. Collection-based DataSource
```java
// Create a test dataset from an in-memory collection
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
List<String> testData = Arrays.asList("A", "B", "C", "D");
DataSet<String> dataSet = env.fromCollection(testData);
```
2. File-based DataSource
```java
// Read a text file from HDFS
DataSet<String> hdfsData = env.readTextFile("hdfs://namenode:8020/user/flink/input/data.txt");
```
DataSet Creation
```java
// Read from a local file
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> text = env.readTextFile("path/to/file");

// Read from a CSV file
DataSet<Tuple3<Integer, String, Double>> csvData = env.readCsvFile("path/to/file.csv")
    .types(Integer.class, String.class, Double.class);

// Create from a collection
List<Tuple2<String, Integer>> data = Arrays.asList(
    new Tuple2<>("Alice", 1),
    new Tuple2<>("Bob", 2)
);
DataSet<Tuple2<String, Integer>> dataSet = env.fromCollection(data);
```
DataSet Transformation Operations
Map
```java
DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);
DataSet<Integer> squaredNumbers = numbers.map(n -> n * n);
```
Filter
```java
DataSet<Integer> evenNumbers = numbers.filter(n -> n % 2 == 0);
```
FlatMap
```java
// Uses org.apache.flink.util.Collector and org.apache.flink.api.common.typeinfo.Types
DataSet<String> lines = env.fromElements("hello world", "flink is great");
DataSet<String> words = lines.flatMap((String line, Collector<String> collector) -> {
    for (String word : line.split(" ")) {
        collector.collect(word);
    }
}).returns(Types.STRING); // the Collector's type is erased from the lambda, so declare it
```
Reduce
```java
DataSet<Integer> sum = numbers.reduce((n1, n2) -> n1 + n2);
```
GroupBy and Reduce
```java
DataSet<Tuple2<String, Integer>> wordCounts = words
    .map(word -> new Tuple2<>(word, 1))
    .returns(Types.TUPLE(Types.STRING, Types.INT)) // tuple type is erased from the lambda
    .groupBy(0)
    .reduce((t1, t2) -> new Tuple2<>(t1.f0, t1.f1 + t2.f1));
```
Join
```java
DataSet<Tuple2<Integer, String>> persons = env.fromElements(
    new Tuple2<>(1, "Alice"),
    new Tuple2<>(2, "Bob")
);
DataSet<Tuple2<Integer, String>> cities = env.fromElements(
    new Tuple2<>(1, "Berlin"),
    new Tuple2<>(2, "Paris")
);
DataSet<Tuple2<String, String>> personWithCities = persons.join(cities)
    .where(0)
    .equalTo(0)
    .with((p, c) -> new Tuple2<>(p.f1, c.f1))
    .returns(Types.TUPLE(Types.STRING, Types.STRING)); // tuple type is erased from the lambda
```
DataSet Output
```java
// File sinks are lazy: the write only happens once env.execute() runs
wordCounts.writeAsCsv("output/wordcounts.csv", "\n", ",");
env.execute("Word Count");

// print() is itself an action and triggers execution immediately
wordCounts.print();
```
Detailed Batch Optimization Mechanism
DataSet API Optimization Principle
The DataSet API provides a comprehensive optimization mechanism, improving task execution mainly through:
- Cost Model Analysis: The system evaluates the resource consumption and performance of different execution paths, including memory usage, CPU computation, and network transmission cost, and selects the cheapest plan.
- Execution Plan Analysis: By analyzing the task's DAG (Directed Acyclic Graph), the optimizer identifies opportunities such as merging operations and reducing data transfer.
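The cost-based strategy choice can also be steered by the user. As an illustrative sketch (class name and sample data are arbitrary), the DataSet API accepts a `JoinHint` that overrides the optimizer's own strategy selection for a join:

```java
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class JoinHintSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<Integer, String>> persons =
                env.fromElements(new Tuple2<>(1, "Alice"), new Tuple2<>(2, "Bob"));
        DataSet<Tuple2<Integer, String>> cities =
                env.fromElements(new Tuple2<>(1, "Berlin"), new Tuple2<>(2, "Paris"));

        // BROADCAST_HASH_FIRST asks the runtime to broadcast the first input
        // and build a hash table from it, instead of leaving the strategy
        // choice entirely to the cost model.
        persons.join(cities, JoinHint.BROADCAST_HASH_FIRST)
               .where(0)
               .equalTo(0)
               .with((p, c) -> new Tuple2<>(p.f1, c.f1))
               .returns(Types.TUPLE(Types.STRING, Types.STRING))
               .print();
    }
}
```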
Optimization Execution Process
Flink’s internal optimization process can be divided into several stages:
1. Logical Plan Generation: The compiler first converts the user-defined transformations (map, filter, join, etc.) into a logical execution plan.
2. Logical Optimization:
   - Predicate Pushdown: Filter data as early as possible
   - Projection Pushdown: Reduce the volume of data to transmit
   - Operation Merging: Merge consecutive map operations where present
3. Physical Plan Generation: Converts the optimized logical plan into a physical execution plan, taking the concrete execution environment into account.
4. Physical Optimization:
   - Local strategy selection
   - Execution node allocation
   - Parallelism adjustment
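Of these physical-plan decisions, parallelism is the one users adjust most often. A minimal sketch (class name and sample data are arbitrary) of setting it for the whole job and overriding it for a single operator:

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class ParallelismSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4); // default parallelism for every operator in this job

        DataSet<Integer> doubled = env
                .fromElements(1, 2, 3, 4, 5)
                .map(n -> n * 2)
                .setParallelism(2); // override for this operator only

        doubled.print();
    }
}
```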
Actual Optimization Example
```java
DataSet<Tuple2<String, Integer>> input = ...;
DataSet<Tuple2<String, Integer>> result = input
    .filter(t -> t.f1 > 100)                        // Filter
    .map(t -> new Tuple2<>(t.f0, t.f1 * 2))         // Transform
    .returns(Types.TUPLE(Types.STRING, Types.INT))  // tuple type is erased from the lambda
    .groupBy(0)                                     // Group
    .sum(1);                                        // Aggregate
```
The optimizer may perform the following optimizations:
- Move filter operation forward as much as possible to reduce data volume for subsequent processing
- Merge consecutive map operations (if multiple maps exist)
- Select appropriate aggregation strategy for groupBy-sum operation
- Decide whether to use memory-based or disk-based aggregation based on data size
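The optimizer can also be given extra information to work with. A hedged sketch (class name and sample data are arbitrary) using the semantic annotation `withForwardedFields` to declare that the map leaves the grouping key `f0` untouched, so any partitioning on it can be reused instead of re-established:

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class ForwardedFieldsSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<String, Integer>> input = env.fromElements(
                new Tuple2<>("a", 150), new Tuple2<>("a", 200), new Tuple2<>("b", 300));

        input.map(t -> new Tuple2<>(t.f0, t.f1 * 2))
             .returns(Types.TUPLE(Types.STRING, Types.INT))
             // f0 passes through the map unchanged, so a partitioning or sort
             // on the key established before the map survives it.
             .withForwardedFields("f0")
             .groupBy(0)
             .sum(1)
             .print();
    }
}
```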
DataSet API Fault Tolerance Mechanism
Flink’s DataSet API provides fault tolerance by re-executing failed tasks. Although the DataSet API does not depend on the checkpoint mechanism the way the DataStream API does, the bounded nature of batch inputs allows failed tasks to be re-executed from the beginning, ensuring correct results.
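How aggressively failed tasks are retried is governed by the restart strategy, configured on the execution environment. A minimal configuration sketch (class name is arbitrary):

```java
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.ExecutionEnvironment;

public class RestartStrategySketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Retry a failed job up to 3 times, waiting 10 seconds between attempts.
        env.setRestartStrategy(
                RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
    }
}
```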
Future of DataSet API
It should be noted that Flink’s official roadmap no longer prioritizes new features for the DataSet API. Future development focuses on the DataStream API, through which even batch processing will be implemented.
Therefore, new projects should prefer the DataStream API over the DataSet API where possible. In particular, Flink’s Table API and SQL apply to both batch and stream processing; these higher-level APIs provide more concise syntax and stronger optimization capabilities.
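Since Flink 1.12, the recommended replacement for most DataSet API use cases is running the DataStream API in batch execution mode on bounded inputs. A minimal sketch (class name is arbitrary):

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BatchOnDataStreamSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Run this bounded pipeline with batch semantics and batch-style
        // scheduling, instead of continuous streaming execution.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        env.fromElements(1, 2, 3)
           .map(n -> n * 2)
           .print();

        env.execute("batch-on-datastream");
    }
}
```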