This is article 94 in the Big Data series, comprehensively introducing Flink Window mechanism and Watermark.
Full illustrated version: CSDN Original | Juejin
Flink Window Background
Flink treats Batch as a special case of Streaming: this design philosophy stems from its unified stream-batch architecture. Concretely, Flink regards batch processing as stream processing over a bounded stream, which is fundamentally different from traditional batch frameworks such as Hadoop MapReduce.
Flink’s underlying engine is a highly optimized streaming engine with core components:
- Distributed runtime environment
- State management backend
- Checkpoint mechanism
- Time processing model
On this streaming engine foundation, Flink implements two processing modes:
- Streaming: Process unbounded streams
- Batch: Process bounded streams
The window mechanism is an important bridge from Streaming to Batch. Flink provides several kinds of windows:
- Time Window: Including tumbling window, sliding window, and session window
- Count Window: Window based on element count
- Global Window: Assigns all elements of a key to a single window; it only fires with a custom trigger (count windows are built on it)
A window carves a finite set out of an infinite stream, enabling computations over bounded datasets; which elements of the stream are collected together is defined by the window.
A window can be driven by time (TimeWindow), e.g. every 30 seconds, or by data count (CountWindow), e.g. every 100 elements.
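The two driving modes can be sketched in plain Java (a standalone illustration, not the Flink API; the class and method names are made up for the example): a time-driven window groups elements by which fixed interval their timestamp falls into, while a count-driven window groups them by arrival order.

```java
public class WindowDrivers {
    // Time-driven: index of the tumbling window a timestamp falls into
    static long timeWindowIndex(long timestampMillis, long windowSizeMillis) {
        return timestampMillis / windowSizeMillis;
    }

    // Count-driven: index of the count window for the n-th element (0-based)
    static long countWindowIndex(long elementIndex, long windowSize) {
        return elementIndex / windowSize;
    }

    public static void main(String[] args) {
        // Timestamps 29s and 30s land in windows 0 and 1 of a 30s tumbling window
        System.out.println(timeWindowIndex(29_000, 30_000)); // 0
        System.out.println(timeWindowIndex(30_000, 30_000)); // 1
        // Elements #99 and #100 land in count windows 0 and 1 of a 100-element window
        System.out.println(countWindowIndex(99, 100));  // 0
        System.out.println(countWindowIndex(100, 100)); // 1
    }
}
```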
Flink Window Overview
Basic Concepts
- Window is the core of Flink’s processing of infinite streams—Windows split streams into finite-sized “buckets” where we can apply calculations
- Flink considers Batch as a special case of Streaming, so Flink’s underlying engine is a streaming engine, implementing both streaming and batch processing on top
- Window is a bridge from Streaming to Batch
- Flink provides a complete window mechanism
- In stream processing, data is continuous, so we cannot wait for all data to arrive before processing
- We could of course process each message as it arrives, but sometimes we need to perform aggregations
- In that case we define a window, for example one that collects the data of the last minute, and compute over that window's data
- Windows can be time-driven or event-driven
- Depending on how elements are assigned, windows are divided into: Tumbling Window (no overlap), Sliding Window (with overlap), Session Window (separated by activity gaps), and Global Window
- To operate on windows in Flink, first convert the DataStream (typically after keyBy) into a WindowedStream
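For the overlapping case above, the assignment of one element to several sliding windows can be sketched in plain Java (a standalone illustration mirroring the logic of Flink's sliding window assigner; names are made up and a zero window offset is assumed): with size 60s and slide 20s, every element belongs to 60/20 = 3 windows.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindows {
    // Start timestamps of every sliding window that contains the given timestamp,
    // from the latest window backwards.
    static List<Long> windowStartsFor(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - (ts % slide); // latest window beginning at or before ts
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Timestamp 65s, size 60s, slide 20s -> windows starting at 60s, 40s, 20s,
        // i.e. [60s,120s), [40s,100s), [20s,80s) all contain the element.
        System.out.println(windowStartsFor(65_000, 60_000, 20_000));
    }
}
```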
Transformation Steps
- Get stream data source
- Get window
- Operate on window data
- Output window data
Tumbling Time Window
Type Characteristics
When splitting data into fixed-length windows, keep the following steps and precautions in mind:

Time Alignment Processing
- First ensure data timestamps are continuous and equally spaced
- Missing time points can be filled by interpolation (linear, spline) or padding (forward fill, backward fill)
- Example: for sensor data at 5-minute intervals, if the 10:15 data point is missing, interpolate linearly between 10:10 and 10:20

Fixed Window Length Splitting
- Determine the window size based on business needs (e.g., 1 hour, 1 day)
- Split the data strictly by window size from the start time point
- Each window's data volume depends on the collection frequency
- Example: for data sampled once per second, a 1-hour window contains 3600 data points

No Overlap Processing
- Adjacent windows are strictly separated and share no data points
- Each data point belongs to exactly one window
- Window boundary handling: use the left-closed right-open interval [t, t + window_size)

Application Scenarios
- Time series analysis (e.g., stock price analysis)
- Periodic data statistics (e.g., daily active user counts)
- Device monitoring (e.g., hourly machine status monitoring)

Implementation Precautions
- Boundary data: decide how to handle the last incomplete window (discard it or process it separately)
- Computation efficiency: consider parallel processing for large data volumes
- Data consistency: ensure the window split does not change the data's statistical characteristics
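The left-closed right-open boundary rule can be expressed as simple alignment arithmetic (a plain-Java sketch with hypothetical names, assuming a zero window offset): aligning a timestamp down to a multiple of the window size yields the start of the interval [start, start + window_size) that contains it.

```java
public class TumblingBoundary {
    // Align a timestamp down to the start of its tumbling window; the element
    // then belongs to the interval [start, start + windowSize).
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    public static void main(String[] args) {
        long size = 3_600_000L; // 1-hour window
        // The last millisecond of hour 0 and the first of hour 1 land in different windows:
        System.out.println(windowStart(3_599_999L, size)); // 0 -> window [0, 3600000)
        System.out.println(windowStart(3_600_000L, size)); // 3600000 -> next window
    }
}
```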
Flink's Tumbling Window is a commonly used window mechanism whose evaluation can be driven by time or by element count. Its defining characteristics: the windows have a fixed length, adjacent windows do not overlap, and each event enters exactly one window.
In Flink, tumbling time windows can be defined on Event Time or Processing Time. To drive them by event time, define them with TumblingEventTimeWindows (session windows use EventTimeSessionWindows instead).
Key Points
- Event Time and Watermark: Use assignTimestampsAndWatermarks to specify event time, and use watermarks to ensure window calculations don’t miss delayed events
- Window Definition: Use TumblingEventTimeWindows.of(Time.seconds(x)) to define tumbling window. Window length is x seconds
- Trigger: Use EventTimeTrigger to trigger calculation, ensuring window is based on event time
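How watermarks gate the window calculation can be simulated outside Flink (a deliberately simplified plain-Java sketch with made-up names, not the Flink API): the watermark trails the largest event time seen so far by a fixed out-of-orderness bound, and a window [start, end) may fire once the watermark reaches end - 1, so moderately late events are still included.

```java
public class WatermarkSketch {
    private long maxSeenTimestamp = Long.MIN_VALUE;
    private final long maxOutOfOrderness; // how far events may arrive out of order

    WatermarkSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    // Watermark = highest event time seen so far minus the allowed out-of-orderness
    long onEvent(long eventTimestamp) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, eventTimestamp);
        return maxSeenTimestamp - maxOutOfOrderness;
    }

    // A window [start, end) fires once the watermark reaches or passes end - 1
    boolean windowFires(long watermark, long windowEnd) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        WatermarkSketch wm = new WatermarkSketch(2_000); // tolerate 2s of disorder
        long w1 = wm.onEvent(10_000); // watermark = 8000: window [0, 10000) not yet fired
        long w2 = wm.onEvent(12_500); // watermark = 10500: window [0, 10000) fires
        System.out.println(w1 + " " + wm.windowFires(w1, 10_000));
        System.out.println(w2 + " " + wm.windowFires(w2, 10_000));
    }
}
```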
Time-Driven
Scenario: we need to count each user's total purchases per minute, so we split the user behavior events into one-minute slices: this is a Tumbling Time Window.
Main class:
package icu.wzk;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.text.SimpleDateFormat;
import java.util.Random;

public class TumblingWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        long timeMillis = System.currentTimeMillis();
                        int random = new Random().nextInt(10);
                        System.out.println("value: " + value + ", random: " + random + ", time: " + format.format(timeMillis));
                        return Tuple2.of(value, random);
                    }
                });
        KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream
                .keyBy(new KeySelector<Tuple2<String, Integer>, Tuple>() {
                    @Override
                    public Tuple getKey(Tuple2<String, Integer> value) throws Exception {
                        return Tuple1.of(value.f0);
                    }
                });
        // Time-driven: cut a new window every 10 seconds
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> timeWindow = keyedStream
                .timeWindow(Time.seconds(10));
        timeWindow.apply(new MyTimeWindowFunction()).print();
        env.execute("TumblingWindow");
    }
}
Implement MyTimeWindowFunction for tumbling time window:
package icu.wzk;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.text.SimpleDateFormat;
public class MyTimeWindowFunction implements WindowFunction<Tuple2<String, Integer>, String, Tuple, TimeWindow> {

    /**
     * Scenario: count each user's total purchases per minute by splitting the
     * user behavior events into tumbling time windows.
     * @author wzk
     * @date 16:58 2024/7/26
     **/
    @Override
    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        int sum = 0;
        for (Tuple2<String, Integer> tuple2 : input) {
            sum += tuple2.f1;
        }
        out.collect("key: " + tuple.getField(0) + ", value: " + sum +
                ", window start: " + format.format(window.getStart()) + ", window end: " + format.format(window.getEnd()));
    }
}
Event-Driven
Scenario: we want every 100 purchase events per user to drive the computation: whenever a window fills up with 100 elements of the same key, the window is evaluated (the code below uses 3 elements for easier testing).
Write a startup class:
package icu.wzk;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

import java.text.SimpleDateFormat;
import java.util.Random;

public class TumblingWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        long timeMillis = System.currentTimeMillis();
                        int random = new Random().nextInt(10);
                        System.out.println("value: " + value + ", random: " + random + ", time: " + format.format(timeMillis));
                        return Tuple2.of(value, random);
                    }
                });
        KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream
                .keyBy(new KeySelector<Tuple2<String, Integer>, Tuple>() {
                    @Override
                    public Tuple getKey(Tuple2<String, Integer> value) throws Exception {
                        return Tuple1.of(value.f0);
                    }
                });
        // Event-driven: the window fires every 3 elements of the same key
        WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> globalWindow = keyedStream
                .countWindow(3);
        globalWindow.apply(new MyCountWindowFuntion()).print();
        env.execute("TumblingWindow");
    }
}
Write event-driven class: MyCountWindowFuntion
package icu.wzk;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;

public class MyCountWindowFuntion implements WindowFunction<Tuple2<String, Integer>, String, Tuple, GlobalWindow> {

    /**
     * Scenario: with 100 user purchases as the driver, whenever a window fills
     * up with 100 elements of the same key, the window is evaluated.
     * @author wzk
     * @date 17:11 2024/7/26
     **/
    @Override
    public void apply(Tuple tuple, GlobalWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        int sum = 0;
        for (Tuple2<String, Integer> tuple2 : input) {
            sum += tuple2.f1;
        }
        // maxTimestamp() of a GlobalWindow is Long.MAX_VALUE: for count-driven
        // windows we do not care about time.
        long maxTimestamp = window.maxTimestamp();
        out.collect("key: " + tuple.getField(0) + ", value: " + sum + ", maxTimestamp: "
                + maxTimestamp + ", " + format.format(maxTimestamp));
    }
}
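The count-window behavior of the example can be mimicked in plain Java (a simplified sketch with hypothetical names, ignoring Flink's state backend and triggers): elements are buffered per key, and once a key's buffer reaches the window size, the window "fires" by emitting the sum and clearing the buffer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class CountWindowSketch {
    private final int windowSize;
    private final Map<String, List<Integer>> buffers = new HashMap<>();

    CountWindowSketch(int windowSize) {
        this.windowSize = windowSize;
    }

    // Buffer the element under its key; once the buffer holds windowSize
    // elements, emit their sum and clear the buffer (the window "fires").
    Optional<Integer> add(String key, int value) {
        List<Integer> buf = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        buf.add(value);
        if (buf.size() == windowSize) {
            int sum = buf.stream().mapToInt(Integer::intValue).sum();
            buf.clear();
            return Optional.of(sum);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        CountWindowSketch w = new CountWindowSketch(3); // mirrors countWindow(3)
        System.out.println(w.add("user-1", 5)); // Optional.empty
        System.out.println(w.add("user-2", 1)); // Optional.empty (separate key)
        System.out.println(w.add("user-1", 2)); // Optional.empty
        System.out.println(w.add("user-1", 4)); // Optional[11]: user-1's window fires
    }
}
```

Keying before windowing matters here: each key fills its own window independently, which is exactly why `user-2`'s element above does not count toward `user-1`'s window.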