Sink Overview

In Apache Flink, a sink is the component that writes out the final results of stream processing. After a data stream is read from a source and passes through a series of transformations (map, filter, join, etc.), it is written to a sink. The sink is the endpoint of a Flink streaming application and determines how processed data is stored or forwarded.

Main Functions and Features

  1. Data Output: writes processed data to external systems or storage media
  2. Fault Tolerance: cooperates with Flink's checkpoint mechanism to prevent data loss
  3. Scalability: supports parallel writes to increase throughput

Common Sink Types

  1. File System Sink:

    • Write to distributed file systems like HDFS/S3
    • Example: StreamingFileSink
    • Supports time-based and size-based rolling files
  2. Message Queue Sink:

    • Write to message systems like Kafka/RabbitMQ
    • Example: FlinkKafkaProducer
    • Can provide exactly-once semantics (e.g., via Kafka transactions)
  3. Database Sink:

    • Write to relational databases (MySQL/PostgreSQL)
    • Write to NoSQL databases (HBase/Cassandra)
    • Usually implemented through JDBC or dedicated connectors
  4. Custom Sink:

    • Implement SinkFunction interface
    • Can integrate any external system
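A custom sink is written by implementing Flink's SinkFunction interface (or extending RichSinkFunction when open/close lifecycle hooks are needed). As a rough, dependency-free sketch of the shape of such an implementation (the interface below is a simplified stand-in, not the real Flink API):

```java
import java.util.ArrayList;
import java.util.List;

public class CustomSinkSketch {
    // Simplified stand-in for Flink's SinkFunction<IN> (the real interface
    // lives in org.apache.flink.streaming.api.functions.sink)
    interface SinkFunction<IN> {
        void invoke(IN value) throws Exception;
    }

    // Toy sink that "writes" records to an in-memory list; a real sink would
    // write to an external system and manage connections in open()/close()
    static class ListSink implements SinkFunction<String> {
        final List<String> target = new ArrayList<>();

        @Override
        public void invoke(String value) {
            target.add(value);
        }
    }

    public static void main(String[] args) {
        ListSink sink = new ListSink();
        sink.invoke("hello");
        sink.invoke("world");
        System.out.println(sink.target);  // prints [hello, world]
    }
}
```

In real Flink code, `stream.addSink(new MySink())` attaches such a function to the pipeline, and Flink calls `invoke` once per record on each parallel subtask.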

Typical Application Scenarios

  1. Real-time Computation Result Storage: Write aggregation results to OLAP database
  2. Data Pipeline: Forward processed data to downstream systems
  3. Alert System: Write anomaly detection results to notification system

Sink Fault Tolerance Mechanism

1. Fault Tolerance Semantics Level

Exactly-Once:

  • Guarantees that each record takes effect exactly once in the output: no loss and no duplicates, even after a failure
  • Implementation: requires the sink to support transactional writes and to coordinate with Flink's checkpoint mechanism
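Flink's transactional sinks (such as TwoPhaseCommitSinkFunction) achieve this by tying writes to checkpoints: records are buffered in a transaction that is pre-committed when a checkpoint starts and committed only after the checkpoint completes. A dependency-free sketch of that handshake (all names here are illustrative, not the Flink API):

```java
import java.util.ArrayList;
import java.util.List;

public class TwoPhaseCommitSketch {
    static final List<String> committed = new ArrayList<>(); // stands in for the external system
    static List<String> txn = new ArrayList<>();             // the currently open transaction

    // Records go into the open transaction, not directly to the target
    static void write(String record) {
        txn.add(record);
    }

    // Phase 1: on the checkpoint barrier, freeze the transaction (pre-commit)
    static List<String> preCommit() {
        List<String> frozen = txn;
        txn = new ArrayList<>();
        return frozen;
    }

    // Phase 2: once the checkpoint has completed everywhere, commit for real
    static void commit(List<String> frozen) {
        committed.addAll(frozen);
    }

    public static void main(String[] args) {
        write("a");
        write("b");
        List<String> pending = preCommit(); // checkpoint triggered
        commit(pending);                    // checkpoint completed
        System.out.println(committed);      // prints [a, b]
    }
}
```

If a failure happens before phase 2, the transaction is aborted and the records are replayed from the last checkpoint, so they are never visible twice in the target system.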

At-Least-Once:

  • Guarantees that each record is processed at least once; duplicates may appear during failure recovery
  • Implementation: simple replay/retry on recovery; deduplication is left to the downstream system (e.g., via idempotent writes)
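With at-least-once delivery, duplicates are usually neutralized by making the write itself idempotent. A minimal sketch using a keyed upsert into an in-memory map (a hypothetical stand-in for a keyed store such as Redis or an upsert-capable database):

```java
import java.util.HashMap;
import java.util.Map;

public class IdempotentUpsert {
    // Keyed store: putting the same key twice overwrites rather than duplicates
    static final Map<String, Integer> store = new HashMap<>();

    static void upsert(String key, int value) {
        store.put(key, value);
    }

    public static void main(String[] args) {
        upsert("word:flink", 3);
        upsert("word:flink", 3); // replay of the same record after recovery
        System.out.println(store.size()); // prints 1 -- no duplicate entry
    }
}
```

Because replaying a record leaves the store unchanged, at-least-once delivery plus idempotent writes yields effectively-once results in the target system.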

2. Implementation Mechanism

Checkpointing:

  • Core mechanism: periodically takes consistent distributed snapshots of operator state (including source offsets) so processing can resume without data loss
  • Configuration parameters:
    • execution.checkpointing.interval: checkpoint interval (checkpointing is disabled unless an interval is set)
    • execution.checkpointing.mode: EXACTLY_ONCE/AT_LEAST_ONCE
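These options can be set in flink-conf.yaml (or programmatically, e.g. via env.enableCheckpointing); a minimal sketch with illustrative values:

```yaml
# flink-conf.yaml (values are illustrative)
execution.checkpointing.interval: 60s
execution.checkpointing.mode: EXACTLY_ONCE
```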

Configuration Example

Kafka Sink Example:

DataStream<String> stream = ...;
stream.addSink(new FlinkKafkaProducer<>(
    "broker-list",
    "topic-name",
    new SimpleStringSchema()
));

File System Sink Example:

StreamingFileSink<String> fileSink = StreamingFileSink
    .forRowFormat(new Path("/output"), new SimpleStringEncoder<String>("UTF-8"))
    .build();
stream.addSink(fileSink);

Case: Write Data to Redis

Add Dependency:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-redis_2.11</artifactId>
  <version>1.1.5</version>
</dependency>

Write Code: consume from Kafka, compute a word count, and write the result to Redis: Source(Kafka) -> Sink(Redis)

package icu.wzk;

import java.util.Properties;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;

public class StreamFromKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka configuration
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "h121.wzk.icu:9092");

        // Source: Kafka
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                "flink_test",
                new SimpleStringSchema(),
                properties
        );
        DataStreamSource<String> data = env.addSource(consumer);

        // Split each line into (word, 1) tuples
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = data
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                        for (String word : value.split(" ")) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                });

        // Key by word and sum the counts
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = wordAndOne
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) {
                        return value.f0;
                    }
                })
                .sum(1);
        result.print();

        // Sink: Redis (host, port, and hash name below are illustrative)
        FlinkJedisPoolConfig redisConfig = new FlinkJedisPoolConfig.Builder()
                .setHost("h121.wzk.icu")
                .setPort(6379)
                .build();
        result.addSink(new RedisSink<>(redisConfig, new RedisMapper<Tuple2<String, Integer>>() {
            @Override
            public RedisCommandDescription getCommandDescription() {
                // HSET into a Redis hash named "flink_wordcount"
                return new RedisCommandDescription(RedisCommand.HSET, "flink_wordcount");
            }

            @Override
            public String getKeyFromData(Tuple2<String, Integer> data) { return data.f0; }

            @Override
            public String getValueFromData(Tuple2<String, Integer> data) { return data.f1.toString(); }
        }));

        env.execute("StreamFromKafka");
    }
}

Sink Lifecycle

Flink’s Sink goes through the following phases during execution:

  • Open: Initialize resources like database connections, file handles, etc.
  • Write (invoke): Write each record to target storage system
  • Close: Close resources, ensure data is completely written and resources are properly released
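The three phases can be sketched with a dependency-free stand-in for a buffering sink (mirroring, but not actually using, Flink's RichSinkFunction hooks):

```java
import java.util.ArrayList;
import java.util.List;

public class SinkLifecycle {
    // Toy sink that buffers records and flushes them on close(); a real sink
    // would open a connection in open() and release it in close()
    static class BufferingSink {
        List<String> buffer;
        final List<String> storage = new ArrayList<>(); // stands in for the external system

        void open() {                // initialize resources
            buffer = new ArrayList<>();
        }

        void invoke(String record) { // called once per record
            buffer.add(record);
        }

        void close() {               // flush remaining data, release resources
            storage.addAll(buffer);
            buffer = null;
        }
    }

    public static void main(String[] args) {
        BufferingSink sink = new BufferingSink();
        sink.open();
        sink.invoke("a");
        sink.invoke("b");
        sink.close();
        System.out.println(sink.storage); // prints [a, b]
    }
}
```

Forgetting to flush buffered data in close() is a classic source of missing records at the end of a job, which is why the close phase matters as much as the write phase.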

Best Practices

  1. Choose appropriate fault tolerance semantics based on downstream system characteristics
  2. For exactly-once semantics, ensure target system supports transactions or idempotent operations
  3. Set checkpoint interval reasonably (usually between 1-10 minutes)
  4. Monitor checkpoint duration to avoid affecting throughput
  5. For critical business, implement end-to-end consistency verification mechanism