Three Core Components of the DataStream API

1. DataSource

  • The input to the program; provides the raw data stream for a Flink job
  • Data sources are added through the StreamExecutionEnvironment.addSource() method
  • Supports many source types: message queues (Kafka, RabbitMQ), file systems (HDFS, local files), databases (MySQL, PostgreSQL), and custom data sources

2. Transformation

  • The core of data processing; performs computations and transformations on the source stream
  • Common transformations: Map (1:1 mapping), FlatMap (1:N mapping), Filter (filtering), KeyBy (grouping by key), Window (windowing)
  • Also supports more complex operations such as merging multiple streams and splitting a stream
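A minimal sketch of chaining these operators (the input values and job name here are made up for illustration; a bounded fromElements() source stands in for a real one):

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformDemo {
    // Pure helper used by the Map operator, kept separate so it is easy to test
    static int square(int x) {
        return x * x;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> squares = env
                .fromElements(1, 2, 3, 4)          // bounded demo source
                .filter(x -> x % 2 == 0)           // Filter: keep even numbers
                .map((MapFunction<Integer, Integer>) TransformDemo::square); // Map: 1:1
        squares.print();                           // prints the elements 4 and 16
        env.execute("TransformDemo");
    }
}
```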

3. Sink

  • Responsible for writing processed data out to external systems
  • Supported output targets: messaging systems (Kafka, RabbitMQ), databases (MySQL, Elasticsearch), file systems (HDFS, local files), and custom sinks
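As a sketch of attaching a sink with addSink() — here a Kafka producer, the sink counterpart of the consumer shown later; the broker address and topic name are placeholders:

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class KafkaSinkDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> data = env.fromElements("hello", "flink");

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker

        // Serialize each String element and write it to the given topic
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
                "output_topic",            // placeholder topic
                new SimpleStringSchema(),
                props
        );
        data.addSink(producer);
        env.execute("KafkaSinkDemo");
    }
}
```

Running this requires a reachable Kafka broker; without one the job fails at runtime, not at build time.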

Based on Files

readTextFile(path): Reads the file at the given path line by line, following the TextInputFormat rules, and returns each line as a String element of the stream
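A small self-contained sketch: a temporary demo file is written first in plain Java so the example has something to read.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamFromTextFile {
    public static void main(String[] args) throws Exception {
        // Write a small demo file so the example is self-contained
        Path input = Files.createTempFile("flink-demo", ".txt");
        Files.write(input, Arrays.asList("line one", "line two"));

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Each line of the file becomes one String element of the stream
        DataStreamSource<String> lines = env.readTextFile(input.toString());
        lines.print();
        env.execute("StreamFromTextFile");
    }
}
```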

Based on Socket

socketTextStream: Reads data from a socket; elements are separated by a delimiter (newline by default)
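A hedged sketch with a placeholder host and port; a matching text server can be started with, for example, `nc -lk 9999`:

```java
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamFromSocket {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Connect to a plain-text server; input is split on newlines by default
        DataStreamSource<String> lines = env.socketTextStream("localhost", 9999); // placeholder host/port
        lines.print();
        env.execute("StreamFromSocket");
    }
}
```

The job keeps running and prints each line as it arrives; it fails immediately if nothing is listening on the port.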

Based on Collections

fromCollection(collection): Creates a data stream from a Java collection

Kafka Connector

Add dependency:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.11.1</version>
</dependency>

Custom Input

  • Non-parallel source: implement the SourceFunction interface
  • Parallel source: implement the ParallelSourceFunction interface
  • Rich parallel source: extend the RichParallelSourceFunction abstract class

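A minimal sketch of a non-parallel source (the class name, the 0..9 range, and the isRunning() helper are made up for illustration):

```java
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Hypothetical non-parallel source: emits the numbers 0..9 and then finishes
public class CountSource implements SourceFunction<Long> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long counter = 0;
        while (running && counter < 10) {
            // Hold the checkpoint lock while emitting, as the SourceFunction contract requires
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(counter++);
            }
        }
    }

    @Override
    public void cancel() {
        running = false; // makes run() exit its loop
    }

    boolean isRunning() { // small test helper, not required by the interface
        return running;
    }
}
```

It would be registered with env.addSource(new CountSource()). A ParallelSourceFunction version looks the same but may run with parallelism greater than 1, so each parallel instance executes run() independently.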
Complete Code Example (Read from Collection and Filter)

package icu.wzk;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;

public class StreamFromCollection {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // People is a simple POJO in this project with name and age fields
        List<People> peopleList = new ArrayList<>();
        peopleList.add(new People("wzk", 18));
        peopleList.add(new People("icu", 15));
        peopleList.add(new People("wzkicu", 10));

        // Create the stream directly from the Java collection
        DataStreamSource<People> data = env.fromCollection(peopleList);
        // Keep only people older than 15
        SingleOutputStreamOperator<People> filtered = data.filter(new FilterFunction<People>() {
            @Override
            public boolean filter(People value) throws Exception {
                return value.getAge() > 15;
            }
        });
        filtered.print();
        env.execute("StreamFromCollection");
    }
}

Complete Code Example (Read from Kafka and Process)

package icu.wzk;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class StreamFromKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "h121.wzk.icu:9092");

        // Consume the "flink_test" topic, deserializing each record as a String
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                "flink_test",
                new SimpleStringSchema(),
                properties
        );
        DataStreamSource<String> data = env.addSource(consumer);

        // Split each line into words and emit (word, 1) pairs
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = data
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        String[] words = value.split(" ");
                        for (String word : words) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                });
        // Group by the word and sum the counts per key
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
                .sum(1);
        result.print();
        env.execute("StreamFromKafka");
    }
}