JDBC Sink Overview
In Apache Flink, the JDBC Sink is a key data output component: it writes stream- or batch-processed results to relational databases over JDBC connections. MySQL is one of the most commonly used target databases.
Core Functions
- Data Persistence: Save real-time processing results to persistent storage
- System Integration: Data exchange with other SQL-based systems
- Transaction Support: Configurable transactions ensure data consistency
Typical Application Scenarios
- Real-time analysis result storage
- User behavior data archiving
- Business metric persistence
- Data loading in ETL processes
Implementation Methods
Using JdbcSink Class
Flink provides the built-in JdbcSink.sink() method to create a JDBC Sink. The basic usage pattern:
DataStream<User> users = ...;
users.addSink(JdbcSink.sink(
        "INSERT INTO users (id, name, age) VALUES (?, ?, ?)",
        (statement, user) -> {
            statement.setInt(1, user.getId());
            statement.setString(2, user.getName());
            statement.setInt(3, user.getAge());
        },
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:mysql://localhost:3306/test")
                // com.mysql.jdbc.Driver is deprecated in Connector/J 8.x
                .withDriverName("com.mysql.cj.jdbc.Driver")
                .withUsername("root")
                .withPassword("password")
                .build()
));
Configuration Options
- Connection Parameters:
  - JDBC URL
  - Driver class name
  - Username and password
  - Connection pool configuration (optional)
- Execution Parameters:
  - Batch size
  - Retry strategy
  - Timeout settings
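These execution parameters correspond to `JdbcExecutionOptions`; a minimal sketch with illustrative values:

```java
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;

public class ExecutionOptionsSketch {

    public static JdbcExecutionOptions build() {
        // Illustrative values: flush every 500 records or every 200 ms,
        // and retry a failed batch up to 3 times.
        return JdbcExecutionOptions.builder()
                .withBatchSize(500)        // records per batch
                .withBatchIntervalMs(200)  // max time a record waits before a flush
                .withMaxRetries(3)         // retries on a failed write
                .build();
    }
}
```

Pass the result as the third argument to `JdbcSink.sink(...)`, before the connection options.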
Flink JDBC Sink Introduction
Flink provides the JdbcSink connector, a Sink implementation built on the standard JDBC protocol that can efficiently write stream-processing output to any relational database with a JDBC driver, including MySQL, PostgreSQL, and Oracle. The connector is an important part of the Flink ecosystem, offering a standardized path for moving data from stream processing systems into relational databases.
Core Functional Features
- Multi-database Support: Compatible with various relational databases through JDBC drivers
- Batch Optimization: Supports batch writing mode to improve performance
- Transaction Guarantee: at-least-once delivery by default; exactly-once semantics are available through the XA-based exactly-once sink
- Retry Mechanism: Built-in automatic retry after connection failure
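By default the JDBC sink is at-least-once; exactly-once requires the XA variant. A hedged sketch, assuming the `JdbcSink.exactlyOnceSink` API from flink-connector-jdbc and MySQL Connector/J's `MysqlXADataSource` (checkpointing must be enabled; URL, table, and credentials are placeholders):

```java
import com.mysql.cj.jdbc.MysqlXADataSource;
import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class ExactlyOnceSinkSketch {

    // Hypothetical event type used only for illustration.
    public static class Event {
        public String id;
        public long value;
    }

    public static SinkFunction<Event> build() {
        return JdbcSink.exactlyOnceSink(
                "INSERT INTO events (id, value) VALUES (?, ?)",
                (ps, e) -> {
                    ps.setString(1, e.id);
                    ps.setLong(2, e.value);
                },
                // Exactly-once requires maxRetries = 0;
                // recovery happens through checkpoints instead.
                JdbcExecutionOptions.builder().withMaxRetries(0).build(),
                // MySQL supports only one XA transaction per connection.
                JdbcExactlyOnceOptions.builder()
                        .withTransactionPerConnection(true)
                        .build(),
                () -> {
                    MysqlXADataSource ds = new MysqlXADataSource();
                    ds.setUrl("jdbc:mysql://localhost:3306/flink_db");
                    ds.setUser("flink_user");
                    ds.setPassword("password");
                    return ds;
                });
    }
}
```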
Usage Requirements
When using the JDBC Sink, you need to provide the following key configuration:
- Database Connection Information:
  - JDBC URL (e.g., jdbc:mysql://localhost:3306/db_name)
  - Username and password
  - Connection pool configuration (optional)
- SQL Statements:
  - Supports INSERT/UPDATE and other DML statements
  - Written in PreparedStatement form
  - Supports dynamic value binding through parameters
- Data Type Mapping:
  - Automatic conversion between Flink data types and database types
  - Supports custom type serializers
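The mapping is expressed in the `JdbcStatementBuilder` you supply: each setter call binds a Java value to a SQL column type. A sketch for a hypothetical `Order` type:

```java
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import java.sql.Timestamp;

public class TypeMappingSketch {

    // Hypothetical POJO used only to illustrate the mapping.
    public static class Order {
        public long id;            // -> BIGINT
        public String item;        // -> VARCHAR
        public double price;       // -> DOUBLE / DECIMAL
        public long epochMillis;   // -> TIMESTAMP / DATETIME
    }

    // The statement builder is where Java types meet SQL column types.
    public static final JdbcStatementBuilder<Order> BUILDER = (ps, o) -> {
        ps.setLong(1, o.id);
        ps.setString(2, o.item);
        ps.setDouble(3, o.price);
        ps.setTimestamp(4, new Timestamp(o.epochMillis));
    };
}
```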
Workflow
Taking MySQL as an example, the Flink JdbcSink workflow is:
- Receive records from the data stream
- Prepare a statement from the configured SQL template
- Obtain a database connection from the JDBC connection pool
- Execute the batch write operation
- Commit the transaction to ensure data consistency
// Example: Create a JdbcSink that writes to MySQL
JdbcSink.sink(
        "INSERT INTO user_actions (user_id, action_type, timestamp) VALUES (?, ?, ?)",
        (ps, record) -> {
            ps.setString(1, record.getUserId());
            ps.setString(2, record.getActionType());
            ps.setTimestamp(3, new Timestamp(record.getTimestamp()));
        },
        JdbcExecutionOptions.builder()
                .withBatchSize(1000)
                .withBatchIntervalMs(200)
                .build(),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:mysql://localhost:3306/flink_db")
                .withDriverName("com.mysql.cj.jdbc.Driver")
                .withUsername("flink_user")
                .withPassword("password")
                .build()
);
Case: Stream Data to MySQL
Add Dependency
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.28</version>
</dependency>
Write Code
Define a Person class whose fields match the columns of a MySQL table, generate a few records as a stream, and write them to MySQL.
package icu.wzk;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class SinkSqlTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Person> data = env.fromElements(
                new Person("wzk", 18, 1),
                new Person("icu", 20, 1),
                new Person("wzkicu", 13, 2)
        );
        data.addSink(new MySqlSinkFunction());
        env.execute();
    }

    public static class MySqlSinkFunction extends RichSinkFunction<Person> {

        private PreparedStatement preparedStatement = null;
        private Connection connection = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            String url = "jdbc:mysql://h122.wzk.icu:3306/flink-test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC";
            String username = "hive";
            String password = "hive@wzk.icu";
            connection = DriverManager.getConnection(url, username, password);
            String sql = "INSERT INTO PERSON(name, age, sex) VALUES(?, ?, ?)";
            preparedStatement = connection.prepareStatement(sql);
        }

        @Override
        public void invoke(Person value, Context context) throws Exception {
            preparedStatement.setString(1, value.getName());
            preparedStatement.setInt(2, value.getAge());
            preparedStatement.setInt(3, value.getSex());
            preparedStatement.executeUpdate();
        }

        @Override
        public void close() throws Exception {
            // Close the statement before the connection.
            if (null != preparedStatement) {
                preparedStatement.close();
            }
            if (null != connection) {
                connection.close();
            }
        }
    }

    public static class Person {
        private String name;
        private Integer age;
        private Integer sex;

        public Person() {
        }

        public Person(String name, Integer age, Integer sex) {
            this.name = name;
            this.age = age;
            this.sex = sex;
        }

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public Integer getAge() { return age; }
        public void setAge(Integer age) { this.age = age; }
        public Integer getSex() { return sex; }
        public void setSex(Integer sex) { this.sex = sex; }
    }
}
Case: Write to Kafka
Write Code
package icu.wzk;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class SinkKafkaTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // socketTextStream takes the delimiter as a String plus a max-retry count
        DataStream<String> data = env.socketTextStream("localhost", 9999, "\n", 0);
        String brokerList = "h121.wzk.icu:9092";
        String topic = "flink_test";
        FlinkKafkaProducer<String> producer =
                new FlinkKafkaProducer<>(brokerList, topic, new SimpleStringSchema());
        data.addSink(producer);
        env.execute("SinkKafkaTest");
    }
}
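Note that `FlinkKafkaProducer` is deprecated in recent Flink releases in favor of the `KafkaSink` builder API. A sketch of the equivalent sink under that assumption, using the same broker and topic:

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

public class KafkaSinkSketch {

    public static KafkaSink<String> build() {
        return KafkaSink.<String>builder()
                .setBootstrapServers("h121.wzk.icu:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("flink_test")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
    }
}
```

With the new API the sink is attached via `data.sinkTo(...)` rather than `addSink`.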
Run Code
Start nc:
nc -lk 9999
Type a line and press Enter to send it.
View Results
Log in to the Kafka server and run the console consumer:
./kafka-console-consumer.sh --bootstrap-server h121.wzk.icu:9092 --topic flink_test --from-beginning
You can see the data has been written.
Optimization Suggestions
When inserting large amounts of data into MySQL in real projects, consider the following optimization strategies:
- Batch Insert: configuring batched writes through JdbcExecutionOptions can significantly improve write performance.
- Connection Pool: for high-concurrency writes, use a connection pool to reduce the overhead of opening database connections.
- Index Optimization: appropriate indexes on the target table speed up queries, but during bulk writes they slow down inserts, so weigh the trade-off.
- Data Sharding: for very large data volumes, consider sharding the data and writing in parallel to different MySQL instances or partitioned tables.
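On the batch-insert point, MySQL Connector/J only collapses a JDBC batch into a single multi-row INSERT when `rewriteBatchedStatements=true` is set on the URL; without it, a "batch" is still executed statement by statement. A sketch (host, database, and credentials are placeholders):

```java
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;

public class MySqlBatchUrlSketch {

    public static JdbcConnectionOptions build() {
        // rewriteBatchedStatements=true lets Connector/J rewrite a JDBC batch
        // into one multi-row INSERT, which is usually much faster for bulk loads.
        return new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:mysql://localhost:3306/flink_db?rewriteBatchedStatements=true")
                .withDriverName("com.mysql.cj.jdbc.Driver")
                .withUsername("flink_user")
                .withPassword("password")
                .build();
    }
}
```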