本文是大数据系列第 93 篇,介绍 Flink DataStream API 的核心概念与程序结构。
DataStream API 概述
Flink程序的数据处理流程主要分为3个核心组成部分:
1. DataSource(数据源)
- 这是程序的输入来源,负责为Flink作业提供原始数据流
- 可以通过
StreamExecutionEnvironment.addSource()方法添加数据源 - 支持多种数据源类型:
- 消息队列:如Kafka、RabbitMQ等
- 文件系统:如HDFS、本地文件等
- 数据库:如MySQL、PostgreSQL等
- 自定义数据源:通过实现SourceFunction来实现
2. Transformation(数据转换)
- 这是数据处理的核心环节,对数据源进行各种计算和转换操作
- 常见转换操作包括:
- Map:对每个元素进行转换(1:1映射)
- FlatMap:将一个元素转换为零个或多个元素(1:N映射)
- Filter:根据条件过滤数据
- KeyBy:按key分组处理
- Window:基于时间或数量的窗口操作
- 支持多个数据流的合并、拆分等复杂操作
- 转换操作可以链式调用,形成处理流水线
3. Sink(数据输出)
- 负责将处理后的数据输出到外部系统
- 支持多种输出目标:
- 消息系统:如Kafka、RabbitMQ等
- 数据库系统:如MySQL、Elasticsearch等
- 文件系统:如HDFS、本地文件等
- 自定义输出:通过实现SinkFunction来实现
基于文件
readTextFile(path):读取本地文件,文件遵循TextInputFormat逐行读取规则并返回。
如果读取HDFS,需要添加依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.11</artifactId>
<version>1.11.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.9.2</version>
</dependency>
基于Socket
socketTextStream:从Socket中读取数据,元素可以通过一个分割符号分开。
基于集合
fromCollection 方法详解
fromCollection是Apache Flink DataStream API中的一个方法,用于从Java的Collection集合创建数据流。
基本要求
- 输入的Collection集合中的所有元素必须是相同类型的
- 该方法通常用于本地测试和小规模数据集处理
POJO 类型识别条件
Flink会将满足以下条件的类识别为POJO类型:
-
类定义要求
- 必须是public类且是独立的(不能是非静态内部类)
- 必须有public的无参构造方法
-
字段访问要求
- 类及其父类中所有不被
static、transient修饰的属性需要满足以下条件之一:- 是public的且不被final修饰
- 包含遵循JavaBean命名规范的Getter和Setter方法
- 类及其父类中所有不被
JavaBean 命名规范示例
对于名为value的字段,其访问方法应为:
- Getter方法:
public DataType getValue() - Setter方法:
public void setValue(DataType value)
应用场景示例
// 定义符合要求的POJO类
public class SensorReading {
public String sensorId; // public字段
private double temperature; // 私有字段但有getter/setter
public SensorReading() {} // 无参构造
// Getter和Setter方法
public double getTemperature() {
return temperature;
}
public void setTemperature(double temperature) {
this.temperature = temperature;
}
}
// 使用fromCollection创建数据流
List<SensorReading> readings = Arrays.asList(
new SensorReading("sensor1", 25.0),
new SensorReading("sensor2", 28.5)
);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<SensorReading> stream = env.fromCollection(readings);
编写代码
package icu.wzk;
public class StreamFromCollection {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
List<People> peopleList = new ArrayList<>();
peopleList.add(new People("wzk", 18));
peopleList.add(new People("icu", 15));
peopleList.add(new People("wzkicu", 10));
DataStreamSource<People> data = env.getJavaEnv().fromCollection(peopleList);
SingleOutputStreamOperator<People> filtered = data.filter(new FilterFunction<People>() {
@Override
public boolean filter(People value) throws Exception {
return value.getAge() > 15;
}
});
filtered.print();
env.execute("StreamFromCollection");
}
public static class People {
private String name;
private Integer age;
public People() {
}
public People(String name, Integer age) {
this.name = name;
this.age = age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
}
}
toString
我们可以通过重写 People 的 toString() 方法,来打印内容:
@Override
public String toString() {
return "name: " + this.name + ", age: " + this.age;
}
自定义输入
可以使用 StreamExecutionEnvironment.addSource()将一个数据源添加到程序中。
Flink提供了许多预先实现的源函数,但是也可以编写自己的自定义源:
- 非并行源:
implements SourceFunction - 并行源:
implements ParallelSourceFuction接口 - 富并行源:
extends RichParallelSourceFunction
Kafka 连接器
添加依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.11.1</version>
</dependency>
编写代码
package icu.wzk;
public class StreamFromKafka {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 配置信息
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "h121.wzk.icu:9092");
// Kafka
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
"flink_test",
new SimpleStringSchema(),
properties
);
DataStreamSource<String> data = env.getJavaEnv().addSource(consumer);
SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = data
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
String[] words = value.split(" ");
for (String word: words) {
out.collect(new Tuple2<>(word, 1));
}
}
});
SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne
.keyBy(new KeySelector<Tuple2<String, Integer>, Object>() {
@Override
public Object getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
})
.sum(1);
result.print();
env.execute("StreamFromKafka");
}
}
启动 Kafka
cd /opt/servers/kafka_2.12-2.7.2/bin
./kafka-server-start.sh ../config/server.properties
创建主题
cd /opt/servers/kafka_2.12-2.7.2/bin/
./kafka-topics.sh --create --zookeeper h121.wzk.icu:2181 --replication-factor 1 --partition 1 --topic flink_test
生产消息
cd /opt/servers/kafka_2.12-2.7.2/bin/
./kafka-console-producer.sh --bootstrap-server h121.wzk.icu:9092 --topic flink_test
# 我们等Java程序启动后,产生几条消息
运行结果
观察控制台可以看到:
3> (hello,1)
5> (world,1)
3> (hello,2)
5> (world,2)
3> (hello,3)
3> (hello,4)
2> (hello!,1)
2> (hello!,2)
...
Flink 内置 Connector
Flink也提供了一些内置的连接器,主要包括:
| 连接器类型 | 说明 |
|---|---|
| Kafka | 消息队列连接器 |
| Redis | Redis数据库连接器 |
| Elasticsearch | 搜索引擎连接器 |
| RabbitMQ | 消息队列连接器 |
| JDBC | 数据库连接器 |