大数据-132 Flink SQL 实战入门 | 3 分钟跑通 Table API + SQL 含 toChangelogStream 新写法

TL;DR

  • 场景:想用 SQL 写实时任务,同时复用到批处理;需要一个能 3 分钟跑起来的”最小闭环”。
  • 结论:用本文的依赖与样例,直接跑通 Table API ⇄ SQL 的互操作;数据从 DataStream 转表,输出用 toChangelogStream(新写法)打印验证。
  • 产出:MRE 工程骨架、现代依赖清单、流式窗口/时态表的写法对照、常见坑速查卡。

Flink SQL 是 Apache Flink 提供的一种高层次的查询语言接口,它基于 SQL 标准,为开发者提供了处理流式数据和批处理数据的能力。Flink SQL 允许用户使用标准 SQL 查询语言在数据流和数据表上执行复杂的操作,适用于多种应用场景,如实时分析、数据流处理、机器学习等。

流与批统一的查询模式

Flink SQL 的一大特点是流处理和批处理的统一性。通过同一套 SQL 语法,用户可以同时处理静态数据(批处理)和动态数据(流处理)。这使得应用程序的开发更加简化,因为可以用相同的逻辑编写实时流数据处理和历史数据的查询。

动态表 (Dynamic Tables)

Flink SQL 通过动态表的概念将流数据建模为不断变化的表。这种动态表随着时间推移不断更新,数据的每个变化(插入、更新、删除)都会影响表的状态。通过动态表的概念,Flink 可以使用 SQL 查询连续的流数据,并在查询执行时获得不断更新的结果。

窗口操作 (Windowing)

在流式数据处理场景中,窗口操作非常重要。Flink SQL 提供了多种类型的窗口操作,包括滚动窗口、滑动窗口和会话窗口。

滚动窗口 (Tumbling Window)

滚动窗口是将数据流按照固定长度分割成一系列不重叠的时间段。每个事件只属于一个窗口,窗口之间没有重叠。

特点:

  • 窗口大小固定且不重叠
  • 每个数据元素只属于一个窗口
  • 窗口边界对齐整点时间(如整秒、整分钟)

应用场景:

  1. 每小时计算网站PV/UV
  2. 每分钟统计交易金额总和
  3. 每10秒计算传感器平均值

滑动窗口 (Sliding Window)

滑动窗口允许窗口之间存在重叠部分,数据可能被分配到多个窗口中。这种窗口通过两个参数定义:窗口大小和滑动步长。

特点:

  • 窗口可以重叠
  • 单个数据可能出现在多个窗口
  • 需要定义窗口长度和滑动间隔

会话窗口 (Session Window)

会话窗口根据活动事件之间的间隔动态确定窗口边界,适合于分析用户行为会话的场景。

特点:

  • 窗口大小不固定
  • 由活动间隔(会话超时)参数控制
  • 适合分析用户行为模式

连接操作 (Joins)

Flink SQL 支持多种连接操作:

  • 流与流的连接:允许用户将多个流结合在一起
  • 流与表的连接:将静态表与流数据进行匹配
  • 时态表连接 (Temporal Table Join):用于将流数据与一个时态表进行连接

Table API 与 SQL API 的互操作性

Flink 提供了两种高级数据处理 API:

  • Table API:一种与关系代数类似的编程接口,支持链式调用
  • SQL API:用户可以直接使用标准 SQL 语句进行数据处理

Table API 和 SQL API 具有很高的互操作性,用户可以在同一个程序中混合使用这两者。

HelloWorld

添加依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table</artifactId>
    <type>pom</type>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.12</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

依赖说明:

  • flink-table-api-java-bridge_2.12:桥接器,主要负责 TableAPI 和 DataStream/DataSetAPI 的连接支持
  • flink-table-planner-blink_2.12:计划期,是TableAPI最主要的部分,提供了运行时环境和生成程序执行计划的Planner

编写代码

package icu.wzk;

public class TableApiDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
        DataStreamSource<Tuple2<String, Integer>> data = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
            @Override
            public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                while (true) {
                    ctx.collect(new Tuple2<>("name", 10));
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
            }
        });

        // Table 方式
        Table table = tableEnvironment.fromDataStream(data, $("name"), $("age"));
        Table name = table.select($("name"));
        DataStream<Tuple2<Boolean, Row>> result = tableEnvironment.toRetractStream(name, Row.class);
        result.print();

        // SQL 方式
        tableEnvironment.createTemporaryView("users",data, $("name"), $("age"));
        String sql = "select name from users";
        table = tableEnvironment.sqlQuery(sql);
        result = tableEnvironment.toRetractStream(table, Row.class);
        result.print();

        env.execute("TableApiDemo");
    }
}

运行结果

1> (true,name)
6> (true,name)
...
2> (true,name)
6> (true,name)
3> (true,name)

新版本

Maven

<!-- 统一使用同一版本号 -->
<properties>
  <flink.version>1.20.0</flink.version>
  <java.version>17</java.version>
</properties>

<dependencies>
  <!-- 基础流处理 -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <!-- Table API(Java) -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <!-- DataStream ⇄ Table 的桥接 -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>${flink.version}</version>
  </dependency>

  <!-- Planner(提供运行期支持;通常标 provided 交给集群环境) -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>
</dependencies>

编写代码

package icu.wzk;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class TableApiDemo {
    public static void main(String[] args) throws Exception {
        // 1) 基本环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 2) 准备一组确定性数据(便于验证)
        var ds = env.fromElements(
                Tuple2.of("alice", 10),
                Tuple2.of("bob", 20),
                Tuple2.of("alice", 30)
        );

        // 3) DataStream -> Table(起列名)
        Table t = tEnv.fromDataStream(ds).as("name", "age");

        // 4) Table API:只选 name
        Table namesByApi = t.select($("name"));

        // 5) SQL:注册临时视图 + 查询
        tEnv.createTemporaryView("users", t);
        Table namesBySql = tEnv.sqlQuery("SELECT name FROM users");

        // 6) 输出:用新写法 toChangelogStream 验证增量结果
        tEnv.toChangelogStream(namesByApi, Row.class).print("API");
        tEnv.toChangelogStream(namesBySql, Row.class).print("SQL");

        env.execute("Flink SQL HelloWorld");
    }
}

预期结果

API> +I[alice]
API> +I[bob]
API> +I[alice]
SQL> +I[alice]
SQL> +I[bob]
SQL> +I[alice]

说明:+I 表示 Insert(插入),这是 changelog 的标准格式