Stream Processing
Definition: Stream processing is the real-time processing of continuously flowing data. Flink's streaming mode is well suited to data that is generated continuously, such as sensor readings, log events, or financial transactions.
Core Concepts:
- Unbounded Data Stream: Stream processing typically operates on unbounded streams: the data has no defined endpoint and keeps arriving indefinitely
- Event Time: Flink supports event-time processing, which can handle out-of-order and late data and makes results more accurate. Event time is the time at which a record was generated at its source
- Window Operations: In stream processing, data usually needs to be grouped into time windows (sliding, tumbling, or session windows) before aggregations or other operations are applied
- State Management: Flink supports stateful stream processing: while processing each record, an operator can remember state derived from earlier records, for example a running sum or a frequency count over the stream
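To make the windowing idea concrete, here is a plain-Java sketch (not Flink API code; the class and method names are made up for illustration) of how a tumbling event-time window assigns each record to a bucket determined by its event timestamp:

```java
import java.util.Map;
import java.util.TreeMap;

public class TumblingWindowSketch {
    // Map an event timestamp (ms) to the start of its tumbling window
    static long windowStart(long eventTimeMs, long windowSizeMs) {
        return eventTimeMs - (eventTimeMs % windowSizeMs);
    }

    public static void main(String[] args) {
        long windowSize = 5_000; // 5-second tumbling windows
        long[] eventTimes = {1_200, 4_900, 5_100, 9_999, 10_000};
        // Count events per window, keyed by window start time
        Map<Long, Integer> counts = new TreeMap<>();
        for (long t : eventTimes) {
            counts.merge(windowStart(t, windowSize), 1, Integer::sum);
        }
        System.out.println(counts); // prints {0=2, 5000=2, 10000=1}
    }
}
```

Because windows are determined by the event timestamp rather than by arrival order, a late or out-of-order record still lands in the correct bucket, which is the property the Event Time bullet above refers to.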
Batch Processing
Definition: Batch processing operates on static, bounded datasets. It is typically used for one-off large-scale data analysis, such as periodic business report generation or data transformation and loading (ETL) tasks.
Core Concepts:
- Bounded Dataset: Batch processing typically works on bounded datasets: the data has a fixed size with a clear beginning and end
- Task Parallelization: In batch mode, Flink splits the dataset into multiple subtasks and executes them in parallel to speed up processing
- DataSet API: Flink's DataSet API provides a set of high-level operators for batch datasets, such as map, filter, join, and aggregate
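The DataSet operators map, filter, and aggregate behave much like the corresponding operations on ordinary collections. As a rough plain-Java analogy using the standard Stream API (this is not the Flink API, just an illustration of the operator semantics):

```java
import java.util.Arrays;
import java.util.List;

public class BatchOperatorsAnalogy {
    public static void main(String[] args) {
        // A bounded dataset: fixed size, clear start and end
        List<Integer> bounded = Arrays.asList(1, 2, 3, 4, 5, 6);
        int sum = bounded.stream()
                .map(x -> x * x)          // map: 1, 4, 9, 16, 25, 36
                .filter(x -> x % 2 == 0)  // filter: 4, 16, 36
                .mapToInt(Integer::intValue)
                .sum();                   // aggregate: 56
        System.out.println(sum); // prints 56
    }
}
```

The key difference is that Flink distributes these operators across parallel subtasks and executes them on a cluster, whereas the Stream API runs in a single JVM.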
Word Count (Batch Data)
Requirements:
Count the occurrences of each word in a file and write the results to another file:
- Read the data source
- Process the data source
- Split each line of the source file on spaces
- Attach a count of 1 to each word
- Group identical words together (aggregate by word)
- Sum the counts for each word (sum the 1s attached to each word)
- Save the processed result
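The steps above can first be sketched in plain Java, without Flink, just to show the algorithm (the input lines here are made up for illustration):

```java
import java.util.Map;
import java.util.TreeMap;

public class WordCountSketch {
    public static void main(String[] args) {
        String[] lines = {"hello world", "hello flink"};
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            // Split each line on spaces, then add 1 per word and sum per key
            for (String word : line.split(" ")) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        System.out.println(counts); // prints {flink=1, hello=2, world=1}
    }
}
```

The Flink version below performs the same split/group/sum steps, but expresses them as dataflow operators (flatMap, groupBy, sum) that can run in parallel.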
Add the Maven dependencies:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.11.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.11.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.11.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>1.11.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>1.11.1</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
</project>
Write Code:
package icu.wzk;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) throws Exception {
        String inPath = "word-count/word-count.txt";
        String outPath = "word-count/word-count-result.csv";
        // Get the Flink batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Read the file contents
        DataSet<String> text = env.readTextFile(inPath);
        // Split each line into words, attach a count of 1, then group and sum
        DataSet<Tuple2<String, Integer>> dataSet = text
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        for (String word : line.split(" ")) {
                            collector.collect(new Tuple2<>(word, 1));
                        }
                    }
                })
                .groupBy(0)   // group by the word (field 0)
                .sum(1);      // sum the counts (field 1)
        // Write as CSV: rows separated by newlines, fields by spaces
        dataSet
                .writeAsCsv(outPath, "\n", " ", FileSystem.WriteMode.OVERWRITE)
                .setParallelism(1);
        // Trigger execution
        env.execute("Word Count");
    }
}
Word Count (Stream Data)
Requirements:
A socket simulates words being sent in real time. Flink receives the data, aggregates it over a specified time window (e.g., 5 seconds) that slides every 1 second, and prints the results for each window.
Write Code
Server Part:
Write a socket server that supplies a continuous stream of data.
package icu.wzk;

import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Random;

public class WordCountServer {
    public static void main(String[] args) throws IOException, InterruptedException {
        String ip = "localhost";
        int port = 9999;
        Random random = new Random();
        ServerSocket serverSocket = new ServerSocket();
        InetSocketAddress address = new InetSocketAddress(ip, port);
        serverSocket.bind(address);
        // Wait for a client (the Flink job) to connect
        Socket socket = serverSocket.accept();
        OutputStream outputStream = socket.getOutputStream();
        PrintWriter writer = new PrintWriter(outputStream, true);
        for (int i = 0; i < 1000; i++) {
            // Send a random number every 100-1000 ms
            int number = random.nextInt(100);
            System.out.println(number);
            writer.println(number);
            Thread.sleep(random.nextInt(900) + 100);
        }
        socket.close();
        serverSocket.close();
    }
}
Flink Part:
Connect to the server above and count the received tokens.
package icu.wzk;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WordCount2 {
    public static void main(String[] args) throws Exception {
        String ip = "localhost";
        int port = 9999;
        // Get the Flink streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read newline-delimited data from the socket
        DataStreamSource<String> textStream = env.socketTextStream(ip, port, "\n");
        // Split input on whitespace and attach a count of 1 to each token
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordCount = textStream
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        String[] splits = value.split("\\s");
                        for (String word : splits) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                });
        // Key by the word, apply a 5-second window sliding every 1 second, and sum the counts
        SingleOutputStreamOperator<Tuple2<String, Integer>> word = wordCount
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                })
                .timeWindow(Time.seconds(5), Time.seconds(1))
                .sum(1);
        // Print each window's result and run the job
        word.print();
        env.execute("Word Count");
    }
}
Process Summary
- Obtain an execution environment
- Load or create the initial data
- Specify transformation operators on the data
- Specify where to store the results
- Call execute() to trigger execution
Note: Flink programs are lazily evaluated. The dataflow is only actually executed when the execute() method is called.