Sink Overview
Sink in Flink refers to the component that finally outputs data in stream processing. In Apache Flink, after a data stream is read from a Source and passes through a series of transformations (map, filter, join, etc.), it is finally written to a Sink. The Sink is the endpoint of a Flink streaming application, determining how processed data is saved or transmitted.
Main Functions and Features
- Data Output: Sink is responsible for outputting processed data to external systems or storage media
- Fault Tolerance: Cooperates with Flink’s checkpoint mechanism to ensure no data loss
- Scalability: Supports parallel writing to improve throughput
Common Sink Types
- File System Sink:
  - Write to distributed file systems like HDFS/S3
  - Example: StreamingFileSink
  - Supports time-based and size-based rolling files
- Message Queue Sink:
  - Write to message systems like Kafka/RabbitMQ
  - Example: FlinkKafkaProducer
  - Provides exactly-once semantics guarantee
- Database Sink:
  - Write to relational databases (MySQL/PostgreSQL)
  - Write to NoSQL databases (HBase/Cassandra)
  - Usually implemented through JDBC or dedicated connectors
- Custom Sink:
  - Implement the SinkFunction interface
  - Can integrate any external system
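To illustrate the contract a custom sink implements, here is a minimal self-contained sketch. `SimpleSink` and `CollectingSink` are hypothetical stand-ins for Flink's `SinkFunction` (which lives in `org.apache.flink.streaming.api.functions.sink`), shown without Flink dependencies:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's SinkFunction contract: one invoke() call per record.
interface SimpleSink<T> {
    void invoke(T value) throws Exception;
}

// A custom sink integrating an "external system" -- here just an in-memory list.
class CollectingSink implements SimpleSink<String> {
    final List<String> store = new ArrayList<>();

    @Override
    public void invoke(String value) {
        store.add(value); // a real sink would write to a DB, message queue, file, etc.
    }
}

public class CustomSinkSketch {
    public static void main(String[] args) throws Exception {
        CollectingSink sink = new CollectingSink();
        for (String record : new String[]{"a", "b", "c"}) {
            sink.invoke(record);
        }
        System.out.println(sink.store); // [a, b, c]
    }
}
```

In real Flink code you would extend RichSinkFunction instead, which additionally provides open()/close() hooks for managing connections.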
Typical Application Scenarios
- Real-time Computation Result Storage: Write aggregation results to OLAP database
- Data Pipeline: Forward processed data to downstream systems
- Alert System: Write anomaly detection results to notification system
Sink Fault Tolerance Mechanism
1. Fault Tolerance Semantics Level
Exactly-Once:
- Ensures each record is processed exactly once, with no loss or duplication even in case of failure
- Implementation: Requires the Sink to support transactional writes and to work with Flink’s checkpoint mechanism
At-Least-Once:
- Ensures each record is processed at least once, but duplicates may appear during fault recovery
- Implementation: A simple retry mechanism; the retry itself does not guarantee idempotency, so the target system must tolerate or deduplicate repeated writes
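The relationship between the two semantics can be sketched in a few lines: if writes are idempotent (keyed upserts rather than appends), replaying records after a failure leaves the target in the same state, so at-least-once delivery effectively yields exactly-once results. The names below are hypothetical, not a Flink API:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: an idempotent upsert keyed by record ID converges to the same state
// even when records are redelivered after a failure/restart.
public class IdempotentUpsert {
    static final Map<String, Integer> table = new HashMap<>();

    static void upsert(String key, int value) {
        table.put(key, value); // overwrite by key: replaying the same record is a no-op
    }

    public static void main(String[] args) {
        upsert("word:flink", 3);
        upsert("word:flink", 3); // duplicate delivery during fault recovery
        System.out.println(table.size()); // still a single row: 1
    }
}
```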
2. Implementation Mechanism
Checkpointing:
- Core mechanism: Periodically saves distributed snapshots including operator state and pending data
- Configuration parameters:
  - execution.checkpointing.interval: interval between checkpoints (unset by default, which leaves checkpointing disabled)
  - execution.checkpointing.mode: EXACTLY_ONCE or AT_LEAST_ONCE (default EXACTLY_ONCE)
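As a sketch, the two parameters above could be set in flink-conf.yaml like this (the 30-second interval is just an example value, to be tuned per job):

```yaml
# Enable checkpointing every 30 seconds with exactly-once mode
execution.checkpointing.interval: 30s
execution.checkpointing.mode: EXACTLY_ONCE
```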
Configuration Example
Kafka Sink Example:
DataStream<String> stream = ...;
stream.addSink(new FlinkKafkaProducer<>(
"broker-list",
"topic-name",
new SimpleStringSchema()
));
File System Sink Example:
StreamingFileSink<String> fileSink = StreamingFileSink
.forRowFormat(new Path("/output"), new SimpleStringEncoder<String>("UTF-8"))
.build();
stream.addSink(fileSink);
Case: Write Data to Redis
Add Dependency:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.1.5</version>
</dependency>
Write Code: Consume from Kafka, compute a word count, and write the result to Redis: Source (Kafka) -> Sink (Redis)
package icu.wzk;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class StreamFromKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Kafka consumer configuration
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "h121.wzk.icu:9092");
        // Source: Kafka
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                "flink_test",
                new SimpleStringSchema(),
                properties
        );
        DataStreamSource<String> data = env.addSource(consumer);
        // Split each line into (word, 1) tuples
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = data
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                        for (String word : value.split(" ")) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                });
        // Key by word and sum the counts
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) {
                        return value.f0;
                    }
                })
                .sum(1);
        result.print();
        // Sink: Redis (host/port are placeholders for your environment)
        FlinkJedisPoolConfig redisConf = new FlinkJedisPoolConfig.Builder()
                .setHost("h121.wzk.icu")
                .setPort(6379)
                .build();
        result.addSink(new RedisSink<>(redisConf, new RedisMapper<Tuple2<String, Integer>>() {
            @Override
            public RedisCommandDescription getCommandDescription() {
                // HSET wordcount <word> <count>
                return new RedisCommandDescription(RedisCommand.HSET, "wordcount");
            }

            @Override
            public String getKeyFromData(Tuple2<String, Integer> data) {
                return data.f0;
            }

            @Override
            public String getValueFromData(Tuple2<String, Integer> data) {
                return String.valueOf(data.f1);
            }
        }));
        env.execute("StreamFromKafka");
    }
}
Sink Lifecycle
Flink’s Sink goes through the following phases during execution:
- Open: Initialize resources like database connections, file handles, etc.
- Write (invoke): Write each record to target storage system
- Close: Close resources, ensure data is completely written and resources are properly released
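The open -> invoke (per record) -> close ordering above can be sketched with a plain class standing in for Flink's RichSinkFunction; `LoggingSink` and its logging behavior are illustrative, not part of the Flink API:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the sink lifecycle: open() once, invoke() per record, close() once.
public class SinkLifecycleSketch {
    static final List<String> log = new ArrayList<>();

    static class LoggingSink {
        void open()           { log.add("open"); }        // e.g. open a DB connection
        void invoke(String v) { log.add("write:" + v); }  // write one record
        void close()          { log.add("close"); }       // flush and release resources
    }

    public static void main(String[] args) {
        LoggingSink sink = new LoggingSink();
        sink.open();
        for (String record : new String[]{"a", "b"}) {
            sink.invoke(record);
        }
        sink.close();
        System.out.println(log); // [open, write:a, write:b, close]
    }
}
```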
Best Practices
- Choose appropriate fault tolerance semantics based on downstream system characteristics
- For exactly-once semantics, ensure target system supports transactions or idempotent operations
- Set checkpoint interval reasonably (usually between 1-10 minutes)
- Monitor checkpoint duration to avoid affecting throughput
- For critical business, implement end-to-end consistency verification mechanism
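The last practice can be sketched as a simple reconciliation between what was written to the sink and what the downstream system reports, comparing record counts plus an order-independent checksum. All names here are hypothetical; a production check would use stronger digests:

```java
import java.util.List;

// Sketch: end-to-end consistency check comparing record count and a simple
// order-independent checksum (XOR of hashes) across the two sides.
public class ConsistencyCheck {
    static long checksum(List<String> records) {
        long sum = 0L;
        for (String r : records) {
            sum ^= (long) r.hashCode(); // XOR makes the result order-independent
        }
        return sum;
    }

    static boolean verify(List<String> written, List<String> readBack) {
        return written.size() == readBack.size()
                && checksum(written) == checksum(readBack);
    }

    public static void main(String[] args) {
        List<String> written = List.of("a", "b", "c");
        List<String> readBack = List.of("c", "b", "a"); // same data, different order
        System.out.println(verify(written, readBack)); // true
    }
}
```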