Three Core Components of the DataStream API
1. DataSource
- Input source for the program, provides raw data streams for Flink jobs
- Add data sources through StreamExecutionEnvironment.addSource() method
- Supports multiple data source types: Message queues (Kafka, RabbitMQ), File systems (HDFS, local files), Databases (MySQL, PostgreSQL), Custom data sources
2. Transformation
- Core of data processing, performs various calculations and transformation operations on data sources
- Common transformation operations: Map (1:1 mapping), FlatMap (1:N mapping), Filter (filtering), KeyBy (group by key), Window (window operations)
- Supports complex operations like merging and splitting multiple data streams
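The operators above can be chained. A minimal sketch, assuming a socket source on `localhost:9999` (host, port, and the 5-second window size are illustrative placeholders), that counts words per tumbling window:

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class TransformationChainSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.socketTextStream("localhost", 9999) // placeholder source
            .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                    for (String word : line.split(" ")) {
                        out.collect(new Tuple2<>(word, 1)); // FlatMap: 1 line -> N words
                    }
                }
            })
            .filter(t -> !t.f0.isEmpty())                              // Filter: drop empty tokens
            .keyBy(t -> t.f0)                                          // KeyBy: group by word
            .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // Window: 5s tumbling windows
            .sum(1)                                                    // aggregate counts per key and window
            .print();
        env.execute("TransformationChainSketch");
    }
}
```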
3. Sink
- Responsible for outputting processed data to external systems
- Supports output targets: Message systems (Kafka, RabbitMQ), Database systems (MySQL, Elasticsearch), File systems (HDFS, local files), Custom output
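Besides the built-in connectors, a custom output can be plugged in via `addSink()`. A minimal sketch with a hypothetical `SinkFunction` that just prints each element (a real sink would write to Kafka, MySQL, HDFS, etc.):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class CustomSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("a", "b", "c")
            .addSink(new SinkFunction<String>() {
                @Override
                public void invoke(String value, Context context) {
                    // Placeholder: a real sink would write to an external system here
                    System.out.println("sink received: " + value);
                }
            });
        env.execute("CustomSinkSketch");
    }
}
```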
Based on Files
readTextFile(path): Reads the file at the given path line by line, following the TextInputFormat reading rules, and returns each line as a String element of the stream
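A minimal sketch of a file-based source (the path is a placeholder; local and HDFS paths both work):

```java
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FileSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Placeholder path: reads the file line by line via TextInputFormat
        DataStreamSource<String> lines = env.readTextFile("/tmp/input.txt");
        lines.print();
        env.execute("FileSourceSketch");
    }
}
```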
Based on Socket
socketTextStream: Reads text data from a socket; elements are split by a delimiter (newline by default)
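A minimal sketch of a socket source (host, port, and delimiter are placeholders; a test server can be started with `nc -lk 9999`):

```java
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SocketSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Placeholder host/port; the third argument is the element delimiter
        DataStreamSource<String> lines = env.socketTextStream("localhost", 9999, "\n");
        lines.print();
        env.execute("SocketSourceSketch");
    }
}
```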
Based on Collections
fromCollection method: Creates data stream from Java Collection
Kafka Connector
Add dependency:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.11.1</version>
</dependency>
Custom Input
- Non-parallel source: implements SourceFunction
- Parallel source: implements ParallelSourceFunction interface
- Rich parallel source: extends RichParallelSourceFunction
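A minimal sketch of a non-parallel custom source implementing `SourceFunction` (the element count and sleep interval are illustrative choices). The `run()` method emits elements via the `SourceContext`, and `cancel()` must flip a flag so the loop can exit when the job is cancelled:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class CounterSource implements SourceFunction<Long> {
    private volatile boolean running = true;
    private long count = 0L;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        // Emit 0..9, one element every 100 ms
        while (running && count < 10) {
            ctx.collect(count++);
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        running = false; // called by Flink on job cancellation
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // A plain SourceFunction always runs with parallelism 1
        env.addSource(new CounterSource()).print();
        env.execute("CounterSource");
    }
}
```

Implementing `ParallelSourceFunction` instead allows the source to run with parallelism greater than 1; `RichParallelSourceFunction` additionally provides `open()`/`close()` lifecycle hooks and runtime context.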
Complete Code Example (Read from Collection and Filter)
package icu.wzk;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;

public class StreamFromCollection {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // People is a simple POJO with name (String) and age (int) fields
        List<People> peopleList = new ArrayList<>();
        peopleList.add(new People("wzk", 18));
        peopleList.add(new People("icu", 15));
        peopleList.add(new People("wzkicu", 10));
        DataStreamSource<People> data = env.fromCollection(peopleList);
        // Keep only people older than 15
        SingleOutputStreamOperator<People> filtered = data.filter(new FilterFunction<People>() {
            @Override
            public boolean filter(People value) throws Exception {
                return value.getAge() > 15;
            }
        });
        filtered.print();
        env.execute("StreamFromCollection");
    }
}
Complete Code Example (Read from Kafka and Process)
package icu.wzk;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class StreamFromKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "h121.wzk.icu:9092");
        // Kafka consumers need a consumer group; the group name here is a placeholder
        properties.setProperty("group.id", "flink_test_group");
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                "flink_test",
                new SimpleStringSchema(),
                properties
        );
        DataStreamSource<String> data = env.addSource(consumer);
        // Split each line into words and pair each word with a count of 1
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = data
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        String[] words = value.split(" ");
                        for (String word : words) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                });
        // Group by word and keep a running sum of the counts
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
                .sum(1);
        result.print();
        env.execute("StreamFromKafka");
    }
}