This is article 57 in the Big Data series. It takes a deep dive into Kafka Producer internals, breaks down the complete chain from the send() call to message persistence, and summarizes the core parameters that affect throughput and reliability.

Producer Initialization Phase

When new KafkaProducer(configs) is called, the framework synchronously initializes:

  • Sender thread: runs as a daemon thread; takes completed batches from the buffer and sends them to Brokers over the network.
  • RecordAccumulator: the message buffer (32 MB by default) where messages are aggregated into batches.
  • Interceptor chain: loads ProducerInterceptor implementations in the configured order.
  • Serializer & Partitioner: loaded from key.serializer, value.serializer, and partitioner.class.
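As a minimal wiring sketch, the initialization above is triggered entirely by the constructor (the broker address here is a placeholder; this fragment needs the kafka-clients dependency and a reachable cluster to actually send anything):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerInit {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // The constructor synchronously wires up the RecordAccumulator, partitioner,
        // serializers, and interceptor chain, and starts the daemon Sender thread.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // producer is ready; the Sender thread is already running in the background
        }
    }
}
```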

Complete Message Sending Flow

send(ProducerRecord)

[1] Interceptor chain (onSend)

[2] Serialization (key & value → byte[])

[3] Partition routing (Partitioner)

[4] Write to RecordAccumulator (per TopicPartition queue)

[5] Sender thread batch send to Leader Broker

[6] Broker writes to PageCache → ISR replica sync

[7] Return ACK → Trigger callback (onAcknowledgement)

Step 1: Interceptor Processing

Messages pass through each ProducerInterceptor.onSend() sequentially on the calling thread; this hook can be used for cross-cutting logic such as injecting a trace ID or encrypting the payload.
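A hypothetical interceptor that stamps a trace-ID header on every outgoing record might look like this (the header name and UUID source are illustrative, not from the article; register it via interceptor.classes):

```java
import java.util.Map;
import java.util.UUID;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TraceIdInterceptor implements ProducerInterceptor<String, String> {
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Runs on the calling thread before serialization; mutate or wrap the record here
        record.headers().add("trace-id", UUID.randomUUID().toString().getBytes());
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Runs on the Sender/IO thread after the Broker ACKs (or on failure); keep it cheap
    }

    @Override public void close() {}
    @Override public void configure(Map<String, ?> configs) {}
}
```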

Step 2: Serialization

The key and value are converted to byte arrays via the configured serializers. Kafka ships with StringSerializer, ByteArraySerializer, IntegerSerializer, and others; a custom serializer implements the Serializer&lt;T&gt; interface.
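A minimal custom-serializer sketch, assuming a fixed 8-byte big-endian encoding for Long values (the class name is illustrative; configure/close have default implementations in recent client versions, so only serialize is required):

```java
import java.nio.ByteBuffer;
import org.apache.kafka.common.serialization.Serializer;

public class LongValueSerializer implements Serializer<Long> {
    @Override
    public byte[] serialize(String topic, Long data) {
        if (data == null) return null; // null stays null (e.g. a tombstone for compacted topics)
        return ByteBuffer.allocate(Long.BYTES).putLong(data).array();
    }
}
```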

Step 3: Partition Routing

Partitioner.partition() returns the target partition number. The default strategy:

  • Message with a key: murmur2(key) % partitionCount, which keeps all messages with the same key in one partition and therefore ordered.
  • Message without a key: round-robin distribution (replaced by the sticky partitioner in Kafka 2.4+) to spread load evenly.
  • Explicit partition argument: used directly, bypassing the partitioner.
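The three rules above can be sketched in plain Java (the class and method names are illustrative, and hashCode stands in for Kafka's actual murmur2 hash):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RoutingSketch {
    private static final AtomicInteger counter = new AtomicInteger();

    static int partitionFor(String key, Integer explicitPartition, int partitionCount) {
        if (explicitPartition != null) {
            return explicitPartition;                      // rule 3: explicit partition wins
        }
        if (key != null) {
            // rule 1: hash the key (Kafka really uses murmur2); mask keeps the value non-negative
            return (key.hashCode() & 0x7fffffff) % partitionCount;
        }
        // rule 2: keyless messages rotate across partitions
        return counter.getAndIncrement() % partitionCount;
    }

    public static void main(String[] args) {
        int p1 = partitionFor("order-42", null, 6);
        int p2 = partitionFor("order-42", null, 6);
        System.out.println(p1 == p2); // same key always lands on the same partition: true
    }
}
```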

Step 4: Buffer & Batch

Messages enter the RecordAccumulator, where each TopicPartition maintains its own batch queue (Deque&lt;ProducerBatch&gt;). The Sender thread is triggered when any one of these conditions holds:

  • Current batch size ≥ batch.size (default 16 KB)
  • Wait time ≥ linger.ms (default 0 ms, send immediately)

In high-throughput scenarios it is recommended to set linger.ms to 20-50 ms, which significantly improves the batch fill rate.
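For example, a throughput-oriented batching configuration might look like this (the specific values beyond the article's 20-50 ms linger suggestion are illustrative, not universal recommendations):

```java
import java.util.Properties;

Properties props = new Properties();
props.put("batch.size", 65536);        // 64 KB batches: more records per network request
props.put("linger.ms", 25);            // wait up to 25 ms so batches fill before sending
props.put("compression.type", "lz4");  // compress whole batches for additional throughput
props.put("buffer.memory", 67108864L); // 64 MB accumulator to absorb bursts
```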

Step 5: Network Transmission & ACK

The Sender thread waits for confirmation according to the acks configuration:

| acks value | Semantics | Risk |
| --- | --- | --- |
| 0 | No waiting for a Broker response | Message may be lost |
| 1 (default) | Wait for Leader write confirmation | May be lost when the Leader crashes |
| all / -1 | Wait for all ISR replicas | Highest reliability, slightly higher latency |
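A hedged sketch of an acks=all send with a completion callback (broker address and topic name are placeholders; this needs the kafka-clients dependency and a running cluster):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class AckDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // wait for every ISR replica

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("orders", "order-42", "payload"),
                (metadata, exception) -> {
                    if (exception != null) {
                        // the ACK never arrived or a non-retryable error occurred
                        System.err.println("send failed: " + exception);
                    } else {
                        System.out.printf("acked: partition=%d offset=%d%n",
                                metadata.partition(), metadata.offset());
                    }
                });
        }
    }
}
```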

Core Parameters Quick Reference

| Parameter | Default | Description |
| --- | --- | --- |
| batch.size | 16384 (16 KB) | Max bytes per batch |
| linger.ms | 0 | Batch wait time limit |
| buffer.memory | 33554432 (32 MB) | RecordAccumulator total capacity |
| retries | Integer.MAX_VALUE | Retry count on failure |
| acks | 1 | ACK confirmation level |
| retry.backoff.ms | 100 | Retry interval (exponential backoff supported) |
| request.timeout.ms | 30000 | Broker response wait timeout |
| max.block.ms | 60000 | send() block limit when the buffer is full |
| compression.type | none | Compression algorithm (gzip/snappy/lz4) |
| max.in.flight.requests.per.connection | 5 | Max unconfirmed requests per connection |
| client.id | "" | Producer identifier, used in Broker logs |

Reliability and Ordering

  • Strict ordering: set max.in.flight.requests.per.connection to 1 so that messages within a partition stay ordered even across retries (at the cost of throughput).
  • Idempotent sending: enable enable.idempotence=true; the Broker deduplicates retried batches using the producer ID and sequence numbers, preventing duplicates from retries.
  • Transactional sending: use transactional.id for atomic writes across partitions.
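The transactional flow can be sketched as follows (broker address, transactional.id, and topic names are placeholders; setting transactional.id implicitly enables idempotence):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TxDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-tx-1"); // placeholder id

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            try {
                producer.beginTransaction();
                producer.send(new ProducerRecord<>("orders", "k1", "v1"));
                producer.send(new ProducerRecord<>("payments", "k1", "v1"));
                producer.commitTransaction(); // both writes become visible atomically
            } catch (Exception e) {
                // for fencing/fatal errors the producer must be closed instead of reused
                producer.abortTransaction(); // neither write becomes visible
            }
        }
    }
}
```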

Error Handling

Retryable errors (network jitter, a Leader election in progress) trigger automatic retries; non-retryable errors (message too large, serialization failure) complete the callback immediately with the exception. It is recommended to log failed messages to a dead-letter queue inside the callback to avoid losing data.
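The dead-letter pattern above can be sketched as a reusable callback (DeadLetterStore is a hypothetical sink, for example a file, a database, or a dedicated DLQ topic):

```java
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RetriableException;

public class DlqCallback {
    // Hypothetical sink for failed records; not part of the Kafka API
    interface DeadLetterStore {
        void save(ProducerRecord<String, String> record, Exception cause);
    }

    static Callback forRecord(ProducerRecord<String, String> record, DeadLetterStore dlq) {
        return (metadata, exception) -> {
            if (exception == null) {
                return; // success path: nothing to do
            }
            if (exception instanceof RetriableException) {
                // Retryable errors are retried internally up to `retries`;
                // the callback only fires once those retries are exhausted.
            }
            dlq.save(record, exception); // never drop the payload silently
        };
    }
}
```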