Watermark
Role of Watermark in Window Computation
When using event-time windows, Flink relies on watermarks to decide when to trigger window computation. The watermark mechanism is Flink's core tool for handling out-of-order events. A watermark is essentially a special timestamp t that flows through the stream and asserts that all events with timestamps up to t should have arrived.
For example, with 10-second tumbling windows ([00:00, 00:10), [00:10, 00:20), and so on):
- When the watermark reaches 00:10,
- Flink assumes that all events belonging to the [00:00, 00:10) window have arrived (within the configured out-of-orderness bound),
- and triggers the computation for that window.
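Before a window can fire, each event has to be assigned to a window. The sketch below is plain Java with no Flink dependency: it computes the tumbling window containing a timestamp, in the spirit of Flink's TimeWindow.getWindowStartWithOffset with an offset of 0. The class and method names are hypothetical.

```java
// Hypothetical helper (not Flink API): tumbling-window assignment with offset 0.
public class TumblingWindowAssignment {

    /** Start (inclusive) of the tumbling window of size sizeMillis that contains ts. */
    static long windowStart(long ts, long sizeMillis) {
        // floorMod keeps the result correct for negative timestamps as well
        return ts - Math.floorMod(ts, sizeMillis);
    }

    /** End (exclusive) of that window. */
    static long windowEnd(long ts, long sizeMillis) {
        return windowStart(ts, sizeMillis) + sizeMillis;
    }

    public static void main(String[] args) {
        long size = 10_000L; // 10-second windows
        long[] timestamps = {0L, 7_000L, 9_999L, 10_000L, 15_500L};
        for (long ts : timestamps) {
            System.out.println(ts + " -> [" + windowStart(ts, size) + ", " + windowEnd(ts, size) + ")");
        }
    }
}
```

Timestamps 0, 7000 and 9999 land in [0, 10000), while 10000 and 15500 land in [10000, 20000): the end of a window is exclusive.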
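The watermark mechanism rests on a few key points:
- Out-of-order processing: events may arrive out of order. The watermark trails the largest event timestamp seen so far by the configured maximum out-of-orderness, so a window effectively waits that long for stragglers.
- Trigger condition: a window fires only once the watermark reaches its end time (precisely, once the watermark is at least end - 1 ms, the window's last timestamp).
- Lateness trade-off: the watermark emission interval and the maximum out-of-orderness balance latency against accuracy. A larger bound captures more out-of-order events but delays results.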
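The interplay of the out-of-orderness bound and the trigger condition can be sketched in plain Java. This is a simulation, not Flink API: it assumes the arithmetic used by Flink's BoundedOutOfOrdernessWatermarks (watermark = largest timestamp seen - bound - 1 ms) and EventTimeTrigger (fire once the watermark reaches end - 1). For simplicity it recomputes the watermark after every event, whereas Flink emits it periodically. The class name WatermarkTriggerSketch is hypothetical.

```java
// Hypothetical simulation of a bounded-out-of-orderness watermark and the event-time trigger rule.
public class WatermarkTriggerSketch {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    WatermarkTriggerSketch(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Chosen so the initial watermark is exactly Long.MIN_VALUE (no underflow)
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    /** Record an event; only the largest timestamp seen so far affects the watermark. */
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** Watermark = largest timestamp seen - bound - 1 ms. */
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    /** A window [start, end) fires once the watermark reaches its last millisecond, end - 1. */
    boolean fires(long windowEnd) {
        return currentWatermark() >= windowEnd - 1;
    }

    public static void main(String[] args) {
        WatermarkTriggerSketch wm = new WatermarkTriggerSketch(5_000L);
        long windowEnd = 10_000L; // window [0, 10000)
        for (long ts : new long[] {3_000L, 12_000L, 8_000L, 14_999L, 15_000L}) {
            wm.onEvent(ts);
            System.out.println("event " + ts + " -> watermark " + wm.currentWatermark()
                    + ", window [0, 10000) fires: " + wm.fires(windowEnd));
        }
    }
}
```

The out-of-order event at 8000 does not move the watermark back, and the window [0, 10000) only fires once an event at 15000 arrives, 5 seconds past the window end.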
Typical application scenarios include:
- IoT sensor data collection (devices may have network latency)
- User behavior log analysis (users in different regions have inconsistent data arrival times)
- Financial transaction monitoring (need to handle out-of-order transaction records caused by network latency)
Example configuration:
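DataStream<T> stream = ...;
// assignTimestampsAndWatermarks returns a new stream; downstream operators must use it
DataStream<T> withWatermarks = stream.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .<T>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner(...)
);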
This configuration tolerates up to 5 seconds of out-of-orderness: the watermark trails the largest event timestamp seen so far by 5 seconds. Events that arrive after the watermark has already passed the end of their window are treated as late data.
Assume a 10-second window [12:00:00, 12:00:10). Once the watermark reaches 12:00:10, Flink triggers the computation for that window.
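The 12:00:10 example can be made concrete with the 5-second bound configured above. Assuming Flink's bounded-out-of-orderness generator, which emits W = t_max - bound - 1 ms (t_max being the largest event timestamp seen), and the event-time trigger, which fires once W reaches the window's last millisecond t_end - 1 ms, the window [12:00:00, 12:00:10) requires:

```latex
\begin{aligned}
W &= t_{\max} - 5\,\mathrm{s} - 1\,\mathrm{ms} \\
W &\ge t_{\mathrm{end}} - 1\,\mathrm{ms} = \text{12:00:09.999} \\
\Rightarrow\; t_{\max} &\ge \text{12:00:09.999} + 5\,\mathrm{s} + 1\,\mathrm{ms} = \text{12:00:15.000}
\end{aligned}
```

In other words, the window's result appears only after an event stamped 12:00:15 or later has been read, plus up to one periodic watermark interval (200 ms by default).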
How to Handle Late Events
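Although watermarks handle most out-of-order data, some events can still arrive after the watermark has passed the end of their window ("late events"). Flink provides two mechanisms for them:
- Allowed lateness: allowedLateness(...) keeps window state for an extra period after the window first fires. A late event arriving within that period is added to the window and fires it again with an updated result.
- Side output for late events: events arriving after the allowed lateness has expired can be routed to a side output stream with sideOutputLateData(...) for later processing, instead of being silently dropped.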
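// Tag identifying the late-data side output (anonymous subclass preserves the generic type)
final OutputTag<Tuple2<String, Integer>> lateOutputTag = new OutputTag<Tuple2<String, Integer>>("late-data") {};
SingleOutputStreamOperator<Tuple2<String, Integer>> mainStream = stream
        .keyBy(t -> t.f0)
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .allowedLateness(Time.seconds(5))
        .sideOutputLateData(lateOutputTag)
        .sum(1); // a window function is required; sum(1) is only an example aggregation
// Events that missed even the allowed lateness
DataStream<Tuple2<String, Integer>> lateStream = mainStream.getSideOutput(lateOutputTag);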
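How allowed lateness and the side output divide incoming events can be sketched in plain Java. The rules below are modelled on the window-level checks in Flink's WindowOperator and EventTimeTrigger (a window's state is purged once the watermark reaches end - 1 + allowed lateness; an element for an already-fired but not yet purged window fires it again). The enum and method names are hypothetical, and the sketch assumes the element's timestamp places it in the given window.

```java
// Hypothetical simulation of how an event-time window with allowed lateness treats an arriving element.
public class LatenessClassifier {

    enum Outcome { ON_TIME, LATE_REFIRES_WINDOW, DROPPED_TO_SIDE_OUTPUT }

    /**
     * @param windowEnd        exclusive end of the window the element belongs to
     * @param allowedLateness  extra time the window state is kept after it first fires
     * @param currentWatermark watermark at the moment the element arrives
     */
    static Outcome classify(long windowEnd, long allowedLateness, long currentWatermark) {
        long maxTimestamp = windowEnd - 1;                 // last millisecond of the window
        long cleanupTime = maxTimestamp + allowedLateness; // window state is purged at this watermark
        if (cleanupTime <= currentWatermark) {
            return Outcome.DROPPED_TO_SIDE_OUTPUT;         // too late: routed to sideOutputLateData
        }
        if (maxTimestamp <= currentWatermark) {
            return Outcome.LATE_REFIRES_WINDOW;            // window already fired; fires again with the update
        }
        return Outcome.ON_TIME;                            // buffered until the watermark passes the window
    }

    public static void main(String[] args) {
        long end = 10_000L;     // window [0, 10000)
        long lateness = 5_000L; // allowedLateness(Time.seconds(5))
        for (long wm : new long[] {8_000L, 9_999L, 14_998L, 14_999L}) {
            System.out.println("watermark " + wm + " -> " + classify(end, lateness, wm));
        }
    }
}
```

With a 10-second window ending at 10000 and 5 seconds of allowed lateness, an element is on time while the watermark is below 9999, re-fires the window up to watermark 14998, and goes to the side output from 14999 on.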
Code Implementation
Data Format
01,1586489575000
01,1586489576000
01,1586489577000
01,1586489578000
01,1586489579000
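Each record is a key followed by an event timestamp in epoch milliseconds. The plain-Java sketch below (hypothetical class and record names; the record type needs Java 16+) parses the lines with the same comma split the Flink job uses, prints each timestamp as a UTC instant, and computes the 5-second tumbling window it falls into.

```java
import java.time.Instant;
import java.util.List;

// Hypothetical helper: parse "key,epochMillis" records and show their 5-second windows.
public class DataFormatSketch {

    // One parsed input line
    record Event(String key, long timestamp) {}

    // Split on the comma, as the Flink MapFunction does
    static Event parse(String line) {
        String[] parts = line.split(",");
        return new Event(parts[0].trim(), Long.parseLong(parts[1].trim()));
    }

    // Start of the tumbling window of the given size that contains ts
    static long windowStart(long ts, long sizeMillis) {
        return ts - Math.floorMod(ts, sizeMillis);
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                "01,1586489575000", "01,1586489576000", "01,1586489577000",
                "01,1586489578000", "01,1586489579000");
        for (String line : lines) {
            Event e = parse(line);
            long start = windowStart(e.timestamp(), 5_000L);
            // Instant prints UTC; the Flink job's SimpleDateFormat uses the JVM's default time zone
            System.out.println(e.key() + " @ " + Instant.ofEpochMilli(e.timestamp())
                    + " -> window [" + start + ", " + (start + 5_000L) + ")");
        }
    }
}
```

The first record is 2020-04-10T03:32:55Z, and all five records fall into the same window [1586489575000, 1586489580000).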
Write Code
This code:
- Reads a real-time text stream from a socket
- Maps each line to a (key, timestamp) tuple
- Applies a watermark strategy that tolerates 5 seconds of out-of-orderness, so Flink can handle out-of-order events
- Groups events by key and computes 5-second tumbling windows based on event time
- Outputs, for each window, the number of events, the earliest and latest event timestamps, and the window's start and end times
Here the stream is keyed by the first field of each event and assigned to 5-second tumbling windows. The apply method collects the timestamps of all events in the window, sorts them, and outputs the window's start and end times together with the earliest and latest event timestamps.
SingleOutputStreamOperator<String> res = waterMark
        .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
            @Override
            public String getKey(Tuple2<String, Long> value) throws Exception {
                return value.f0;
            }
        })
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .apply(new WindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
            @Override
            public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {
                List<Long> list = new ArrayList<>();
                for (Tuple2<String, Long> next : input) {
                    list.add(next.f1);
                }
                Collections.sort(list);
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                String result = "key: " + s + ", list.size(): " + list.size() + ", list.get(0): " + sdf.format(list.get(0)) + ", list.get(last): " + sdf.format(list.get(list.size() - 1))
                        + ", start: " + sdf.format(window.getStart()) + ", end: " + sdf.format(window.getEnd());
                out.collect(result);
            }
        });
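The formatting logic inside apply does not depend on Flink, so one option is to pull it into a plain static method and check it without running a job. The sketch below reproduces the apply body as a hypothetical method summarize; unlike the job, it pins the SimpleDateFormat to UTC so the output is deterministic.

```java
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.TimeZone;

// Hypothetical extraction of the window function body for testing outside Flink.
public class WindowSummary {

    static String summarize(String key, long windowStart, long windowEnd, Iterable<Long> timestamps) {
        List<Long> list = new ArrayList<>();
        for (Long ts : timestamps) {
            list.add(ts);
        }
        Collections.sort(list); // earliest first, regardless of arrival order
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        sdf.setTimeZone(TimeZone.getTimeZone("UTC")); // the Flink job uses the default zone instead
        return "key: " + key + ", list.size(): " + list.size()
                + ", list.get(0): " + sdf.format(list.get(0))
                + ", list.get(last): " + sdf.format(list.get(list.size() - 1))
                + ", start: " + sdf.format(windowStart) + ", end: " + sdf.format(windowEnd);
    }

    public static void main(String[] args) {
        System.out.println(summarize("01", 1586489575000L, 1586489580000L,
                List.of(1586489577000L, 1586489575000L, 1586489579000L)));
    }
}
```

For the three out-of-order timestamps above, the summary reports size 3, earliest 2020-04-10 03:32:55.000 and latest 2020-04-10 03:32:59.000 (UTC).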
The watermark strategy below uses bounded out-of-orderness with a maximum of 5 seconds. In extractTimestamp, it returns each event's timestamp and prints the event's key and event time. It also tracks currentMaxTimestamp, the largest event timestamp seen so far; this field is informational only, since the bounded-out-of-orderness generator keeps its own maximum to compute the watermark:
WatermarkStrategy<Tuple2<String, Long>> watermarkStrategy = WatermarkStrategy
        .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
            Long currentMaxTimestamp = 0L;
            final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                long timestamp = element.f1;
                currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
                System.out.println("Key:" + element.f0 + ", EventTime: " + element.f1 + ", " + format.format(element.f1));
                return element.f1;
            }
        });
The complete program below implements event-time stream processing and handles out-of-order events with the watermark mechanism:
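package icu.wzk;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class WatermarkTest01 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Needed on older Flink versions; deprecated since Flink 1.12, where event time is the default
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Each socket line has the form "key,epochMillis", e.g. "01,1586489575000"
        DataStreamSource<String> data = env.socketTextStream("localhost", 9999);
        SingleOutputStreamOperator<Tuple2<String, Long>> mapped = data.map(
                new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String value) throws Exception {
                        String[] split = value.split(",");
                        return new Tuple2<>(split[0], Long.valueOf(split[1]));
                    }
                }
        );

        // Watermark trails the largest event timestamp seen by 5 seconds
        WatermarkStrategy<Tuple2<String, Long>> watermarkStrategy = WatermarkStrategy
                .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                    Long currentMaxTimestamp = 0L;
                    final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

                    @Override
                    public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                        long timestamp = element.f1;
                        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
                        System.out.println("Key:" + element.f0 + ", EventTime: " + element.f1 + ", " + format.format(element.f1));
                        return element.f1;
                    }
                });

        SingleOutputStreamOperator<Tuple2<String, Long>> waterMark = mapped
                .assignTimestampsAndWatermarks(watermarkStrategy);

        // Key by the first field and compute 5-second event-time tumbling windows
        SingleOutputStreamOperator<String> res = waterMark
                .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> value) throws Exception {
                        return value.f0;
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new WindowFunction<Tuple2<String, Long>, String, String, TimeWindow>() {
                    @Override
                    public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {
                        List<Long> list = new ArrayList<>();
                        for (Tuple2<String, Long> next : input) {
                            list.add(next.f1);
                        }
                        Collections.sort(list);
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                        String result = "key: " + s + ", list.size(): " + list.size() + ", list.get(0): " + sdf.format(list.get(0)) + ", list.get(last): " + sdf.format(list.get(list.size() - 1))
                                + ", start: " + sdf.format(window.getStart()) + ", end: " + sdf.format(window.getEnd());
                        out.collect(result);
                    }
                });

        res.print();
        env.execute();
    }
}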
Run Code
Start a socket server on port 9999 (for example with nc -lk 9999), then type the following lines into it:
01,1586489575000
01,1586489576000
01,1586489577000
01,1586489578000
01,1586489579000
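View the results: the console prints each event as its timestamp is extracted and, for every window that fires, a line with the event count, the earliest and latest event times, and the window's start and end times. Note that the five sample records all fall into the window [1586489575000, 1586489580000) and only advance the watermark to 1586489573999, so that window does not fire yet. Entering a record with a timestamp of at least 1586489585000 (for example 01,1586489585000) moves the watermark to the window's last millisecond and triggers the output.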
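To see why the sample input alone produces no window output, the console session can be replayed in plain Java. The sketch assumes the same arithmetic as Flink's bounded-out-of-orderness generator and event-time trigger (watermark = largest timestamp - 5000 - 1; fire when watermark >= end - 1) and ignores the periodic emission interval. The class and method names are hypothetical.

```java
// Hypothetical replay of the console input against the watermark and trigger rules.
public class ReplaySession {

    // Index of the first record after which the window ending at windowEnd fires, or -1 if it never does
    static int firstFiringIndex(long[] timestamps, long windowEnd, long boundMillis) {
        long max = Long.MIN_VALUE + boundMillis + 1; // initial watermark is Long.MIN_VALUE
        for (int i = 0; i < timestamps.length; i++) {
            max = Math.max(max, timestamps[i]);
            long watermark = max - boundMillis - 1;
            if (watermark >= windowEnd - 1) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long[] sample = {1586489575000L, 1586489576000L, 1586489577000L,
                1586489578000L, 1586489579000L};
        long[] withExtra = {1586489575000L, 1586489576000L, 1586489577000L,
                1586489578000L, 1586489579000L, 1586489585000L}; // extra record typed by hand
        long windowEnd = 1586489580000L; // end of [1586489575000, 1586489580000)
        System.out.println("sample only: " + firstFiringIndex(sample, windowEnd, 5_000L));
        System.out.println("with extra record: " + firstFiringIndex(withExtra, windowEnd, 5_000L));
    }
}
```

With the five sample records the result is -1 (the window never fires); with the extra record 01,1586489585000 appended, the window fires after that sixth record (index 5).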