The Three Core Components of the DataStream API

1. DataSource

  • The program's input; supplies the raw data stream for a Flink job
  • Added via the StreamExecutionEnvironment.addSource() method
  • Supports many source types: message queues (Kafka, RabbitMQ), file systems (HDFS, local files), databases (MySQL, PostgreSQL), and custom sources

2. Transformation

  • The core of data processing; applies computation and transformation operators to the source data
  • Common operations: Map (1:1 mapping), FlatMap (1:N mapping), Filter (filtering), KeyBy (grouping by key), Window (windowing)
  • Also supports more complex operations such as merging and splitting multiple streams
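As a minimal sketch of how these operators chain together (the input values and job name are made up for illustration):

```java
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformationSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3, 4, 5)
                // Map: 1:1 mapping, doubles each element
                .map(new MapFunction<Integer, Integer>() {
                    @Override
                    public Integer map(Integer value) {
                        return value * 2;
                    }
                })
                // Filter: keeps only elements greater than 4
                .filter(new FilterFunction<Integer>() {
                    @Override
                    public boolean filter(Integer value) {
                        return value > 4;
                    }
                })
                .print();
        env.execute("TransformationSketch");
    }
}
```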

3. Sink

  • Writes the processed data out to external systems
  • Supported targets: messaging systems (Kafka, RabbitMQ), databases (MySQL, Elasticsearch), file systems (HDFS, local files), and custom sinks
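A custom sink is attached with addSink(). As a minimal sketch, this sink only prints to stdout; a real one would hold a client for Kafka, MySQL, and so on (the class and job names here are made up):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class CustomSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("a", "b", "c")
                // invoke() is called once per stream element
                .addSink(new SinkFunction<String>() {
                    @Override
                    public void invoke(String value, Context context) {
                        System.out.println("sink received: " + value);
                    }
                });
        env.execute("CustomSinkSketch");
    }
}
```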

File-based

readTextFile(path): reads the file at the given path line by line, following the TextInputFormat conventions, and returns each line as a String element
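For example (the file path here is an assumption):

```java
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReadTextFileExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Each line of the file becomes one String element in the stream
        DataStreamSource<String> lines = env.readTextFile("/tmp/input.txt");
        lines.print();
        env.execute("ReadTextFileExample");
    }
}
```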

Socket-based

socketTextStream: reads data from a socket; elements are separated by a delimiter
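For example (the host, port and delimiter are assumptions; you can feed the socket with something like `nc -lk 9999`):

```java
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SocketSourceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Connect to localhost:9999 and split the incoming bytes on '\n'
        DataStreamSource<String> lines = env.socketTextStream("localhost", 9999, "\n");
        lines.print();
        env.execute("SocketSourceExample");
    }
}
```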

Collection-based

fromCollection: creates a data stream from a Java Collection

Kafka connector

Add the dependency (the _2.11 suffix is the Scala version; it and the version number must match your Flink distribution):

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.11.1</version>
</dependency>

Custom sources

  • Non-parallel source: implements SourceFunction
  • Parallel source: implements the ParallelSourceFunction interface
  • Rich parallel source: extends RichParallelSourceFunction
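As a minimal non-parallel sketch (the emitted sequence, sleep interval and names are made up), a SourceFunction implements run() and cancel():

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class CountSource implements SourceFunction<Long> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long n = 0;
        while (running && n < 10) {
            ctx.collect(n++);      // emit the next element downstream
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        running = false;           // called by Flink when the job is cancelled
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(new CountSource()).print();
        env.execute("CountSource");
    }
}
```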

Complete example (read from a collection and filter)

package icu.wzk;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// People is a simple POJO with name/age fields, getters and setters (omitted here)
public class StreamFromCollection {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        List<People> peopleList = new ArrayList<>();
        peopleList.add(new People("wzk", 18));
        peopleList.add(new People("icu", 15));
        peopleList.add(new People("wzkicu", 10));

        // Build a bounded stream from the in-memory collection
        DataStreamSource<People> data = env.fromCollection(peopleList);
        // Keep only people older than 15
        SingleOutputStreamOperator<People> filtered = data.filter(new FilterFunction<People>() {
            @Override
            public boolean filter(People value) throws Exception {
                return value.getAge() > 15;
            }
        });
        filtered.print();
        env.execute("StreamFromCollection");
    }
}

Complete example (read from Kafka and process)

package icu.wzk;

import java.util.Properties;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

public class StreamFromKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "h121.wzk.icu:9092");
        // A consumer group id is required by the Kafka consumer
        properties.setProperty("group.id", "flink_test_group");

        // Consume the "flink_test" topic, deserializing each record as a String
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                "flink_test",
                new SimpleStringSchema(),
                properties
        );
        DataStreamSource<String> data = env.addSource(consumer);

        // Split each line into words and emit (word, 1) pairs
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = data
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        String[] words = value.split(" ");
                        for (String word : words) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                });
        // Group by the word (field f0) and sum the counts (field f1)
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
                .sum(1);
        result.print();
        env.execute("StreamFromKafka");
    }
}