This is article 57 in the Big Data series. It takes a deep look at Kafka Producer internals, breaking down the complete chain from the send() call to message persistence and summarizing the core parameters that affect throughput and reliability.
Producer Initialization Phase
When new KafkaProducer(configs) is called, the framework synchronously initializes:
- Sender thread: Runs as daemon thread, responsible for taking batches from buffer and sending to Broker over network.
- RecordAccumulator: Message buffer, default 32 MB, messages aggregate into batches here.
- Interceptor chain: Loads `ProducerInterceptor` implementations in configured order.
- Serializer & Partitioner: Loaded from `key.serializer`, `value.serializer`, and `partitioner.class`.
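The initialization inputs above boil down to a config map. A minimal sketch of the mandatory entries (the broker address is a placeholder; everything else falls back to defaults):

```java
import java.util.Properties;

public class ProducerConfigSketch {
    // Builds the minimal config that new KafkaProducer(configs) requires:
    // the two serializers are mandatory, the rest have defaults.
    static Properties baseConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = baseConfig();
        // new KafkaProducer<String, String>(props) would start the daemon
        // Sender thread and allocate the 32 MB RecordAccumulator here.
        System.out.println(props.getProperty("key.serializer"));
    }
}
```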
Complete Message Sending Flow
send(ProducerRecord)
↓
[1] Interceptor chain (onSend)
↓
[2] Serialization (key & value → byte[])
↓
[3] Partition routing (Partitioner)
↓
[4] Write to RecordAccumulator (per TopicPartition queue)
↓
[5] Sender thread batch send to Leader Broker
↓
[6] Broker writes to PageCache → ISR replica sync
↓
[7] Return ACK → Trigger callback (onAcknowledgement)
Step 1: Interceptor Processing
Messages pass through each `ProducerInterceptor.onSend()` sequentially on the main (calling) thread; this hook suits cross-cutting logic such as injecting a trace ID or encrypting the payload.
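The chaining contract can be illustrated without the Kafka classes: each hook receives the previous hook's output, exactly as onSend() results feed into the next interceptor. A pure-Java sketch (UnaryOperator stands in for ProducerInterceptor; the trace-ID and uppercase steps are made-up examples):

```java
import java.util.List;
import java.util.function.UnaryOperator;

public class InterceptorChainSketch {
    // Applies each interceptor in configured order, feeding the output of
    // one into the next -- the same pipeline shape as onSend() chaining.
    static String runChain(List<UnaryOperator<String>> chain, String record) {
        for (UnaryOperator<String> interceptor : chain) {
            record = interceptor.apply(record);
        }
        return record;
    }

    public static void main(String[] args) {
        List<UnaryOperator<String>> chain = List.of(
                msg -> "traceId=42|" + msg, // inject a (fake) trace ID
                msg -> msg.toUpperCase());  // stand-in for an encryption step
        System.out.println(runChain(chain, "payload"));
    }
}
```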
Step 2: Serialization
Key and value are converted to byte arrays via the configured serializers. Kafka ships with StringSerializer, ByteArraySerializer, IntegerSerializer, and others; a custom serializer implements the `Serializer<T>` interface.
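The `Serializer<T>` contract is simply "value in, bytes out". A sketch of what a string serializer does (Kafka's built-in StringSerializer behaves the same way, minus charset configuration):

```java
import java.nio.charset.StandardCharsets;

public class StringSerializerSketch {
    // Mirrors Serializer<String>.serialize(topic, data): null passes
    // through (used for tombstones), everything else becomes UTF-8 bytes.
    static byte[] serialize(String topic, String data) {
        if (data == null) return null;
        return data.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] bytes = serialize("orders", "hello");
        System.out.println(bytes.length); // 5
    }
}
```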
Step 3: Partition Routing
`Partitioner.partition()` returns the target partition number. The default strategy:
- Message with a key: `hash(key) % partitionCount`, so messages with the same key stay in order within one partition.
- Message without a key: round-robin for even distribution (since Kafka 2.4, sticky partitioning, which fills one batch before switching partitions).
- Explicit `partition` parameter: used directly, bypassing the partitioner.
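The routing precedence above can be sketched in a few lines. Note the real default partitioner hashes the serialized key bytes with murmur2; `hashCode()` here is a simplification for illustration:

```java
public class PartitionRoutingSketch {
    // Routing precedence: explicit partition wins; otherwise a keyed
    // record hashes deterministically to a partition. (Real Kafka uses
    // murmur2 over the key bytes; hashCode() is a simplification.)
    static int route(Integer explicitPartition, String key, int partitionCount) {
        if (explicitPartition != null) return explicitPartition;
        if (key != null) return Math.floorMod(key.hashCode(), partitionCount);
        return -1; // keyless: chosen by round-robin/sticky logic, not shown
    }

    public static void main(String[] args) {
        // The same key always lands on the same partition:
        System.out.println(route(null, "user-42", 6) == route(null, "user-42", 6)); // true
        // An explicit partition bypasses hashing entirely:
        System.out.println(route(3, "user-42", 6)); // 3
    }
}
```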
Step 4: Buffer & Batch
Messages enter the RecordAccumulator, where each TopicPartition maintains an independent batch queue (`Deque<ProducerBatch>`). The Sender thread is triggered when either condition holds:
- Current batch size ≥ `batch.size` (default 16 KB)
- Wait time ≥ `linger.ms` (default 0 ms, i.e. send immediately)
For high-throughput scenarios, setting `linger.ms` to 20-50 ms significantly improves the batch fill rate, at the cost of a few extra milliseconds of latency.
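The two trigger conditions reduce to a single OR-check, which makes the effect of `linger.ms` easy to see:

```java
public class BatchTriggerSketch {
    // A batch becomes sendable when either threshold is crossed -- the
    // same OR-condition the Sender thread evaluates.
    static boolean readyToSend(int batchBytes, long waitedMs,
                               int batchSize, long lingerMs) {
        return batchBytes >= batchSize || waitedMs >= lingerMs;
    }

    public static void main(String[] args) {
        // With defaults (batch.size=16384, linger.ms=0) every record is
        // sendable immediately, so batches barely fill:
        System.out.println(readyToSend(100, 0, 16384, 0));  // true
        // With linger.ms=25, a small batch keeps waiting to fill up:
        System.out.println(readyToSend(100, 5, 16384, 25)); // false
    }
}
```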
Step 5: Network Transmission & ACK
The Sender thread waits for confirmation according to the `acks` configuration:

| acks Value | Semantics | Risk |
|---|---|---|
| 0 | Does not wait for any Broker response | Messages may be lost |
| 1 (default before Kafka 3.0) | Waits for the Leader's write confirmation | May be lost if the Leader crashes before replication |
| all / -1 (default since Kafka 3.0) | Waits for all ISR replicas | Highest reliability, slightly higher latency |
Core Parameters Quick Reference
| Parameter | Default | Description |
|---|---|---|
| `batch.size` | 16384 (16 KB) | Max bytes per batch |
| `linger.ms` | 0 | Max wait time before a batch is sent |
| `buffer.memory` | 33554432 (32 MB) | RecordAccumulator total capacity |
| `retries` | Integer.MAX_VALUE | Retry count on failure |
| `acks` | 1 (all since Kafka 3.0) | ACK confirmation level |
| `retry.backoff.ms` | 100 | Interval between retries (exponential backoff in newer versions) |
| `request.timeout.ms` | 30000 | Timeout waiting for a Broker response |
| `max.block.ms` | 60000 | Max time send() blocks when the buffer is full |
| `compression.type` | none | Compression algorithm (gzip/snappy/lz4/zstd) |
| `max.in.flight.requests.per.connection` | 5 | Max unacknowledged requests per connection |
| `client.id` | "" | Producer identifier shown in Broker logs |
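Two common tuning directions, sketched as config fragments. The values are illustrative starting points under the trade-offs described above, not universal recommendations:

```java
import java.util.Properties;

public class TuningProfilesSketch {
    // Throughput-leaning profile: bigger batches, short linger, compression.
    static Properties throughputProfile() {
        Properties p = new Properties();
        p.put("batch.size", "65536");     // 64 KB batches (4x default)
        p.put("linger.ms", "25");         // wait briefly so batches fill
        p.put("compression.type", "lz4"); // cheap CPU, decent ratio
        return p;
    }

    // Reliability-leaning profile: full ISR acks plus idempotence.
    static Properties reliabilityProfile() {
        Properties p = new Properties();
        p.put("acks", "all");
        p.put("enable.idempotence", "true");
        return p;
    }

    public static void main(String[] args) {
        System.out.println(throughputProfile().getProperty("linger.ms"));
    }
}
```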
Reliability and Ordering
- Strict ordering: Set `max.in.flight.requests.per.connection` to 1 to guarantee per-partition ordering (at the cost of throughput); with idempotence enabled, ordering is preserved for up to 5 in-flight requests.
- Idempotent sending: Enable `enable.idempotence=true`; the Broker deduplicates automatically, avoiding duplicates caused by retries.
- Transactional sending: Use `transactional.id` for atomic writes across partitions.
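The transactional flow is a fixed call sequence on top of the config. A sketch of the setup (the commented calls are the KafkaProducer transactions API; the id value is a placeholder):

```java
import java.util.Properties;

public class TransactionalSetupSketch {
    // Setting transactional.id requires idempotence; idempotence in turn
    // requires acks=all, so transactions imply the strongest ack level.
    static Properties transactionalConfig(String txId) {
        Properties p = new Properties();
        p.put("transactional.id", txId);
        p.put("enable.idempotence", "true");
        return p;
    }

    public static void main(String[] args) {
        Properties p = transactionalConfig("order-service-tx"); // placeholder id
        // With a real producer built from this config, the sequence is:
        //   producer.initTransactions();
        //   producer.beginTransaction();
        //   producer.send(...); producer.send(...);  // may span partitions
        //   producer.commitTransaction();            // or abortTransaction()
        System.out.println(p.getProperty("transactional.id"));
    }
}
```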
Error Handling
Retryable errors (network jitter, Leader elections) trigger automatic retries; non-retryable errors (message too large, serialization failure) complete the callback immediately with the exception (the producer Callback interface has a single onCompletion(metadata, exception) method, whose exception argument is non-null on failure). Log failed messages to a dead-letter queue in the callback to avoid data loss.
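A pure-Java sketch of that dead-letter pattern (the real signature takes RecordMetadata and Exception; a String stands in for the record here, and the in-memory list stands in for a persistent queue):

```java
import java.util.ArrayList;
import java.util.List;

public class DeadLetterSketch {
    final List<String> deadLetters = new ArrayList<>();

    // Mirrors Callback.onCompletion(metadata, exception): a non-null
    // exception means the send ultimately failed (after any retries).
    void onCompletion(String message, Exception exception) {
        if (exception != null) {
            deadLetters.add(message); // persist for later inspection/replay
        }
    }

    public static void main(String[] args) {
        DeadLetterSketch handler = new DeadLetterSketch();
        handler.onCompletion("ok-message", null);
        handler.onCompletion("bad-message", new RuntimeException("too large"));
        System.out.println(handler.deadLetters); // [bad-message]
    }
}
```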