本文是大数据系列第 64 篇,深入讲解 Kafka 精确一次语义的实现原理,包括幂等性、事务机制与端到端 EOS 方案。
消息交付语义
Kafka 支持三种消息交付语义,满足不同业务场景的需求:
| 语义 | 描述 | 典型配置 |
|---|---|---|
| At-Most-Once(最多一次) | 消息可能丢失,但不重复 | acks=0,不重试 |
| At-Least-Once(至少一次) | 消息不丢失,但可能重复 | acks=all,重试开启 |
| Exactly-Once(精确一次) | 消息不丢失且不重复 | 幂等性 + 事务 |
在 Kafka 0.11.0.0 之前,只能实现 At-Least-Once。自 0.11 版本起,Kafka 引入幂等生产者和事务支持,正式支持 Exactly-Once 语义(EOS)。
幂等生产者(Idempotent Producer)
原理
幂等性通过两个关键标识符实现去重:
- PID(Producer ID):每个 Producer 实例启动时由 Broker 分配的唯一 ID,重启后会变化
- 序列号(Sequence Number):每个
<PID, Topic, Partition>组合维护一个单调递增的序列号
Broker 会缓存每个 <PID, Partition> 的最大序列号。当收到重复消息(序列号 ≤ 已记录值)时,直接丢弃,不写入日志。
正常写入:
Producer 发送 seq=1 → Broker 记录 maxSeq=1,写入成功
Producer 发送 seq=2 → Broker 记录 maxSeq=2,写入成功
重试去重:
Producer 超时重发 seq=2 → Broker 发现 seq ≤ maxSeq,返回成功但不重复写入
开启幂等性
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 开启幂等性(Kafka 3.0+ 默认开启)
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// 幂等性要求 acks=all
props.put(ProducerConfig.ACKS_CONFIG, "all");
// 幂等性要求 retries > 0
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
幂等性的局限
- 只保证单个分区内的幂等性,跨分区写入不受保护
- 只保证单个会话内的幂等性,Producer 重启后 PID 变化,历史消息无法去重
- 跨分区、跨会话的精确一次需要事务
事务(Transactions)
Kafka 事务允许 Producer 将一批消息原子性地写入多个 Topic 分区——要么全部成功,要么全部回滚,且跨 Producer 会话保持有效。
核心组件
- Transaction Coordinator:每个 Broker 都可以作为事务协调者,负责管理事务状态
- Transaction Log(
__transaction_stateTopic):持久化存储事务状态,保证协调者宕机后可恢复 transactional.id:用户配置的全局唯一事务 ID,跨会话标识同一个逻辑 Producer
事务生产者配置与使用
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 必须设置 transactional.id(全局唯一)
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-producer-001");
// 开启幂等性(事务的前提条件)
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 初始化事务(每个 Producer 生命周期只调用一次)
producer.initTransactions();
try {
producer.beginTransaction();
// 向多个分区原子写入
producer.send(new ProducerRecord<>("topic-a", "key1", "value1"));
producer.send(new ProducerRecord<>("topic-b", "key2", "value2"));
producer.send(new ProducerRecord<>("topic-c", "key3", "value3"));
// 提交事务
producer.commitTransaction();
} catch (ProducerFencedException e) {
// 旧实例被新实例 fence,直接关闭
producer.close();
} catch (KafkaException e) {
// 其他异常,回滚事务
producer.abortTransaction();
}
事务消费者配置
消费者需设置隔离级别,只读取已提交的事务消息:
// read_committed:只读取已提交消息(忽略未提交/回滚的消息)
// read_uncommitted:读取所有消息(默认,包括未提交的)
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
端到端 Exactly-Once(EOS)
完整的端到端 EOS = 幂等生产者 + 事务 + 消费者偏移量原子提交。
典型的”读取-处理-写入”模式(Consume-Transform-Produce):
producer.initTransactions();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
if (!records.isEmpty()) {
producer.beginTransaction();
try {
for (ConsumerRecord<String, String> record : records) {
// 业务处理
String result = process(record.value());
producer.send(new ProducerRecord<>("output-topic", record.key(), result));
}
// 将消费者偏移量作为事务的一部分提交
// 保证消息处理和偏移量提交的原子性
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partRecords = records.records(partition);
long lastOffset = partRecords.get(partRecords.size() - 1).offset();
offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
}
producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}
}
}
通过 sendOffsetsToTransaction() 将消费偏移量的提交纳入事务,确保即使 Producer 崩溃,重启后也不会重复处理或遗漏消息,实现真正的端到端精确一次语义。
小结
| 机制 | 解决的问题 | 适用范围 |
|---|---|---|
| 幂等生产者 | 单分区、单会话内去重 | 简单重试场景 |
| 事务 | 跨分区、跨会话原子写入 | 多分区写入、ETL 管道 |
| EOS(幂等+事务+原子偏移提交) | 端到端精确一次 | 流处理、金融交易 |
Kafka 的 Exactly-Once 实现代价是性能开销:事务引入了额外的协调轮次和日志写入。在对数据一致性要求极高的场景中,这个代价是值得的。