JDBC Sink Overview

In Apache Flink, the JDBC sink is an important data-output component: it writes stream- or batch-processed data to relational databases over JDBC connections. MySQL is one of the most commonly used target databases.

Core Functions

  1. Data Persistence: Save real-time processing results to persistent storage
  2. System Integration: Data exchange with other SQL-based systems
  3. Transaction Support: Configurable transactions ensure data consistency

Typical Application Scenarios

  • Real-time analysis result storage
  • User behavior data archiving
  • Business metric persistence
  • Data loading in ETL processes

Implementation Methods

Using JdbcSink Class

Flink provides the built-in JdbcSink.sink() method to create a JDBC sink. The basic usage pattern:

DataStream<User> users = ...;
users.addSink(JdbcSink.sink(
    "INSERT INTO users (id, name, age) VALUES (?, ?, ?)",
    (statement, user) -> {
        statement.setInt(1, user.getId());
        statement.setString(2, user.getName());
        statement.setInt(3, user.getAge());
    },
    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:mysql://localhost:3306/test")
        .withDriverName("com.mysql.cj.jdbc.Driver")
        .withUsername("root")
        .withPassword("password")
        .build()
));

Configuration Options

  1. Connection Parameters:

    • JDBC URL
    • Driver class name
    • Username and password
    • Connection pool configuration (optional)
  2. Execution Parameters:

    • Batch size
    • Retry strategy
    • Timeout settings

Flink provides the JdbcSink connector, a Sink implementation based on the standard JDBC protocol. It can efficiently write stream-processing data to any relational database that supports JDBC, including MySQL, PostgreSQL, and Oracle. The connector is an important part of the Flink ecosystem, providing a standardized path for moving data from stream-processing systems into relational databases.

Core Functional Features

  1. Multi-database Support: Compatible with various relational databases through JDBC drivers
  2. Batch Optimization: Supports batch writing mode to improve performance
  3. Delivery Guarantees: At-least-once by default; exactly-once semantics are available via JdbcSink.exactlyOnceSink() with an XA-capable driver
  4. Retry Mechanism: Built-in automatic retry after connection failure
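The retry idea can be sketched outside Flink as a simple retry-with-backoff loop. This is a minimal illustration in plain Java, not Flink's actual implementation; the operation, retry count, and backoff values are made up for the example:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RetrySketch {
    // Attempts an operation up to maxRetries extra times,
    // doubling the wait between tries (exponential backoff).
    static boolean writeWithRetry(Runnable attempt, int maxRetries, long initialBackoffMs)
            throws InterruptedException {
        long backoff = initialBackoffMs;
        for (int i = 0; i <= maxRetries; i++) {
            try {
                attempt.run();
                return true;                        // success
            } catch (RuntimeException e) {
                if (i == maxRetries) return false;  // retries exhausted, give up
                Thread.sleep(backoff);
                backoff *= 2;
            }
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger calls = new AtomicInteger();
        // Simulated flaky write: fails twice, then succeeds on the third call.
        boolean ok = writeWithRetry(() -> {
            if (calls.incrementAndGet() < 3) throw new RuntimeException("connection lost");
        }, 5, 1);
        System.out.println(ok + " after " + calls.get() + " attempts");
    }
}
```

In the real connector the equivalent knob is the retry count on the execution options; the sketch only shows the control flow.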

Usage Requirements

When using the JDBC sink, you need to provide the following key configuration:

  1. Database Connection Information:

    • JDBC URL (e.g., jdbc:mysql://localhost:3306/db_name)
    • Username and password
    • Connection pool configuration (optional)
  2. SQL Statements:

    • Supports INSERT, UPDATE, and other DML statements
    • Can be written in PreparedStatement form
    • Supports dynamic value binding through placeholder parameters
  3. Data Type Mapping:

    • Automatic conversion between Flink data types and database types
    • Support custom type serializers

Workflow

Taking MySQL as an example, the Flink JdbcSink workflow is:

  1. Receive records from data stream
  2. Prepare statement based on configured SQL template
  3. Get database connection through JDBC connection pool
  4. Execute batch write operation
  5. Commit transaction to ensure data consistency

// Example: Create JdbcSink to write to MySQL
JdbcSink.sink(
    "INSERT INTO user_actions (user_id, action_type, timestamp) VALUES (?, ?, ?)",
    (ps, record) -> {
        ps.setString(1, record.getUserId());
        ps.setString(2, record.getActionType());
        ps.setTimestamp(3, new Timestamp(record.getTimestamp()));
    },
    JdbcExecutionOptions.builder()
        .withBatchSize(1000)
        .withBatchIntervalMs(200)
        .build(),
    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:mysql://localhost:3306/flink_db")
        .withDriverName("com.mysql.cj.jdbc.Driver")
        .withUsername("flink_user")
        .withPassword("password")
        .build()
);

Case: Stream Data to MySQL

Add Dependency

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.28</version>
</dependency>
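Note that if you use the built-in JdbcSink shown earlier (rather than a hand-written RichSinkFunction), the JDBC connector itself also has to be on the classpath. The coordinates below are a common form, but the artifact id and version vary across Flink releases, so verify them against your Flink version:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>${flink.version}</version>
</dependency>
```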

Write Code

Define a Person class whose fields correspond to the columns of a MySQL table, generate a few records as a stream, and write them to MySQL.

package icu.wzk;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class SinkSqlTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Person> data = env.fromElements(
                new Person("wzk", 18, 1),
                new Person("icu", 20, 1),
                new Person("wzkicu", 13, 2)
        );
        data.addSink(new MySqlSinkFunction());

        env.execute();
    }

    public static class MySqlSinkFunction extends RichSinkFunction<Person> {

        private PreparedStatement preparedStatement = null;

        private Connection connection = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            String url = "jdbc:mysql://h122.wzk.icu:3306/flink-test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC";
            String username = "hive";
            String password = "hive@wzk.icu";
            connection = DriverManager.getConnection(url, username, password);
            String sql = "INSERT INTO PERSON(name, age, sex) VALUES(?, ?, ?)";
            preparedStatement = connection.prepareStatement(sql);
        }

        @Override
        public void invoke(Person value, Context context) throws Exception {
            preparedStatement.setString(1, value.getName());
            preparedStatement.setInt(2, value.getAge());
            preparedStatement.setInt(3, value.getSex());
            preparedStatement.executeUpdate();
        }

        @Override
        public void close() throws Exception {
            // Close the statement before the connection that created it
            if (null != preparedStatement) {
                preparedStatement.close();
            }
            if (null != connection) {
                connection.close();
            }
        }
    }

    public static class Person {
        private String name;
        private Integer age;
        private Integer sex;

        public Person() {

        }

        public Person(String name, Integer age, Integer sex) {
            this.name = name;
            this.age = age;
            this.sex = sex;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public Integer getAge() {
            return age;
        }

        public void setAge(Integer age) {
            this.age = age;
        }

        public Integer getSex() {
            return sex;
        }

        public void setSex(Integer sex) {
            this.sex = sex;
        }
    }
}
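A MySQL-specific detail worth knowing when batching: MySQL Connector/J only rewrites batched INSERTs into multi-row statements when rewriteBatchedStatements=true is present on the JDBC URL. A small helper for appending such a parameter (plain Java; the URL is the one from the example above):

```java
public class JdbcUrlTweak {
    // Appends a query parameter to a JDBC URL, using ? or & as appropriate.
    static String withParam(String url, String param) {
        return url + (url.contains("?") ? "&" : "?") + param;
    }

    public static void main(String[] args) {
        String base = "jdbc:mysql://h122.wzk.icu:3306/flink-test?useUnicode=true";
        System.out.println(withParam(base, "rewriteBatchedStatements=true"));
        // -> jdbc:mysql://h122.wzk.icu:3306/flink-test?useUnicode=true&rewriteBatchedStatements=true
    }
}
```

The flag only pays off when the sink actually batches (executeBatch or JdbcSink with a batch size greater than 1); the per-record executeUpdate in the example above would not benefit from it.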

Case: Write to Kafka

Write Code

package icu.wzk;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class SinkKafkaTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> data = env.socketTextStream("localhost", 9999, "\n", 0);
        String brokerList = "h121.wzk.icu:9092";
        String topic = "flink_test";
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(brokerList, topic, new SimpleStringSchema());
        data.addSink(producer);
        env.execute("SinkKafkaTest");
    }

}

Run Code

Start nc:

nc -lk 9999

Type a line and press Enter to send it as data.

View Results

Log in to the Kafka server and start a console consumer:

./kafka-console-consumer.sh --bootstrap-server h121.wzk.icu:9092 --topic flink_test --from-beginning

You can see the data has been written.

Optimization Suggestions

When inserting large amounts of data into MySQL in real projects, consider the following optimization strategies:

  • Batch Insert: Configure batch inserts through JdbcExecutionOptions; this can significantly improve write throughput.
  • Connection Pool: For high-concurrency writes, use a connection pool to reduce the overhead of establishing database connections.
  • Index Optimization: Appropriate indexes on the target table speed up queries, but during large-volume writes they slow down inserts, so a trade-off is needed.
  • Data Sharding: For very large-scale data, consider sharding the data and writing in parallel to different MySQL instances or partitioned tables.
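The batch-insert suggestion mirrors how a batching sink typically works internally: records are buffered and flushed either when the buffer reaches the batch size or when the flush interval elapses. A simplified, Flink-free sketch of that flush policy (the sizes and interval are illustrative, not the connector's defaults):

```java
import java.util.ArrayList;
import java.util.List;

public class BatchBufferSketch {
    interface Flusher { void flush(List<String> batch); }

    static class BatchBuffer {
        private final int batchSize;
        private final long intervalMs;
        private final Flusher flusher;
        private final List<String> buffer = new ArrayList<>();
        private long lastFlush;

        BatchBuffer(int batchSize, long intervalMs, Flusher flusher) {
            this.batchSize = batchSize;
            this.intervalMs = intervalMs;
            this.flusher = flusher;
            this.lastFlush = System.currentTimeMillis();
        }

        // Buffer a record; flush when the batch is full or the interval has elapsed.
        void add(String record) {
            buffer.add(record);
            long now = System.currentTimeMillis();
            if (buffer.size() >= batchSize || now - lastFlush >= intervalMs) {
                flusher.flush(new ArrayList<>(buffer));
                buffer.clear();
                lastFlush = now;
            }
        }
    }

    public static void main(String[] args) {
        List<List<String>> flushes = new ArrayList<>();
        BatchBuffer buf = new BatchBuffer(3, 10_000, flushes::add);
        for (int i = 1; i <= 7; i++) buf.add("row-" + i);
        System.out.println(flushes.size() + " flushes, last batch " + flushes.get(1));
        // -> 2 flushes, last batch [row-4, row-5, row-6]  (row-7 stays buffered)
    }
}
```

In the real connector these two knobs correspond to withBatchSize and withBatchIntervalMs on JdbcExecutionOptions, as shown in the earlier example.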