Flink Window 背景
Flink认为Batch是Streaming的一个特例,这种设计理念源于流批一体的架构思想。具体来说,Flink将批处理视为流处理的一种特殊情况,即有限流(bounded stream)处理。
Flink底层引擎是一个高度优化的流式引擎,核心组件包括:
- 分布式运行时环境
- 状态管理后端
- 检查点机制
- 时间处理模型
Window机制确实是从Streaming到Batch的重要桥梁,其作用主要体现在:
- 时间窗口(Time Window):包括滚动窗口、滑动窗口和会话窗口
- 计数窗口(Count Window):基于元素数量的窗口
- 全局窗口(Global Window):适用于批处理的特殊窗口
基本概念
Window是Flink处理无限流的核心,Windows将流拆分为有限大小”桶”,我们可以在其上应用计算。
窗口可以基于时间驱动、也可以基于事件驱动,同样基于不同事件驱动的可以分为:
- 翻滚窗口(Tumbling Window 无重叠)
- 滑动窗口(Sliding Window 有重叠)
- 会话窗口(SessionWindow 活动间隙)
- 全局窗口
滚动时间窗口(Tumbling Time Window)
滚动时间窗口的特点是时间窗口是固定长度的,窗口之间没有重叠,每个事件只能进入一个窗口。
基于时间驱动
场景:我们需要统计每一分钟用户购买商品的总数,需要将用户的行为事件按每一分钟进行切分。
package icu.wzk;
public class TumblingWindow {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<String> dataStreamSource = env.getJavaEnv().socketTextStream("localhost", 9999);
SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource
.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long timeMillis = System.currentTimeMillis();
int random = new Random().nextInt(10);
System.out.println("value: " + value + ", random: " + random + ", time: " + format.format(timeMillis));
return Tuple2.of(value, random);
}
});
KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream
.keyBy(new KeySelector<Tuple2<String, Integer>, Tuple>() {
@Override
public Tuple getKey(Tuple2<String, Integer> value) throws Exception {
return Tuple1.of(value.f0);
}
});
// 基于时间驱动 每隔 10秒 划分一个窗口
WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> timeWindow = keyedStream
.timeWindow(Time.seconds(10));
timeWindow.apply(new MyTimeWindowFunction()).print();
env.execute("TumblingWindow");
}
}
实现WindowFunction:
package icu.wzk;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.text.SimpleDateFormat;
public class MyTimeWindowFunction implements WindowFunction<Tuple2<String, Integer>, String, Tuple, TimeWindow> {
@Override
public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
int sum = 0;
for (Tuple2<String, Integer> tuple2 : input) {
sum += tuple2.f1;
}
out.collect("key: " + tuple.getField(0) + ", value: " + sum +
", window start: " + format.format(window.getStart()) + ", window end: " + format.format(window.getEnd()));
}
}
基于事件驱动
场景:当我们想要每100个用户的购买行为作为驱动,那么每当窗口中填满了100个”相同”元素,就会对窗口进行计算。
// 基于事件驱动 每3个元素划分一个窗口
WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> globalWindow = keyedStream
.countWindow(3);
globalWindow.apply(new MyCountWindowFuntion());
事件驱动WindowFunction:
package icu.wzk;
public class MyCountWindowFuntion implements WindowFunction<Tuple2<String, Integer>, String, Tuple, GlobalWindow> {
@Override
public void apply(Tuple tuple, GlobalWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
int sum = 0;
for (Tuple2<String, Integer> tuple2 : input) {
sum += tuple2.f1;
}
long maxTimestamp = window.maxTimestamp();
out.collect("key:" + tuple.getField(0) + ", value: " + sum + ", maxTimestamp :"
+ maxTimestamp + "," + format.format(maxTimestamp));
}
}
关键点总结
| 特性 | 说明 |
|---|---|
| 窗口类型 | 滚动窗口(Tumbling)、滑动窗口(Sliding)、会话窗口(Session) |
| 驱动方式 | 时间驱动(TimeWindow)、事件驱动(CountWindow) |
| 时间语义 | 事件时间(Event Time)、处理时间(Processing Time) |
| 无重叠 | 翻滚窗口的窗口之间不重叠 |