The Three Core Components of the DataStream API
1. DataSource
- The input side of a Flink job, supplying the raw data stream
- Added via the StreamExecutionEnvironment.addSource() method
- Supported source types: message queues (Kafka, RabbitMQ), file systems (HDFS, local files), databases (MySQL, PostgreSQL), and custom sources
2. Transformation
- The core processing stage, applying computations and transformations to the incoming data
- Common operations: Map (1:1 mapping), FlatMap (1:N mapping), Filter (filtering), KeyBy (grouping by key), Window (windowed computation)
- Also supports more complex operations such as merging and splitting multiple streams
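The 1:1 vs. 1:N distinction between Map and FlatMap (and the behavior of Filter) can be illustrated with plain Java streams, no Flink runtime needed; this only mimics the semantics and is not the Flink API itself:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class TransformSemantics {
    public static void main(String[] args) {
        List<String> lines = Arrays.asList("a b", "c");

        // Map is 1:1 -- each input element yields exactly one output element.
        List<Integer> lengths = lines.stream()
                .map(String::length)
                .collect(Collectors.toList());
        System.out.println(lengths);   // [3, 1]

        // FlatMap is 1:N -- each input element may yield zero or more outputs.
        List<String> words = lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .collect(Collectors.toList());
        System.out.println(words);     // [a, b, c]

        // Filter keeps only the elements matching a predicate.
        List<String> kept = words.stream()
                .filter(w -> !w.equals("b"))
                .collect(Collectors.toList());
        System.out.println(kept);      // [a, c]
    }
}
```

In Flink the same shapes apply element by element on an unbounded stream rather than on a finite collection.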
3. Sink
- Writes the processed data to external systems
- Supported targets: messaging systems (Kafka, RabbitMQ), database systems (MySQL, Elasticsearch), file systems (HDFS, local files), and custom sinks
File-based sources
readTextFile(path): reads the file at the given path line by line according to the TextInputFormat rules and returns the lines as a stream; the path may point to a local file or to a supported file system such as HDFS
Socket-based sources
socketTextStream: reads data from a socket; elements are split on a delimiter
Collection-based sources
fromCollection: creates a data stream from a Java Collection
Kafka connector
Required Maven dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.11.1</version>
</dependency>
Custom sources
- Non-parallel source: implement the SourceFunction interface
- Parallel source: implement the ParallelSourceFunction interface
- Rich parallel source: extend the RichParallelSourceFunction abstract class
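Since the list above only names the interfaces, here is a minimal sketch of a non-parallel custom source. To keep it compilable without the Flink jars, a simplified stand-in for SourceFunction is declared locally; in a real job you would implement org.apache.flink.streaming.api.functions.source.SourceFunction (which has the same run/cancel shape, plus a richer SourceContext) and register the source with env.addSource(...):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified local stand-in for Flink's SourceFunction, declared here only so
// the sketch compiles without the Flink dependency on the classpath.
interface SourceFunction<T> {
    interface SourceContext<E> { void collect(E element); }
    void run(SourceContext<T> ctx) throws Exception;
    void cancel();
}

// A non-parallel source that emits the numbers 1..5 and then finishes.
// The volatile flag is the usual way to make cancel() stop the emit loop.
class CountSource implements SourceFunction<Long> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long n = 1;
        while (running && n <= 5) {
            ctx.collect(n++);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

public class CustomSourceSketch {
    public static void main(String[] args) throws Exception {
        // Drive the source directly for demonstration; Flink would normally
        // call run() itself after env.addSource(new CountSource()).
        List<Long> out = new ArrayList<>();
        new CountSource().run(out::add);
        System.out.println(out);   // [1, 2, 3, 4, 5]
    }
}
```

A ParallelSourceFunction looks the same but Flink runs one instance per parallel subtask; the Rich variant additionally gets open()/close() lifecycle hooks and runtime context.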
Complete example: read from a collection and filter
package icu.wzk;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;

// People is assumed to be a simple POJO with name/age fields and getters.
public class StreamFromCollection {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        List<People> peopleList = new ArrayList<>();
        peopleList.add(new People("wzk", 18));
        peopleList.add(new People("icu", 15));
        peopleList.add(new People("wzkicu", 10));
        DataStreamSource<People> data = env.fromCollection(peopleList);
        SingleOutputStreamOperator<People> filtered = data.filter(new FilterFunction<People>() {
            @Override
            public boolean filter(People value) throws Exception {
                return value.getAge() > 15;
            }
        });
        filtered.print();
        env.execute("StreamFromCollection");
    }
}
Complete example: read from Kafka and count words
package icu.wzk;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class StreamFromKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "h121.wzk.icu:9092");
        // A consumer group id is required when starting from group offsets
        // (the default); the value here is illustrative.
        properties.setProperty("group.id", "flink_test_group");
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                "flink_test",
                new SimpleStringSchema(),
                properties
        );
        DataStreamSource<String> data = env.addSource(consumer);
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = data
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        String[] words = value.split(" ");
                        for (String word : words) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                });
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
                .sum(1);
        result.print();
        env.execute("StreamFromKafka");
    }
}