本文是大数据系列第 64 篇,深入讲解 Kafka 精确一次语义的实现原理,包括幂等性、事务机制与端到端 EOS 方案。

完整图文版(含截图):CSDN 原文 | 掘金

消息交付语义

Kafka 支持三种消息交付语义,满足不同业务场景的需求:

语义描述典型配置
At-Most-Once(最多一次)消息可能丢失,但不重复acks=0,不重试
At-Least-Once(至少一次)消息不丢失,但可能重复acks=all,重试开启
Exactly-Once(精确一次)消息不丢失且不重复幂等性 + 事务

在 Kafka 0.11.0.0 之前,只能实现 At-Least-Once。自 0.11 版本起,Kafka 引入幂等生产者和事务支持,正式支持 Exactly-Once 语义(EOS)。

幂等生产者(Idempotent Producer)

原理

幂等性通过两个关键标识符实现去重:

  • PID(Producer ID):每个 Producer 实例启动时由 Broker 分配的唯一 ID,重启后会变化
  • 序列号(Sequence Number):每个 <PID, Topic, Partition> 组合维护一个单调递增的序列号

Broker 会缓存每个 <PID, Partition> 的最大序列号。当收到重复消息(序列号 ≤ 已记录值)时,直接丢弃,不写入日志。

正常写入:
  Producer 发送 seq=1 → Broker 记录 maxSeq=1,写入成功
  Producer 发送 seq=2 → Broker 记录 maxSeq=2,写入成功

重试去重:
  Producer 超时重发 seq=2 → Broker 发现 seq ≤ maxSeq,返回成功但不重复写入

开启幂等性

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 开启幂等性(Kafka 3.0+ 默认开启)
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// 幂等性要求 acks=all
props.put(ProducerConfig.ACKS_CONFIG, "all");
// 幂等性要求 retries > 0
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

幂等性的局限

  • 只保证单个分区内的幂等性,跨分区写入不受保护
  • 只保证单个会话内的幂等性,Producer 重启后 PID 变化,历史消息无法去重
  • 跨分区、跨会话的精确一次需要事务

事务(Transactions)

Kafka 事务允许 Producer 将一批消息原子性地写入多个 Topic 分区——要么全部成功,要么全部回滚,且跨 Producer 会话保持有效。

核心组件

  • Transaction Coordinator:每个 Broker 都可以作为事务协调者,负责管理事务状态
  • Transaction Log__transaction_state Topic):持久化存储事务状态,保证协调者宕机后可恢复
  • transactional.id:用户配置的全局唯一事务 ID,跨会话标识同一个逻辑 Producer

事务生产者配置与使用

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 必须设置 transactional.id(全局唯一)
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-producer-001");
// 开启幂等性(事务的前提条件)
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// 初始化事务(每个 Producer 生命周期只调用一次)
producer.initTransactions();

try {
    producer.beginTransaction();

    // 向多个分区原子写入
    producer.send(new ProducerRecord<>("topic-a", "key1", "value1"));
    producer.send(new ProducerRecord<>("topic-b", "key2", "value2"));
    producer.send(new ProducerRecord<>("topic-c", "key3", "value3"));

    // 提交事务
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    // 旧实例被新实例 fence,直接关闭
    producer.close();
} catch (KafkaException e) {
    // 其他异常,回滚事务
    producer.abortTransaction();
}

事务消费者配置

消费者需设置隔离级别,只读取已提交的事务消息:

// read_committed:只读取已提交消息(忽略未提交/回滚的消息)
// read_uncommitted:读取所有消息(默认,包括未提交的)
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

端到端 Exactly-Once(EOS)

完整的端到端 EOS = 幂等生产者 + 事务 + 消费者偏移量原子提交

典型的”读取-处理-写入”模式(Consume-Transform-Produce):

producer.initTransactions();

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    if (!records.isEmpty()) {
        producer.beginTransaction();

        try {
            for (ConsumerRecord<String, String> record : records) {
                // 业务处理
                String result = process(record.value());
                producer.send(new ProducerRecord<>("output-topic", record.key(), result));
            }

            // 将消费者偏移量作为事务的一部分提交
            // 保证消息处理和偏移量提交的原子性
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partRecords = records.records(partition);
                long lastOffset = partRecords.get(partRecords.size() - 1).offset();
                offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
            }
            producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());

            producer.commitTransaction();
        } catch (Exception e) {
            producer.abortTransaction();
        }
    }
}

通过 sendOffsetsToTransaction() 将消费偏移量的提交纳入事务,确保即使 Producer 崩溃,重启后也不会重复处理或遗漏消息,实现真正的端到端精确一次语义。

小结

机制解决的问题适用范围
幂等生产者单分区、单会话内去重简单重试场景
事务跨分区、跨会话原子写入多分区写入、ETL 管道
EOS(幂等+事务+原子偏移提交)端到端精确一次流处理、金融交易

Kafka 的 Exactly-Once 实现代价是性能开销:事务引入了额外的协调轮次和日志写入。在对数据一致性要求极高的场景中,这个代价是值得的。