Big Data 132 - Flink SQL Quick Start | Table API + SQL in 3 Minutes with the New toChangelogStream Syntax

TL;DR

  • Scenario: You want to write real-time jobs in SQL and reuse the same logic for batch processing, and you need a "minimal closed loop" that runs in 3 minutes.
  • Conclusion: With the dependencies and examples in this article, you can run Table API ⇄ SQL interoperability end to end; data is converted from a DataStream into a table, and output goes through toChangelogStream (the new syntax) to print for verification.
  • Output: an MRE project skeleton, a modern dependency list, a comparison of streaming window/temporal table syntax, and a quick-reference card of common pitfalls.

Flink SQL is the high-level query interface provided by Apache Flink. Based on the SQL standard, it gives developers the ability to process both streaming and batch data. Flink SQL lets users run complex operations on data streams and tables with standard SQL, and it fits a wide range of scenarios such as real-time analytics, stream processing, and machine learning.

Unified Query Model for Streaming and Batch

A defining characteristic of Flink SQL is the unification of stream and batch processing. With the same SQL syntax, users can process both static data (batch) and dynamic data (streaming). This simplifies application development, because one piece of logic serves both real-time stream processing and historical data queries.
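As a minimal sketch of this unification (the table `orders` and its columns are hypothetical), the same aggregation can run in either mode by switching only the runtime mode, e.g. in the SQL client:

```sql
-- Streaming mode emits continuously updated results over unbounded input;
-- batch mode reads bounded input and emits only the final results.
SET 'execution.runtime-mode' = 'streaming';
-- SET 'execution.runtime-mode' = 'batch';

SELECT product, SUM(amount) AS total
FROM orders
GROUP BY product;
```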

Dynamic Tables

Flink SQL models a stream as a continuously changing table through the concept of dynamic tables. A dynamic table is updated over time: every change in the data (insert, update, delete) affects the table's state. With dynamic tables, Flink can run SQL queries over unbounded streams and produce continuously updated results while the query executes.
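For example (assuming a hypothetical `clicks` stream), a grouped aggregation over a dynamic table is a continuous query whose result is itself a dynamic table: a new row for an already-seen key retracts the old count and emits the updated one.

```sql
-- Continuous query over the hypothetical dynamic table `clicks`.
-- An arriving row for an existing user produces -U (retract old result)
-- followed by +U (emit new result) in the changelog.
SELECT user_name, COUNT(*) AS cnt
FROM clicks
GROUP BY user_name;
```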

Windowing

In stream processing, windowing is essential. Flink SQL provides several types of window operations, including tumbling windows, sliding windows, and session windows.

Tumbling Window

A tumbling window splits the data stream into a series of non-overlapping, fixed-length time intervals. Each event belongs to exactly one window; windows never overlap.

Characteristics:

  • Window size is fixed and windows do not overlap
  • Each record belongs to exactly one window
  • Window boundaries are aligned to whole time units (whole seconds, whole minutes, etc.)

Application Scenarios:

  1. Calculate website PV/UV every hour
  2. Calculate total transaction amount every minute
  3. Calculate the average sensor reading every 10 seconds
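Scenario 3 can be sketched with the windowing table-valued function syntax (available since Flink 1.13); the `sensors` table and its watermarked `ts` column are assumptions for illustration:

```sql
-- 10-second non-overlapping windows over a hypothetical `sensors` table
-- whose TIMESTAMP(3) column `ts` carries a watermark.
SELECT window_start, window_end, AVG(temperature) AS avg_temp
FROM TABLE(
    TUMBLE(TABLE sensors, DESCRIPTOR(ts), INTERVAL '10' SECOND))
GROUP BY window_start, window_end;
```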

Sliding Window

A sliding window allows windows to overlap, so a record may be assigned to multiple windows. This window is defined by two parameters: the window size and the slide interval.

Characteristics:

  • Windows can overlap
  • A single record may appear in multiple windows
  • Defined by a window size and a slide interval
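A sketch using the `HOP` table-valued function, which takes the slide interval followed by the window size (the `orders` table and its `ts` watermark column are hypothetical):

```sql
-- 1-minute windows that slide every 10 seconds, so each record
-- lands in six overlapping windows.
SELECT window_start, window_end, SUM(amount) AS total
FROM TABLE(
    HOP(TABLE orders, DESCRIPTOR(ts), INTERVAL '10' SECOND, INTERVAL '1' MINUTE))
GROUP BY window_start, window_end;
```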

Session Window

A session window determines its boundaries dynamically from the gap between consecutive events, which makes it well suited to analyzing user behavior sessions.

Characteristics:

  • Window size is not fixed
  • Controlled by a session gap (timeout) parameter
  • Suitable for analyzing user behavior patterns
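A sketch using the long-supported group-window syntax (the `user_actions` table and its `ts` watermark column are hypothetical); a session closes after 30 minutes without activity for that user:

```sql
-- One row per user session; a new session starts after a 30-minute gap.
SELECT user_name,
       SESSION_START(ts, INTERVAL '30' MINUTE) AS session_start,
       COUNT(*) AS events
FROM user_actions
GROUP BY user_name, SESSION(ts, INTERVAL '30' MINUTE);
```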

Joins

Flink SQL supports multiple join operations:

  • Stream-stream joins: combine multiple streams together
  • Stream-table joins: enrich streaming data by matching it against a static table
  • Temporal table joins: join stream data against a versioned (temporal) table as of the stream's event time
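A temporal table join can be sketched with the `FOR SYSTEM_TIME AS OF` syntax; the `orders` stream, the versioned table `currency_rates`, and their columns are hypothetical:

```sql
-- Each order is joined with the exchange rate that was valid
-- at the order's event time, not the latest rate.
SELECT o.order_id, o.price * r.rate AS converted_amount
FROM orders AS o
JOIN currency_rates FOR SYSTEM_TIME AS OF o.order_time AS r
    ON o.currency = r.currency;
```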

Table API and SQL API Interoperability

Flink provides two high-level data processing APIs:

  • Table API: a programming interface similar to relational algebra that supports fluent, chained calls
  • SQL API: lets users process data directly with standard SQL statements

The two APIs are highly interoperable, and users can mix them freely in the same program.

HelloWorld

Add Dependencies (legacy, Scala-suffixed artifacts for older Flink versions)

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table</artifactId>
    <type>pom</type>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.12</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

Dependency explanation:

  • flink-table-api-java-bridge_2.12: the bridge module, responsible for connecting the Table API with the DataStream/DataSet APIs
  • flink-table-planner-blink_2.12: the Blink planner, the core of the Table API; it provides the runtime environment and generates the program's execution plan

Write Code

package icu.wzk;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class TableApiDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
        // Unbounded demo source: emits ("name", 10) once per second until cancelled
        DataStreamSource<Tuple2<String, Integer>> data = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                while (running) {
                    ctx.collect(new Tuple2<>("name", 10));
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        });

        // Table API way
        Table table = tableEnvironment.fromDataStream(data, $("name"), $("age"));
        Table name = table.select($("name"));
        DataStream<Tuple2<Boolean, Row>> result = tableEnvironment.toRetractStream(name, Row.class);
        result.print();

        // SQL way
        tableEnvironment.createTemporaryView("users", data, $("name"), $("age"));
        String sql = "select name from users";
        table = tableEnvironment.sqlQuery(sql);
        result = tableEnvironment.toRetractStream(table, Row.class);
        result.print();

        env.execute("TableApiDemo");
    }
}

Running Result

1> (true,name)
6> (true,name)
...
2> (true,name)
6> (true,name)
3> (true,name)

New Version

Maven

<!-- Keep all Flink artifacts on the same version -->
<properties>
  <flink.version>1.20.0</flink.version>
  <java.version>17</java.version>
</properties>

<dependencies>
  <!-- Basic stream processing -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <!-- Table API (Java) -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <!-- DataStream ⇄ Table bridge -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <!-- Planner (provides runtime support; usually marked provided to hand to cluster environment) -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>
</dependencies>

Write Code

package icu.wzk;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class TableApiDemo {
    public static void main(String[] args) throws Exception {
        // 1) Basic environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 2) Prepare a set of deterministic data (for verification)
        var ds = env.fromElements(
                Tuple2.of("alice", 10),
                Tuple2.of("bob", 20),
                Tuple2.of("alice", 30)
        );

        // 3) DataStream -> Table (name columns)
        Table t = tEnv.fromDataStream(ds).as("name", "age");

        // 4) Table API: select only name
        Table namesByApi = t.select($("name"));

        // 5) SQL: register temporary view + query
        tEnv.createTemporaryView("users", t);
        Table namesBySql = tEnv.sqlQuery("SELECT name FROM users");

        // 6) Output: use the new toChangelogStream syntax to verify incremental results
        tEnv.toChangelogStream(namesByApi).print("API");
        tEnv.toChangelogStream(namesBySql).print("SQL");

        env.execute("Flink SQL HelloWorld");
    }
}

Expected Result (lines from the two print sinks may interleave)

API> +I[alice]
API> +I[bob]
API> +I[alice]
SQL> +I[alice]
SQL> +I[bob]
SQL> +I[alice]

Note: +I stands for Insert; it is one of the standard changelog row kinds (+I, -U, +U, -D).