This is article 64 in the Big Data series. It takes a deep look at how Kafka implements Exactly-Once semantics, covering the idempotent producer, the transaction mechanism, and the end-to-end EOS pattern.
Message Delivery Semantics
Kafka supports three message delivery semantics, meeting different business scenario needs:
| Semantics | Description | Typical Configuration |
|---|---|---|
| At-Most-Once | Messages may be lost, but not duplicated | acks=0, no retry |
| At-Least-Once | Messages not lost, but may be duplicated | acks=all, retry enabled |
| Exactly-Once | Messages neither lost nor duplicated | Idempotence + transactions |
Before version 0.11.0.0, Kafka could only provide At-Least-Once delivery. Starting with 0.11, Kafka introduced the idempotent producer and transaction support, officially supporting Exactly-Once Semantics (EOS).
Idempotent Producer
Principle
Idempotence achieves deduplication through two key identifiers:
- PID (Producer ID): A unique ID the Broker assigns to each Producer instance; it changes after a restart
- Sequence Number: A monotonically increasing sequence number maintained per <PID, Topic, Partition> combination
The Broker caches the maximum sequence number for each <PID, Partition>. When it receives a duplicate message (sequence number ≤ the recorded value), it simply discards it instead of writing it to the log again.
Normal write:
Producer sends seq=1 → Broker records maxSeq=1, write success
Producer sends seq=2 → Broker records maxSeq=2, write success
Retry deduplication:
Producer times out and resends seq=2 → Broker finds seq ≤ maxSeq, returns success but does not write again
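The broker-side dedup check above can be sketched in a few lines of plain Java. This is an illustrative simplification, not Kafka source code: it tracks the highest sequence number seen per <PID, Partition> and drops any write whose sequence number is not greater than it (the class and method names here are invented for the sketch).

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of broker-side idempotence: remember the max
// sequence number per <PID, partition>, drop anything at or below it.
class SequenceDeduplicator {
    private final Map<String, Long> maxSeqByPidPartition = new HashMap<>();

    /** Returns true if the write is new and should be appended to the log. */
    boolean tryAppend(long pid, int partition, long seq) {
        String key = pid + "-" + partition;
        Long maxSeq = maxSeqByPidPartition.get(key);
        if (maxSeq != null && seq <= maxSeq) {
            return false; // duplicate retry: ack success, but do not write again
        }
        maxSeqByPidPartition.put(key, seq);
        return true;
    }
}
```

Note that a real broker is stricter than this sketch: it also requires the sequence to advance exactly by one per record and rejects gaps with an OutOfOrderSequenceException; the sketch only shows the dedup side.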
Enable Idempotence
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Enable idempotence (enabled by default since Kafka 3.0)
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Idempotence requires acks=all
props.put(ProducerConfig.ACKS_CONFIG, "all");
// Idempotence requires retries > 0
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
Idempotence Limitations
- Only guarantees idempotence within a single partition; cross-partition writes are not protected
- Only guarantees idempotence within a single session; a Producer restart changes the PID, so messages from the previous session can no longer be deduplicated
- Cross-partition, cross-session Exactly-Once requires transactions
Transactions
Kafka transactions allow a Producer to atomically write a batch of messages to multiple Topic partitions - either all succeed or all are rolled back - and the guarantee holds across Producer sessions.
Core Components
- Transaction Coordinator: Any Broker can serve as transaction coordinator, responsible for managing transaction state
- Transaction Log (__transaction_state Topic): Persistently stores transaction state so the coordinator can recover after a crash
- transactional.id: User-configured, globally unique transaction ID that identifies the same logical Producer across sessions
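The coordinator moves each transaction through a small state machine that it persists to __transaction_state. The sketch below uses state names matching Kafka's internal TransactionState, but the transition table is a simplified subset for illustration, not the complete set of transitions:

```java
import java.util.EnumSet;
import java.util.Set;

// Simplified subset of the coordinator's transaction state machine.
// State names follow Kafka's internal TransactionState; the transition
// rules here are condensed for illustration.
class TxnStateMachine {
    enum State { EMPTY, ONGOING, PREPARE_COMMIT, PREPARE_ABORT, COMPLETE_COMMIT, COMPLETE_ABORT }

    static Set<State> nextStates(State s) {
        switch (s) {
            case EMPTY:          return EnumSet.of(State.ONGOING);                          // beginTransaction
            case ONGOING:        return EnumSet.of(State.PREPARE_COMMIT, State.PREPARE_ABORT); // commit/abort requested
            case PREPARE_COMMIT: return EnumSet.of(State.COMPLETE_COMMIT);                  // markers written
            case PREPARE_ABORT:  return EnumSet.of(State.COMPLETE_ABORT);
            default:             return EnumSet.of(State.EMPTY);                            // completed txn resets
        }
    }

    static boolean canTransition(State from, State to) {
        return nextStates(from).contains(to);
    }
}
```

Persisting every transition to the transaction log is what lets a newly elected coordinator resume an in-flight transaction: a transaction found in PREPARE_COMMIT after a crash is rolled forward to COMPLETE_COMMIT rather than aborted.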
Transactional Producer Configuration and Usage
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Must set transactional.id (globally unique)
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-producer-001");
// Enable idempotence (prerequisite for transactions)
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// Initialize transactions (call once per Producer lifecycle)
producer.initTransactions();

try {
    producer.beginTransaction();
    // Write atomically to multiple partitions
    producer.send(new ProducerRecord<>("topic-a", "key1", "value1"));
    producer.send(new ProducerRecord<>("topic-b", "key2", "value2"));
    producer.send(new ProducerRecord<>("topic-c", "key3", "value3"));
    // Commit the transaction
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    // This instance was fenced by a newer one with the same transactional.id; just close it
    producer.close();
} catch (KafkaException e) {
    // Any other error: roll back the transaction
    producer.abortTransaction();
}
Transactional Consumer Configuration
The Consumer needs to set its isolation level so that it only reads messages from committed transactions:
// read_committed: Only read committed messages (ignore uncommitted/rolled back)
// read_uncommitted: Read all messages (default, including uncommitted)
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
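For context, a fuller consumer configuration for the transactional pipeline might look like the sketch below. It deliberately uses the raw config key strings (which match the ConsumerConfig constants) so the snippet runs without the kafka-clients jar on the classpath; the group ID is an invented example:

```java
import java.util.Properties;

// Sketch of a read_committed consumer configuration. Raw key strings are
// used in place of the ConsumerConfig constants they correspond to.
class TxnConsumerConfig {
    static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "eos-consumer-group"); // example group ID
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Only deliver messages from committed transactions
        props.put("isolation.level", "read_committed");
        // Offsets will be committed via sendOffsetsToTransaction, not auto-commit
        props.put("enable.auto.commit", "false");
        return props;
    }
}
```

Disabling auto-commit matters here: in the read-process-write pattern shown in the next section, the offsets must be committed inside the producer's transaction rather than by the consumer on a timer.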
End-to-End Exactly-Once (EOS)
Complete end-to-end EOS = Idempotent Producer + Transaction + Consumer offset atomic commit.
Typical “read-process-write” pattern (Consume-Transform-Produce):
producer.initTransactions();

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    if (!records.isEmpty()) {
        producer.beginTransaction();
        try {
            for (ConsumerRecord<String, String> record : records) {
                // Business processing
                String result = process(record.value());
                producer.send(new ProducerRecord<>("output-topic", record.key(), result));
            }
            // Commit the consumer offsets as part of the transaction,
            // making message processing and offset commit atomic
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partRecords = records.records(partition);
                long lastOffset = partRecords.get(partRecords.size() - 1).offset();
                // The committed offset is the next offset to read, hence +1
                offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
            }
            producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
            producer.commitTransaction();
        } catch (Exception e) {
            // On any failure, abort so nothing in this batch becomes visible
            producer.abortTransaction();
        }
    }
}
By including the consumer offset commit in the transaction via sendOffsetsToTransaction(), even if the Producer crashes, a restarted instance will neither reprocess nor skip messages, achieving true end-to-end Exactly-Once semantics.
Summary
| Mechanism | Problem Solved | Applicable Scope |
|---|---|---|
| Idempotent Producer | Deduplication within single partition/session | Simple retry scenarios |
| Transactions | Cross-partition, cross-session atomic writes | Multi-partition writes, ETL pipelines |
| EOS (Idempotence + transactions + atomic offset commit) | End-to-end Exactly-Once | Stream processing, financial transactions |
Kafka's Exactly-Once implementation comes at a performance cost: transactions introduce extra coordination round trips and transaction log writes. In scenarios that demand very high data consistency, this cost is worthwhile.