This is article 93 in the Big Data series, introducing Flink DataStream API core concepts and program structure.
DataStream API Overview
Flink program’s data processing flow consists of three core components:
1. DataSource (Data Source)
- This is the input source of the program, responsible for providing the raw data stream for Flink jobs
- A data source can be added via the StreamExecutionEnvironment.addSource() method
- Supports multiple data source types:
- Message queues: Kafka, RabbitMQ, etc.
- File systems: HDFS, local files, etc.
- Databases: MySQL, PostgreSQL, etc.
- Custom data source: Implemented via SourceFunction
2. Transformation (Data Transformation)
- This is the core data processing stage, performing various computation and transformation operations on the source data
- Common transformation operations include:
- Map: Transform each element (1:1 mapping)
- FlatMap: Transform one element to zero or multiple elements (1:N mapping)
- Filter: Filter data based on conditions
- KeyBy: Group by key for processing
- Window: Window operations based on time or count
- Supports complex operations like merging and splitting multiple data streams
- Transformation operations can be chained together to form a processing pipeline
3. Sink (Data Output)
- Responsible for outputting processed data to external systems
- Supports multiple output targets:
- Message systems: Kafka, RabbitMQ, etc.
- Database systems: MySQL, Elasticsearch, etc.
- File systems: HDFS, local files, etc.
- Custom output: Implemented via SinkFunction
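Putting the three components together, a minimal DataStream program looks like the following sketch (the socket host and port are placeholders, not from any specific deployment):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SkeletonJob {
    public static void main(String[] args) throws Exception {
        // 1. Obtain the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. DataSource: read lines of text from a socket (hypothetical host/port)
        DataStream<String> source = env.socketTextStream("localhost", 9999);
        // 3. Transformation: drop empty lines, then upper-case the rest
        DataStream<String> transformed = source
                .filter(line -> !line.isEmpty())
                .map(String::toUpperCase);
        // 4. Sink: print results to stdout
        transformed.print();
        // 5. Trigger execution; nothing runs until execute() is called
        env.execute("SkeletonJob");
    }
}
```

The `execute()` call at the end is essential: the preceding calls only build the dataflow graph, and the job is submitted lazily when `execute()` runs.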
Based on Files
readTextFile(path): Reads a text file from the given path. The file is read line by line according to the TextInputFormat rules, and each line is returned as one String element.
If reading HDFS, need to add dependencies:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.11</artifactId>
<version>1.11.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.9.2</version>
</dependency>
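With the dependencies above in place, a minimal usage sketch looks like this (the HDFS path is a placeholder):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReadTextFileDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read a text file line by line; hdfs:// paths also work once the
        // Hadoop dependencies above are on the classpath (path is hypothetical)
        DataStream<String> lines = env.readTextFile("hdfs://h121.wzk.icu:9000/data/input.txt");
        lines.print();
        env.execute("ReadTextFileDemo");
    }
}
```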
Based on Socket
socketTextStream: Reads data from a socket; elements are separated by a configurable delimiter.
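As a sketch, a socket source can be consumed like this (host, port, and delimiter are placeholders; you can feed it data with `nc -lk 9999` in another terminal):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamFromSocket {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read newline-delimited text from a local socket
        // (start `nc -lk 9999` first, then run this job)
        DataStream<String> lines = env.socketTextStream("localhost", 9999, "\n");
        lines.print();
        env.execute("StreamFromSocket");
    }
}
```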
Based on Collections
fromCollection Method Details
fromCollection is a method in Apache Flink DataStream API used to create data stream from Java Collection.
Basic Requirements
- All elements in the input Collection must be of the same type
- This method is typically used for local testing and small-scale dataset processing
POJO Type Recognition Conditions
Flink recognizes classes as POJO types if they meet the following conditions:
- Class definition requirements:
  - Must be a public class and standalone (not a non-static inner class)
  - Must have a public no-argument constructor
- Field access requirements:
  - Every property in the class and its parent classes that is not modified by static or transient must satisfy one of:
    - Be public and not modified by final
    - Have getter and setter methods following JavaBean naming conventions
JavaBean Naming Convention Example
For a field named value, its access methods should be:
- Getter method: public DataType getValue()
- Setter method: public void setValue(DataType value)
Application Scenario Example
// Define a POJO class meeting the requirements
public class SensorReading {
    public String sensorId;          // public field
    private double temperature;      // private field, but has getter/setter

    public SensorReading() {}        // public no-arg constructor

    public SensorReading(String sensorId, double temperature) {
        this.sensorId = sensorId;
        this.temperature = temperature;
    }

    // Getter and Setter methods
    public double getTemperature() {
        return temperature;
    }

    public void setTemperature(double temperature) {
        this.temperature = temperature;
    }
}

// Use fromCollection to create a data stream
List<SensorReading> readings = Arrays.asList(
        new SensorReading("sensor1", 25.0),
        new SensorReading("sensor2", 28.5)
);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<SensorReading> stream = env.fromCollection(readings);
Write Code
package icu.wzk;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;

public class StreamFromCollection {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        List<People> peopleList = new ArrayList<>();
        peopleList.add(new People("wzk", 18));
        peopleList.add(new People("icu", 15));
        peopleList.add(new People("wzkicu", 10));
        DataStreamSource<People> data = env.fromCollection(peopleList);
        SingleOutputStreamOperator<People> filtered = data.filter(new FilterFunction<People>() {
            @Override
            public boolean filter(People value) throws Exception {
                return value.getAge() > 15;
            }
        });
        filtered.print();
        env.execute("StreamFromCollection");
    }
public static class People {
private String name;
private Integer age;
public People() {
}
public People(String name, Integer age) {
this.name = name;
this.age = age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
}
}
toString
We can override People’s toString() method to print content:
@Override
public String toString() {
return "name: " + this.name + ", age: " + this.age;
}
Custom Input
You can use StreamExecutionEnvironment.addSource() to add a data source to the program.
Flink provides many pre-implemented source functions, but you can also write your own custom sources:
- Non-parallel source: implements SourceFunction
- Parallel source: implements the ParallelSourceFunction interface
- Rich parallel source: extends RichParallelSourceFunction
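As a sketch, a non-parallel custom source implementing SourceFunction might look like the following (the emitted values and the one-second interval are made up for illustration):

```java
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// A minimal non-parallel source that emits an incrementing counter once per second
public class CounterSource implements SourceFunction<Long> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long counter = 0L;
        while (running) {
            ctx.collect(counter++);   // emit the next element downstream
            Thread.sleep(1000L);      // hypothetical emission interval
        }
    }

    @Override
    public void cancel() {
        running = false;              // invoked by Flink when the job is cancelled
    }
}
```

The source would be attached with `env.addSource(new CounterSource())`; the `volatile` flag lets `cancel()` stop the emit loop cleanly from another thread.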
Kafka Connector
Add Dependency
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.11.1</version>
</dependency>
Write Code
package icu.wzk;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class StreamFromKafka {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Kafka consumer configuration
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "h121.wzk.icu:9092");
        // Kafka consumer: topic, deserialization schema, config
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                "flink_test",
                new SimpleStringSchema(),
                properties
        );
        DataStreamSource<String> data = env.addSource(consumer);
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = data
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] words = value.split(" ");
for (String word: words) {
out.collect(new Tuple2<>(word, 1));
}
}
});
SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne
.keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
@Override
public Object getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
})
.sum(1);
result.print();
env.execute("StreamFromKafka");
}
}
Start Kafka
cd /opt/servers/kafka_2.12-2.7.2/bin
./kafka-server-start.sh ../config/server.properties
Create Topic
cd /opt/servers/kafka_2.12-2.7.2/bin/
./kafka-topics.sh --create --zookeeper h121.wzk.icu:2181 --replication-factor 1 --partitions 1 --topic flink_test
Produce Messages
cd /opt/servers/kafka_2.12-2.7.2/bin/
./kafka-console-producer.sh --bootstrap-server h121.wzk.icu:9092 --topic flink_test
# Wait for Java program to start, produce several messages
Run Results
Observing the console:
3> (hello,1)
5> (world,1)
3> (hello,2)
5> (world,2)
3> (hello,3)
3> (hello,4)
2> (hello!,1)
2> (hello!,2)
...
Flink Built-in Connectors
Flink also provides some built-in connectors:
| Connector Type | Description |
|---|---|
| Kafka | Message queue connector |
| Redis | Redis database connector |
| Elasticsearch | Search engine connector |
| RabbitMQ | Message queue connector |
| JDBC | Database connector |