Flink treats batch processing as a special case of stream processing: a batch job is simply a computation over a bounded stream. This design philosophy underlies Flink's unified stream-batch architecture.

Flink’s underlying engine is a highly optimized streaming engine whose core components include:

  • Distributed runtime environment
  • State management backend
  • Checkpoint mechanism
  • Time processing model

The window mechanism is an important bridge from streaming to batch. Flink provides the following kinds of windows:

  1. Time Window: Includes tumbling window, sliding window and session window
  2. Count Window: Window based on element count
  3. Global Window: Special window suitable for batch processing

Basic Concepts

Windows are at the core of how Flink processes unbounded streams. They split a stream into finite-sized “buckets” on which computations can be applied.

Windows can be time-driven (Time Window) or event-driven (Count Window). By how elements are assigned to them, windows can be divided into:

  • Tumbling Window (no overlap)
  • Sliding Window (with overlap)
  • Session Window (activity gap)
  • Global Window
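
To make the overlap distinction concrete, here is a minimal plain-Java sketch with no Flink dependency. The class and method names are hypothetical. It lists the start time of every sliding window that contains a given timestamp, assuming windows aligned to epoch 0, non-negative timestamps, and a slide no larger than the window size. When the slide equals the size, it behaves like a tumbling window and returns exactly one start.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of the Flink API.
final class SlidingWindowSketch {

    // Returns the start of every window [start, start + sizeMs) that contains timestampMs.
    // Assumes epoch-aligned windows, timestampMs >= 0 and 0 < slideMs <= sizeMs.
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is not after the timestamp.
        long lastStart = timestampMs - (timestampMs % slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Sliding: 10-second windows that advance every 5 seconds overlap.
        System.out.println("sliding:  " + windowStarts(12_000L, 10_000L, 5_000L));
        // Tumbling: slide == size, so the event lands in a single window.
        System.out.println("tumbling: " + windowStarts(12_000L, 10_000L, 10_000L));
    }
}
```

With a 10-second size and a 5-second slide, each event belongs to two windows, which is why sliding windows may process the same element more than once.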

Tumbling Time Window

A tumbling time window has a fixed length, and consecutive windows do not overlap, so each event falls into exactly one window.
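
The "exactly one window" property follows from simple arithmetic on the timestamp. The plain-Java sketch below has no Flink dependency, and its class and method names are hypothetical. It aligns a timestamp to the start of its tumbling window, assuming windows aligned to epoch 0 with no offset and non-negative timestamps.

```java
// Hypothetical helper for illustration; not part of the Flink API.
final class TumblingWindowSketch {

    // Start (inclusive) of the tumbling window that contains timestampMs.
    // Assumes epoch-aligned windows, timestampMs >= 0 and sizeMs > 0.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // End (exclusive) of the same window.
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long size = 10_000L; // 10-second windows
        long[] timestamps = {0L, 9_999L, 10_000L, 12_345L};
        for (long ts : timestamps) {
            System.out.println(ts + " -> [" + windowStart(ts, size) + ", " + windowEnd(ts, size) + ")");
        }
    }
}
```

Because the window end is exclusive, a timestamp of exactly 10000 ms opens the next window rather than closing the previous one.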

Time-driven

Scenario: to count the total number of items purchased by users per minute, the user behavior events must be split into one-minute windows. The example below uses a 10-second window instead.

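package icu.wzk;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.text.SimpleDateFormat;
import java.util.Random;
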
public class TumblingWindow {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 9999);
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = dataStreamSource
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        long timeMillis = System.currentTimeMillis();
                        int random = new Random().nextInt(10);
                        System.out.println("value: " + value + ", random: " + random + ", time: " + format.format(timeMillis));
                        return Tuple2.of(value, random);
                    }
                });

        KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = mapStream
                .keyBy(new KeySelector<Tuple2<String, Integer>, Tuple>() {
                    @Override
                    public Tuple getKey(Tuple2<String, Integer> value) throws Exception {
                        return Tuple1.of(value.f0);
                    }
                });

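        // Time-driven: open a new window every 10 seconds.
        // Note: timeWindow() is deprecated since Flink 1.12; newer code uses
        // window(TumblingProcessingTimeWindows.of(Time.seconds(10))) instead.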
        WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> timeWindow = keyedStream
                .timeWindow(Time.seconds(10));
        timeWindow.apply(new MyTimeWindowFunction()).print();
        env.execute("TumblingWindow");
    }
}

Implement WindowFunction:

package icu.wzk;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;

public class MyTimeWindowFunction implements WindowFunction<Tuple2<String, Integer>, String, Tuple, TimeWindow> {

    @Override
    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        int sum = 0;
        for (Tuple2<String, Integer> tuple2 : input) {
            sum += tuple2.f1;
        }
        out.collect("key: " + tuple.getField(0) + ", value: " + sum  +
                ", window start: " + format.format(window.getStart()) + ", window end: " + format.format(window.getEnd()));
    }
}

Event-driven

Scenario: to use every 100 user purchases as the driver, the window is computed whenever it has collected 100 elements with the same key. The example below uses a window of 3 elements instead.

// Event-driven: divide a window every 3 elements
WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> globalWindow = keyedStream
        .countWindow(3);
globalWindow.apply(new MyCountWindowFuntion()).print();

Event-driven WindowFunction:

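package icu.wzk;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
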
public class MyCountWindowFuntion implements WindowFunction<Tuple2<String, Integer>, String, Tuple, GlobalWindow> {

    @Override
    public void apply(Tuple tuple, GlobalWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        int sum = 0;
        for (Tuple2<String, Integer> tuple2 : input) {
            sum += tuple2.f1;
        }
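        // A GlobalWindow has no time bounds: maxTimestamp() returns Long.MAX_VALUE,
        // so the formatted date below is not a meaningful window end.
        long maxTimestamp = window.maxTimestamp();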
        out.collect("key:" + tuple.getField(0) + ", value: " + sum + ", maxTimestamp :"
                + maxTimestamp + "," + format.format(maxTimestamp));
    }
}
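
Because countWindow is applied to a keyed stream, elements are counted per key: a window fires only when a single key has accumulated the required number of elements. The plain-Java sketch below has no Flink dependency, and its class and method names are hypothetical. It imitates that behavior for a tumbling count window and sums the values, as the window function above does.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation for illustration; not part of the Flink API.
final class CountWindowSketch {

    private final int size;
    // One buffer per key, mirroring the per-key state of a keyed stream.
    private final Map<String, List<Integer>> buffers = new HashMap<>();

    CountWindowSketch(int size) {
        this.size = size;
    }

    // Adds one element. Returns the window sum once the key has collected
    // `size` elements (and clears its buffer), otherwise returns null.
    Integer add(String key, int value) {
        List<Integer> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(value);
        if (buffer.size() < size) {
            return null;
        }
        int sum = 0;
        for (int v : buffer) {
            sum += v;
        }
        buffer.clear();
        return sum;
    }

    public static void main(String[] args) {
        CountWindowSketch window = new CountWindowSketch(3);
        String[] keys = {"a", "b", "a", "a", "b"};
        int[] values = {1, 10, 2, 3, 20};
        for (int i = 0; i < keys.length; i++) {
            Integer sum = window.add(keys[i], values[i]);
            if (sum != null) {
                System.out.println("key: " + keys[i] + ", value: " + sum);
            }
        }
    }
}
```

In this run only key "a" reaches three elements, so only its window fires. The two elements of key "b" stay buffered, which matches what you see when typing fewer than three identical lines into the socket source.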

Key Points Summary

Feature          Description
Window Type      Tumbling Window, Sliding Window, Session Window
Driving Method   Time-driven (TimeWindow), Event-driven (CountWindow)
Time Semantics   Event Time, Processing Time
No Overlap       Tumbling windows do not overlap