This is article 64 in the Big Data series. It takes a deep look at how Kafka implements Exactly-Once semantics, covering idempotence, the transaction mechanism, and the end-to-end EOS scheme.

Message Delivery Semantics

Kafka supports three message delivery semantics, each suited to different business scenarios:

| Semantics | Description | Typical Configuration |
| --- | --- | --- |
| At-Most-Once | Messages may be lost, but never duplicated | acks=0, no retries |
| At-Least-Once | Messages are not lost, but may be duplicated | acks=all, retries enabled |
| Exactly-Once | Messages are neither lost nor duplicated | Idempotence + transactions |

Before version 0.11.0.0, Kafka could only guarantee At-Least-Once. Starting with 0.11, Kafka introduced the idempotent producer and transaction support, officially supporting Exactly-Once Semantics (EOS).

Idempotent Producer

Principle

Idempotence achieves deduplication through two key identifiers:

  • PID (Producer ID): Unique ID assigned to each Producer instance by Broker, changes after restart
  • Sequence Number: Monotonically increasing sequence number maintained for each <PID, Topic, Partition> combination

The Broker caches the max sequence number for each <PID, Partition>. When it receives a duplicate message (sequence number ≤ the recorded value), it simply discards the message instead of writing it to the log.

Normal write:
  Producer sends seq=1 → Broker records maxSeq=1, write success
  Producer sends seq=2 → Broker records maxSeq=2, write success

Retry deduplication:
  Producer timeout resends seq=2 → Broker finds seq ≤ maxSeq, returns success but doesn't write again
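The broker-side check described above can be sketched as a small simulation. This is a toy model, not Kafka's actual implementation (the real broker tracks sequence numbers per batch, validates gaps, and keeps a bounded cache of recent batches), but it captures the dedup decision:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of broker-side idempotence: track the highest sequence
// number accepted per <PID, partition> and discard anything already seen.
public class SeqDedupSketch {
    // key = "pid:partition", value = highest sequence number accepted so far
    private final Map<String, Long> maxSeq = new HashMap<>();

    /** Returns true if the write is appended; false if it is a duplicate retry. */
    public boolean tryAppend(long pid, int partition, long seq) {
        String key = pid + ":" + partition;
        Long max = maxSeq.get(key);
        if (max != null && seq <= max) {
            return false; // duplicate: ack success to the producer, but do not append
        }
        maxSeq.put(key, seq);
        return true;
    }

    public static void main(String[] args) {
        SeqDedupSketch broker = new SeqDedupSketch();
        System.out.println(broker.tryAppend(42L, 0, 1)); // true  (new write)
        System.out.println(broker.tryAppend(42L, 0, 2)); // true  (new write)
        System.out.println(broker.tryAppend(42L, 0, 2)); // false (retry, deduplicated)
    }
}
```

Note how the state is keyed per partition: a sequence number for partition 0 says nothing about partition 1, which is exactly why idempotence alone cannot cover cross-partition writes.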

Enable Idempotence

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// Enable idempotence (default enabled in Kafka 3.0+)
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Idempotence requires acks=all
props.put(ProducerConfig.ACKS_CONFIG, "all");
// Idempotence requires retries > 0
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

Idempotence Limitations

  • Idempotence is only guaranteed within a single partition; writes across partitions are not protected
  • Idempotence is only guaranteed within a single session; a Producer restart changes the PID, so earlier messages can no longer be deduplicated
  • Cross-partition, cross-session Exactly-Once requires transactions

Transactions

Kafka transactions allow a Producer to atomically write a batch of messages to multiple Topic partitions - either all succeed or all are rolled back - and the guarantee remains valid across Producer sessions.

Core Components

  • Transaction Coordinator: Any Broker can serve as transaction coordinator, responsible for managing transaction state
  • Transaction Log (__transaction_state Topic): Persistently stores transaction state, ensures coordinator can recover after crash
  • transactional.id: User-configured globally unique transaction ID, identifies same logical Producer across sessions

Transactional Producer Configuration and Usage

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Must set transactional.id (globally unique)
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-producer-001");
// Enable idempotence (prerequisite for transactions)
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// Initialize transaction (call once per Producer lifecycle)
producer.initTransactions();

try {
    producer.beginTransaction();

    // Write atomically to multiple partitions
    producer.send(new ProducerRecord<>("topic-a", "key1", "value1"));
    producer.send(new ProducerRecord<>("topic-b", "key2", "value2"));
    producer.send(new ProducerRecord<>("topic-c", "key3", "value3"));

    // Commit transaction
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    // Old instance fenced by new instance, close directly
    producer.close();
} catch (KafkaException e) {
    // Other exceptions, rollback transaction
    producer.abortTransaction();
}

Transactional Consumer Configuration

The Consumer must set its isolation level so that it reads only messages from committed transactions:

// read_committed: Only read committed messages (ignore uncommitted/rolled back)
// read_uncommitted: Read all messages (default, including uncommitted)
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
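A fuller consumer configuration might look like the sketch below. The group id and topic name are placeholders; auto-commit is disabled here because in the end-to-end pattern that follows, offsets are committed through the producer's transaction rather than by the consumer itself:

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); // placeholder
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Only deliver messages from committed transactions
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
// Disable auto-commit: offsets are committed inside the producer transaction
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("input-topic")); // placeholder topic
```

With read_committed, the consumer also never reads past the first still-open transaction in a partition (the Last Stable Offset), so long-running transactions can delay downstream consumption.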

End-to-End Exactly-Once (EOS)

Complete end-to-end EOS = Idempotent Producer + Transaction + Consumer offset atomic commit.

Typical “read-process-write” pattern (Consume-Transform-Produce):

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;

// producer and consumer are configured as in the previous sections
producer.initTransactions();

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    if (!records.isEmpty()) {
        producer.beginTransaction();

        try {
            for (ConsumerRecord<String, String> record : records) {
                // Business processing
                String result = process(record.value());
                producer.send(new ProducerRecord<>("output-topic", record.key(), result));
            }

            // Commit consumer offset as part of transaction
            // Ensures atomicity of message processing and offset commit
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partRecords = records.records(partition);
                long lastOffset = partRecords.get(partRecords.size() - 1).offset();
                offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
            }
            producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());

            producer.commitTransaction();
        } catch (ProducerFencedException e) {
            // Fenced by a newer instance with the same transactional.id: close and stop
            producer.close();
            break;
        } catch (KafkaException e) {
            // Other failures: roll back; messages and offsets are discarded together
            producer.abortTransaction();
        }
    }
    }
}

Using sendOffsetsToTransaction() to include the consumer offset commit in the transaction ensures that even if the Producer crashes, after a restart it will neither produce duplicate results nor miss messages, achieving true end-to-end Exactly-Once semantics.

Summary

| Mechanism | Problem Solved | Applicable Scope |
| --- | --- | --- |
| Idempotent Producer | Deduplication within a single partition/session | Simple retry scenarios |
| Transactions | Cross-partition, cross-session atomic writes | Multi-partition writes, ETL pipelines |
| EOS (idempotence + transactions + atomic offset commit) | End-to-end Exactly-Once | Stream processing, financial transactions |

Kafka's Exactly-Once guarantee comes at a performance cost: transactions introduce extra coordination round trips and transaction-log writes. In scenarios that demand strict data consistency, this cost is well worth paying.