Flink认为Batch是Streaming的一个特例,这种设计理念源于流批一体的架构思想。具体来说,Flink将批处理视为流处理的一种特殊情况,即有限流(bounded stream)处理。

Flink底层引擎是一个高度优化的流式引擎,核心组件包括:

  • 分布式运行时环境
  • 状态管理后端
  • 检查点机制
  • 时间处理模型

Window机制确实是从Streaming到Batch的重要桥梁,其作用主要体现在:

  1. 时间窗口(Time Window):包括滚动窗口、滑动窗口和会话窗口
  2. 计数窗口(Count Window):基于元素数量的窗口
  3. 全局窗口(Global Window):适用于批处理的特殊窗口

基本概念

Window是Flink处理无限流的核心,Windows将流拆分为有限大小”桶”,我们可以在其上应用计算。

窗口可以基于时间驱动、也可以基于事件驱动,同样基于不同事件驱动的可以分为:

  • 翻滚窗口(Tumbling Window 无重叠)
  • 滑动窗口(Sliding Window 有重叠)
  • 会话窗口(SessionWindow 活动间隙)
  • 全局窗口

滚动时间窗口(Tumbling Time Window)

滚动时间窗口的特点是时间窗口是固定长度的,窗口之间没有重叠,每个事件只能进入一个窗口。

基于时间驱动

场景:我们需要统计每一分钟用户购买商品的总数,需要将用户的行为事件按每一分钟进行切分。

package icu.wzk;

public class TumblingWindow {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> dataStreamSource = env.getJavaEnv().socketTextStream("localhost", 9999);
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        long timeMillis = System.currentTimeMillis();
                        int random = new Random().nextInt(10);
                        System.out.println("value: " + value + ", random: " + random + ", time: " + format.format(timeMillis));
                        return Tuple2.of(value, random);
                    }
                });

        KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream
                .keyBy(new KeySelector<Tuple2<String, Integer>, Tuple>() {
                    @Override
                    public Tuple getKey(Tuple2<String, Integer> value) throws Exception {
                        return Tuple1.of(value.f0);
                    }
                });

        // 基于时间驱动 每隔 10秒 划分一个窗口
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> timeWindow = keyedStream
                .timeWindow(Time.seconds(10));
        timeWindow.apply(new MyTimeWindowFunction()).print();
        env.execute("TumblingWindow");
    }
}

实现WindowFunction:

package icu.wzk;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;

public class MyTimeWindowFunction implements WindowFunction<Tuple2<String, Integer>, String, Tuple, TimeWindow> {

    @Override
    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        int sum = 0;
        for (Tuple2<String, Integer> tuple2 : input) {
            sum += tuple2.f1;
        }
        out.collect("key: " + tuple.getField(0) + ", value: " + sum  +
                ", window start: " + format.format(window.getStart()) + ", window end: " + format.format(window.getEnd()));
    }
}

基于事件驱动

场景:当我们想要每100个用户的购买行为作为驱动,那么每当窗口中填满了100个”相同”元素,就会对窗口进行计算。

// 基于事件驱动 每3个元素划分一个窗口
WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> globalWindow = keyedStream
        .countWindow(3);
globalWindow.apply(new MyCountWindowFuntion());

事件驱动WindowFunction:

package icu.wzk;

public class MyCountWindowFuntion implements WindowFunction<Tuple2<String, Integer>, String, Tuple, GlobalWindow> {

    @Override
    public void apply(Tuple tuple, GlobalWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        int sum = 0;
        for (Tuple2<String, Integer> tuple2 : input) {
            sum += tuple2.f1;
        }
        long maxTimestamp = window.maxTimestamp();
        out.collect("key:" + tuple.getField(0) + ", value: " + sum + ", maxTimestamp :"
                + maxTimestamp + "," + format.format(maxTimestamp));
    }
}

关键点总结

特性说明
窗口类型滚动窗口(Tumbling)、滑动窗口(Sliding)、会话窗口(Session)
驱动方式时间驱动(TimeWindow)、事件驱动(CountWindow)
时间语义事件时间(Event Time)、处理时间(Processing Time)
无重叠翻滚窗口的窗口之间不重叠