This is article 93 in the Big Data series, introducing the core concepts and program structure of the Flink DataStream API.


DataStream API Overview

A Flink program’s data processing flow consists of three core components:

1. DataSource (Data Source)

  • This is the input source of the program, responsible for providing the raw data stream for the Flink job
  • A data source can be added via the StreamExecutionEnvironment.addSource() method
  • Supports multiple data source types:
    • Message queues: Kafka, RabbitMQ, etc.
    • File systems: HDFS, local files, etc.
    • Databases: MySQL, PostgreSQL, etc.
    • Custom data source: Implemented via SourceFunction

2. Transformation (Data Transformation)

  • This is the core data-processing stage, performing various computation and transformation operations on the source data
  • Common transformation operations include:
    • Map: Transform each element (1:1 mapping)
    • FlatMap: Transform one element to zero or multiple elements (1:N mapping)
    • Filter: Filter data based on conditions
    • KeyBy: Group by key for processing
    • Window: Window operations based on time or count
  • Supports complex operations like merging and splitting multiple data streams
  • Transformation operations can be chained together to form a processing pipeline

3. Sink (Data Output)

  • Responsible for outputting processed data to external systems
  • Supports multiple output targets:
    • Message systems: Kafka, RabbitMQ, etc.
    • Database systems: MySQL, Elasticsearch, etc.
    • File systems: HDFS, local files, etc.
    • Custom output: Implemented via SinkFunction
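
Putting the three components together, a minimal end-to-end sketch in Java might look like this (the class name and sample data are illustrative, not from the original article):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SkeletonJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("flink", "datastream", "api")  // DataSource
           .map(s -> s.toUpperCase())                   // Transformation
           .print();                                    // Sink (stdout)

        env.execute("SkeletonJob");
    }
}
```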

Based on Files

readTextFile(path): Reads a local file line by line, following the TextInputFormat reading rules, and returns the lines as a stream of strings.
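
A minimal usage sketch (the file path here is illustrative; the same method also accepts an HDFS URI such as hdfs://host:port/path):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReadTextFileDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Each line of the file becomes one String element in the stream
        DataStream<String> lines = env.readTextFile("/tmp/input.txt");
        lines.print();

        env.execute("ReadTextFileDemo");
    }
}
```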

If reading from HDFS, you need to add the following dependencies:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-hadoop-compatibility_2.11</artifactId>
  <version>1.11.1</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>2.9.2</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-hdfs</artifactId>
  <version>2.9.2</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.9.2</version>
</dependency>

Based on Socket

socketTextStream: Reads data from a socket; elements can be separated by a delimiter.
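
A minimal sketch (host, port, and delimiter are illustrative; start a test server first, e.g. with nc -lk 9999):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SocketDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read text from the socket, splitting elements on "\n"
        DataStream<String> lines = env.socketTextStream("localhost", 9999, "\n");
        lines.print();

        env.execute("SocketDemo");
    }
}
```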

Based on Collections

fromCollection Method Details

fromCollection is a method in the Apache Flink DataStream API used to create a data stream from a Java Collection.

Basic Requirements

  • All elements in the input Collection must be of the same type
  • This method is typically used for local testing and small-scale dataset processing

POJO Type Recognition Conditions

Flink recognizes classes as POJO types if they meet the following conditions:

  1. Class Definition Requirements

    • Must be a public, standalone class (it cannot be a non-static inner class)
    • Must have a public no-argument constructor
  2. Field Access Requirements

    • All fields of the class and its superclasses that are not static or transient must satisfy one of the following:
      • Be public and not final
      • Have getter and setter methods following the JavaBean naming convention

JavaBean Naming Convention Example

For a field named value, its access methods should be:

  • Getter method: public DataType getValue()
  • Setter method: public void setValue(DataType value)

Application Scenario Example

// Define a POJO class meeting the requirements above
public class SensorReading {
    public String sensorId;          // public field
    private double temperature;      // private field, but exposed via getter/setter

    public SensorReading() {}        // public no-arg constructor (required)

    // Convenience constructor used below
    public SensorReading(String sensorId, double temperature) {
        this.sensorId = sensorId;
        this.temperature = temperature;
    }

    // Getter and Setter following the JavaBean convention
    public double getTemperature() {
        return temperature;
    }

    public void setTemperature(double temperature) {
        this.temperature = temperature;
    }
}

// Use fromCollection to create data stream
List<SensorReading> readings = Arrays.asList(
    new SensorReading("sensor1", 25.0),
    new SensorReading("sensor2", 28.5)
);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<SensorReading> stream = env.fromCollection(readings);

Write Code

package icu.wzk;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;

public class StreamFromCollection {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        List<People> peopleList = new ArrayList<>();
        peopleList.add(new People("wzk", 18));
        peopleList.add(new People("icu", 15));
        peopleList.add(new People("wzkicu", 10));

        DataStreamSource<People> data = env.fromCollection(peopleList);
        SingleOutputStreamOperator<People> filtered = data.filter(new FilterFunction<People>() {
            @Override
            public boolean filter(People value) throws Exception {
                return value.getAge() > 15;
            }
        });
        filtered.print();
        env.execute("StreamFromCollection");
    }
    }

    public static class People {

        private String name;
        private Integer age;

        public People() {

        }

        public People(String name, Integer age) {
            this.name = name;
            this.age = age;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public Integer getAge() {
            return age;
        }

        public void setAge(Integer age) {
            this.age = age;
        }
    }
}

toString

We can override People’s toString() method so that printed elements are readable:

@Override
public String toString() {
    return "name: " + this.name + ", age: " + this.age;
}

Custom Input

You can use StreamExecutionEnvironment.addSource() to attach a data source to the program.

Flink provides many pre-implemented source functions, but you can also write your own custom sources:

  • Non-parallel source: implements SourceFunction
  • Parallel source: implements the ParallelSourceFunction interface
  • Rich parallel source: extends RichParallelSourceFunction
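
As a sketch of the non-parallel case, a custom SourceFunction that emits an increasing counter once per second might look like this (the class name and emission rate are illustrative):

```java
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class CounterSource implements SourceFunction<Long> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long count = 0L;
        while (running) {
            ctx.collect(count++);   // emit the next value downstream
            Thread.sleep(1000);     // one element per second
        }
    }

    @Override
    public void cancel() {
        running = false;            // called by Flink when the job is cancelled
    }
}
```

It would then be attached with env.addSource(new CounterSource()); extending RichParallelSourceFunction instead gives the rich parallel variant.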

Kafka Connector

Add Dependency

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.11.1</version>
</dependency>

Write Code

package icu.wzk;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class StreamFromKafka {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka consumer configuration
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "h121.wzk.icu:9092");

        // Kafka source
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                "flink_test",
                new SimpleStringSchema(),
                properties
        );
        DataStreamSource<String> data = env.addSource(consumer);

        // Split each line into (word, 1) pairs
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = data
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        String[] words = value.split(" ");
                        for (String word : words) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                });
        // Group by word and sum the counts
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
                .sum(1);
        result.print();
        env.execute("StreamFromKafka");
    }
}

Start Kafka

cd /opt/servers/kafka_2.12-2.7.2/bin
./kafka-server-start.sh ../config/server.properties

Create Topic

cd /opt/servers/kafka_2.12-2.7.2/bin/
./kafka-topics.sh --create --zookeeper h121.wzk.icu:2181 --replication-factor 1 --partitions 1 --topic flink_test

Produce Messages

cd /opt/servers/kafka_2.12-2.7.2/bin/
./kafka-console-producer.sh --bootstrap-server h121.wzk.icu:9092 --topic flink_test
# Wait for Java program to start, produce several messages

Run Results

Observing the console:

3> (hello,1)
5> (world,1)
3> (hello,2)
5> (world,2)
3> (hello,3)
3> (hello,4)
2> (hello!,1)
2> (hello!,2)
...

Flink also provides some built-in connectors:

  • Kafka: Message queue connector
  • Redis: Redis database connector
  • Elasticsearch: Search engine connector
  • RabbitMQ: Message queue connector
  • JDBC: Database connector
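
For targets not covered by these connectors, a custom sink can be written against SinkFunction, as mentioned in the Sink section above. A minimal sketch (the class name is illustrative) that simply prints each element:

```java
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class LoggingSink implements SinkFunction<String> {

    @Override
    public void invoke(String value, Context context) {
        // In a real sink you would write the value to an external system here
        System.out.println("sink received: " + value);
    }
}
```

It would then be attached with stream.addSink(new LoggingSink()).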