This is article 59 in the Big Data series. It covers the Kafka Producer interceptor mechanism: its two core interception points, how interceptor chains are composed, and common production use cases.
What Is a Kafka Interceptor
Kafka 0.10 introduced Producer and Consumer interceptors, which let you monitor and customize the message stream without modifying business code. This follows the same philosophy as Servlet Filter and Spring AOP: a standard way to handle cross-cutting concerns.
Common use cases:
- Inject distributed tracing ID (Trace ID)
- Message encryption/decryption
- Message format validation or transformation
- Send success/failure metrics reporting
- A/B testing message routing
Producer Interceptor’s Two Interception Points
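The ProducerInterceptor<K, V> interface exposes four methods: onSend(), onAcknowledgement(), close(), and configure() (inherited from Configurable). The first two are the core interception points: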
onSend(ProducerRecord)
Execution thread: Producer main thread (the application thread that calls send())
Execution timing: Inside KafkaProducer.send(), before serialization and partition routing
onSend() can modify the message content, add headers, or inject metadata. Note: avoid changing the topic or partition, because doing so interferes with the partition calculation that follows.
onAcknowledgement(RecordMetadata, Exception)
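Execution thread: Usually the Producer IO thread (Sender thread). If send() fails before the record is queued (for example, on a serialization error), the callback runs on the calling thread instead.
Execution timing: After the Broker acknowledges the record or the send fails, just before the business callback is triggered
onAcknowledgement() is suitable for statistics, monitoring, and resource cleanup. Avoid time-consuming operations here (such as synchronous HTTP requests); they block the IO thread and reduce overall throughput.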
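One way to keep onAcknowledgement() cheap is to hand each result to a bounded queue and let a separate thread do the slow reporting. The sketch below is plain Java with no Kafka dependency. `AckEventBuffer` and its methods are hypothetical names used for illustration, and dropping events when the queue is full is an assumed policy, not something Kafka prescribes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper: buffers acknowledgement results so that the caller
// (in a real interceptor, onAcknowledgement() on the IO thread) never blocks.
class AckEventBuffer {
    private final BlockingQueue<String> queue;
    private final AtomicLong dropped = new AtomicLong();

    AckEventBuffer(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Non-blocking: offer() returns false immediately when the queue is full.
    boolean record(String event) {
        boolean accepted = queue.offer(event);
        if (!accepted) {
            dropped.incrementAndGet();
        }
        return accepted;
    }

    // Intended for a separate reporter thread, which is free to do slow I/O.
    List<String> drain() {
        List<String> batch = new ArrayList<>();
        queue.drainTo(batch);
        return batch;
    }

    long droppedCount() {
        return dropped.get();
    }
}
```

In an interceptor, onAcknowledgement() would call record(...) with a short description of the outcome, while a background thread periodically calls drain() and ships the batch to the metrics system.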
Implement a Custom Interceptor
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Typed to String values: prepending text only makes sense for String payloads,
// and it avoids the unchecked (V) cast that a generic <K, V> version would need.
public class TimestampInterceptor implements ProducerInterceptor<String, String> {

    private final AtomicLong successCount = new AtomicLong(0);
    private final AtomicLong failureCount = new AtomicLong(0);

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Prepend timestamp to value (example only, recommend Headers in production)
        String newValue = System.currentTimeMillis() + "|" + record.value();
        // Topic, partition, timestamp, key, and headers are passed through unchanged
        return new ProducerRecord<>(
                record.topic(), record.partition(), record.timestamp(),
                record.key(), newValue, record.headers());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Note: this method usually executes in the IO thread, so use atomic variables for thread safety
        if (exception == null) {
            successCount.incrementAndGet();
        } else {
            failureCount.incrementAndGet();
        }
    }

    @Override
    public void close() {
        // Called when the Producer closes; print final stats or release resources here
        System.out.printf("Send stats - Success: %d, Failed: %d%n",
                successCount.get(), failureCount.get());
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // Custom parameters can be read from the Producer config here
    }
}
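Kafka hands the producer's configuration map to configure(), so an interceptor can pick up its own settings from it. Below is a minimal sketch of that parsing step, written in plain Java so it runs without a Kafka dependency. The key `timestamp.interceptor.separator` and the class `InterceptorSettings` are hypothetical names chosen for illustration, not built-in Kafka settings.

```java
import java.util.Map;

// Hypothetical holder for settings an interceptor might read in configure().
class InterceptorSettings {
    // Assumed custom key; it is not a built-in Kafka producer setting.
    static final String SEPARATOR_KEY = "timestamp.interceptor.separator";
    static final String DEFAULT_SEPARATOR = "|";

    final String separator;

    private InterceptorSettings(String separator) {
        this.separator = separator;
    }

    // Mirrors the Map<String, ?> parameter of configure(): values are untyped,
    // so check for null and convert explicitly instead of casting.
    static InterceptorSettings from(Map<String, ?> configs) {
        Object raw = configs.get(SEPARATOR_KEY);
        String separator = (raw == null) ? DEFAULT_SEPARATOR : raw.toString();
        return new InterceptorSettings(separator);
    }
}
```

Inside a real interceptor, configure(configs) could store the result of InterceptorSettings.from(configs) in a field and use it later in onSend().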
Interceptor Chain
Multiple interceptors can be chained, processing the same message in sequence:
configs.put(
    ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
    "com.example.TimestampInterceptor,com.example.TracingInterceptor"
);
Execution order: TimestampInterceptor.onSend() → TracingInterceptor.onSend() → serialization → partition → buffer → send → TimestampInterceptor.onAcknowledgement() → TracingInterceptor.onAcknowledgement(). Both onSend() and onAcknowledgement() run in the order in which the classes are listed in interceptor.classes.
Error isolation: If an interceptor's onSend() throws an exception, the framework logs it and continues with the remaining interceptors in the chain, passing along the record returned by the last interceptor that succeeded. A single failing interceptor does not prevent the message from being sent.
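The error-isolation behavior can be modeled in a few lines. This is a simplified stand-in, not Kafka's internal code: `OnSendChain` is a hypothetical class, each `UnaryOperator<String>` plays the role of one interceptor's onSend(), and a plain String stands in for ProducerRecord.

```java
import java.util.List;
import java.util.function.UnaryOperator;

// Simplified model of an interceptor chain with error isolation.
class OnSendChain {
    private final List<UnaryOperator<String>> steps;

    OnSendChain(List<UnaryOperator<String>> steps) {
        this.steps = List.copyOf(steps);
    }

    String apply(String record) {
        String current = record;
        for (UnaryOperator<String> step : steps) {
            try {
                current = step.apply(current);
            } catch (RuntimeException e) {
                // Log and keep going: the next step sees the last good record.
                System.err.println("interceptor failed, skipping: " + e.getMessage());
            }
        }
        return current;
    }

    public static void main(String[] args) {
        OnSendChain chain = new OnSendChain(List.of(
                r -> "ts|" + r,
                r -> { throw new IllegalStateException("boom"); },
                r -> r + "|trace"));
        System.out.println(chain.apply("payload")); // prints "ts|payload|trace"
    }
}
```

The failing middle step is skipped, and the third step still receives the output of the first one, which is why a broken interceptor degrades a single feature rather than the whole send path.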
Complete Sending Flow (with Interceptor)
KafkaProducer.send()
↓
[Main Thread] Interceptor chain onSend() (in order)
↓
[Main Thread] Serialize key & value
↓
[Main Thread] Partition routing (Partitioner)
↓
[Main Thread] Write to RecordAccumulator
↓
[IO Thread] Sender batch send to Broker
↓
[IO Thread] Interceptor chain onAcknowledgement() → Business callback
Thread Safety Notes
onSend() executes in the main (calling) thread and onAcknowledgement() in the IO thread, yet both share the same interceptor instance. If several application threads call send() on one Producer, onSend() itself also runs concurrently. Therefore:
- Shared state such as counters must use thread-safe classes like AtomicLong and ConcurrentHashMap.
- Avoid non-thread-safe classes like SimpleDateFormat and HashMap in interceptors.
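As an illustration of the first point, here is a minimal sketch of a per-topic counter that stays correct under concurrent updates. `TopicCounters` is a hypothetical class name, and the code uses only the JDK.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical per-topic counter that is safe to update from the calling
// thread and the IO thread at the same time.
class TopicCounters {
    private final ConcurrentHashMap<String, LongAdder> counts = new ConcurrentHashMap<>();

    void increment(String topic) {
        // computeIfAbsent is atomic, so two threads never create two adders for one topic.
        counts.computeIfAbsent(topic, t -> new LongAdder()).increment();
    }

    long get(String topic) {
        LongAdder adder = counts.get(topic);
        return adder == null ? 0L : adder.sum();
    }
}
```

For timestamp formatting, java.time.format.DateTimeFormatter is immutable and thread-safe, which makes it a drop-in alternative to SimpleDateFormat inside an interceptor.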
Summary
Kafka interceptors are a well-suited extension point for the Producer. Compared with modifying business code directly, they are minimally intrusive, composable, and pluggable. Used sensibly, an interceptor chain adds tracing, monitoring, and encryption to a messaging system without changing its core logic.