Flink Window Background
Flink treats batch processing as a special case of stream processing: a batch job is a computation over a bounded stream. This design philosophy underlies Flink's unified stream-batch architecture.
Flink’s underlying engine is a highly optimized streaming engine whose core components include:
- Distributed runtime environment
- State management backend
- Checkpoint mechanism
- Time processing model
The window mechanism is an important bridge from streaming to batch. Flink provides the following kinds of windows:
- Time Window: includes tumbling, sliding, and session windows
- Count Window: a window based on element count
- Global Window: a special window that holds all elements with the same key; it needs a custom trigger to fire and can span an entire bounded (batch) input
Basic Concepts
Windows are at the core of how Flink processes unbounded streams. They split a stream into finite-sized “buckets” over which computations can be applied.
Windows can be time-driven or event-driven (count-driven). By how elements are assigned to them, they fall into the following types:
- Tumbling Window (no overlap)
- Sliding Window (with overlap)
- Session Window (activity gap)
- Global Window
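Of the types listed above, the session window is the only one whose boundaries depend on the data itself rather than on a fixed size. The following plain-Java sketch illustrates the idea of an activity gap; it is not Flink's implementation, and the class name `SessionSketch`, the method `sessions`, and the 3-second gap are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only; names are hypothetical and no Flink API is used.
class SessionSketch {

    /**
     * Groups ascending timestamps into sessions. A new session starts whenever
     * the distance to the previous timestamp is at least gapMillis.
     * Returns one {first, last} pair per session.
     */
    static List<long[]> sessions(long[] sortedTimestamps, long gapMillis) {
        List<long[]> result = new ArrayList<>();
        if (sortedTimestamps.length == 0) {
            return result;
        }
        long first = sortedTimestamps[0];
        long last = first;
        for (int i = 1; i < sortedTimestamps.length; i++) {
            long ts = sortedTimestamps[i];
            if (ts - last >= gapMillis) {
                // The gap elapsed without activity: close the current session.
                result.add(new long[] {first, last});
                first = ts;
            }
            last = ts;
        }
        result.add(new long[] {first, last});
        return result;
    }

    public static void main(String[] args) {
        long[] events = {1_000L, 2_000L, 2_500L, 9_000L, 9_400L};
        for (long[] session : sessions(events, 3_000L)) {
            System.out.println(Arrays.toString(session));
        }
        // prints [1000, 2500] and then [9000, 9400]
    }
}
```

The sketch assumes the timestamps arrive in order; a real stream processor also has to merge sessions when late elements bridge two of them.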
Tumbling Time Window
A tumbling time window has a fixed length, and consecutive windows do not overlap, so each event falls into exactly one window.
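To make "exactly one window" concrete, the sketch below computes which window a timestamp belongs to and contrasts that with a sliding window, where one timestamp can belong to several windows. This is a plain-Java illustration under the assumption that windows are aligned to epoch 0; the names `WindowAssignSketch`, `tumblingStart`, and `slidingStarts` are hypothetical and not part of the Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration of window assignment; no Flink API is used.
class WindowAssignSketch {

    /** Start of the tumbling window [start, start + size) that contains the timestamp. */
    static long tumblingStart(long timestamp, long sizeMillis) {
        return timestamp - Math.floorMod(timestamp, sizeMillis);
    }

    /** Starts of all sliding windows [start, start + size) that contain the timestamp. */
    static List<Long> slidingStarts(long timestamp, long sizeMillis, long slideMillis) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is not after the timestamp.
        long lastStart = timestamp - Math.floorMod(timestamp, slideMillis);
        // Walk backwards while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - sizeMillis; start -= slideMillis) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long ts = 27_000L;
        System.out.println(tumblingStart(ts, 10_000L));          // prints 20000
        System.out.println(slidingStarts(ts, 10_000L, 5_000L));  // prints [25000, 20000]
    }
}
```

When the slide equals the size, `slidingStarts` returns a single window, which is the tumbling case.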
Time-driven
Scenario: to count the total number of items users purchase per minute, the user behavior events have to be split into one-minute windows. The code below uses a 10-second window instead of one minute.
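package icu.wzk;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import java.text.SimpleDateFormat;
import java.util.Random;

public class TumblingWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read text lines from a local socket
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);
        // Pair each input line (used as the key) with a random value in [0, 10)
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        long timeMillis = System.currentTimeMillis();
                        int random = new Random().nextInt(10);
                        System.out.println("value: " + value + ", random: " + random + ", time: " + format.format(timeMillis));
                        return Tuple2.of(value, random);
                    }
                });
        // Key the stream by the input string
        KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream
                .keyBy(new KeySelector<Tuple2<String, Integer>, Tuple>() {
                    @Override
                    public Tuple getKey(Tuple2<String, Integer> value) throws Exception {
                        return Tuple1.of(value.f0);
                    }
                });
        // Time-driven: divide a window every 10 seconds
        // (timeWindow is deprecated since Flink 1.12 in favor of window(TumblingProcessingTimeWindows.of(...)))
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> timeWindow = keyedStream
                .timeWindow(Time.seconds(10));
        timeWindow.apply(new MyTimeWindowFunction()).print();
        env.execute("TumblingWindow");
    }
}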
Implement WindowFunction:
package icu.wzk;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.text.SimpleDateFormat;

public class MyTimeWindowFunction implements WindowFunction<Tuple2<String, Integer>, String, Tuple, TimeWindow> {
    @Override
    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        // Sum the values of all elements that fell into this key's window
        int sum = 0;
        for (Tuple2<String, Integer> tuple2 : input) {
            sum += tuple2.f1;
        }
        // Emit one result per key and window, together with the window bounds
        out.collect("key: " + tuple.getField(0) + ", value: " + sum +
                ", window start: " + format.format(window.getStart()) + ", window end: " + format.format(window.getEnd()));
    }
}
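The aggregation performed by the job above can be checked without a socket or a cluster. The following plain-Java sketch mimics it under simplifying assumptions: timestamps are passed in explicitly instead of being taken from processing time, values are fixed instead of random, and the names `TumblingSumSketch` and `sumPerWindow` are hypothetical. It is an illustration of the grouping logic, not a replacement for the Flink job.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of "sum per key per tumbling window"; no Flink API is used.
class TumblingSumSketch {

    /**
     * Sums values per (key, window start). The window start is the timestamp
     * rounded down to a multiple of sizeMillis. Result keys look like "key@start".
     */
    static Map<String, Integer> sumPerWindow(String[] keys, long[] timestamps, int[] values, long sizeMillis) {
        Map<String, Integer> sums = new TreeMap<>();
        for (int i = 0; i < keys.length; i++) {
            long start = timestamps[i] - Math.floorMod(timestamps[i], sizeMillis);
            sums.merge(keys[i] + "@" + start, values[i], Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        String[] keys = {"a", "b", "a", "a"};
        long[] timestamps = {1_000L, 2_000L, 9_000L, 12_000L};
        int[] values = {3, 4, 5, 6};
        // 10-second windows, as in the job above
        System.out.println(sumPerWindow(keys, timestamps, values, 10_000L));
        // prints {a@0=8, a@10000=6, b@0=4}
    }
}
```

The two events for key `a` at 1 s and 9 s share the first window, while the event at 12 s opens a second window for the same key.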
Event-driven
Scenario: suppose the computation should be driven by every 100 purchase events of a user. Whenever a window has collected 100 elements with the same key, the window is computed. The code below uses a window of 3 elements.
// Event-driven: divide a window every 3 elements (counted per key)
WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> globalWindow = keyedStream
        .countWindow(3);
globalWindow.apply(new MyCountWindowFuntion()).print();
Event-driven WindowFunction:
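package icu.wzk;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;
import java.text.SimpleDateFormat;

public class MyCountWindowFuntion implements WindowFunction<Tuple2<String, Integer>, String, Tuple, GlobalWindow> {
    @Override
    public void apply(Tuple tuple, GlobalWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        int sum = 0;
        for (Tuple2<String, Integer> tuple2 : input) {
            sum += tuple2.f1;
        }
        // A GlobalWindow has no time bounds: maxTimestamp() returns Long.MAX_VALUE,
        // so the formatted date below is not a meaningful wall-clock time
        long maxTimestamp = window.maxTimestamp();
        out.collect("key:" + tuple.getField(0) + ", value: " + sum + ", maxTimestamp :"
                + maxTimestamp + "," + format.format(maxTimestamp));
    }
}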
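The count-driven behavior above can be imitated in plain Java to see when a window fires. In Flink, `countWindow(n)` is built on a global window combined with a purging count trigger. The sketch below only mirrors that behavior for illustration; the class `CountWindowSketch` and its `add` method are hypothetical names, not Flink API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of a per-key count window; no Flink API is used.
class CountWindowSketch {
    private final int size;
    private final Map<String, List<Integer>> buffers = new HashMap<>();

    CountWindowSketch(int size) {
        this.size = size;
    }

    /**
     * Adds a value for the given key. Returns the sum of that key's window once
     * it holds `size` elements, or null while the window is still filling.
     */
    Integer add(String key, int value) {
        List<Integer> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(value);
        if (buffer.size() < size) {
            return null;
        }
        int sum = 0;
        for (int v : buffer) {
            sum += v;
        }
        buffer.clear(); // purge so that the next window for this key starts empty
        return sum;
    }

    public static void main(String[] args) {
        CountWindowSketch window = new CountWindowSketch(3);
        System.out.println(window.add("a", 1));  // prints null
        System.out.println(window.add("b", 10)); // prints null (different key)
        System.out.println(window.add("a", 2));  // prints null
        System.out.println(window.add("a", 3));  // prints 6
    }
}
```

Elements are counted per key, so a key that never reaches the configured count never produces a result.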
Key Points Summary
| Feature | Description |
|---|---|
| Window Type | Tumbling Window, Sliding Window, Session Window, Global Window |
| Driving Method | Time-driven (`timeWindow`), event-driven (`countWindow`) |
| Time Semantics | Event Time, Processing Time |
| Overlap | Tumbling windows do not overlap; sliding windows do |