This is article 59 in the Big Data series. It covers the Kafka Producer interceptor mechanism: its two core interception points, how interceptor chains are composed, and common production use cases.

What Is a Kafka Interceptor

Kafka 0.10 introduced Producer and Consumer interceptors, which allow monitoring and custom processing of message streams without modifying business code. This follows the same philosophy as Servlet Filter and Spring AOP: a standard approach to cross-cutting concerns.

Common use cases:

  • Injecting a distributed tracing ID (Trace ID)
  • Message encryption/decryption
  • Message format validation or transformation
  • Reporting send success/failure metrics
  • A/B testing message routing

Producer Interceptor’s Two Interception Points

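The ProducerInterceptor<K, V> interface exposes four methods (onSend, onAcknowledgement, close, and configure, the last inherited from Configurable). Two of them are the core interception points: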

onSend(ProducerRecord)

Execution thread: The application thread that calls KafkaProducer.send() (the "main thread" in this article)
Execution timing: Inside KafkaProducer.send(), before serialization and partition routing

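This method can modify the message content, add headers, or inject metadata. The record it returns is the one that gets serialized and sent, so if the key or value is changed, partition assignment is based on the modified key or value. Note: avoid changing the topic or partition here, otherwise it will interfere with the subsequent partition calculation.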
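As a concrete case of "add headers" in onSend(), the following minimal sketch attaches a trace ID as a record header and leaves topic, partition, key, and value untouched. The class name TraceIdInterceptor and the header name trace-id are hypothetical choices for illustration. Generating a random UUID is only a stand-in for whatever tracing system is actually in use.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Hypothetical interceptor: injects a trace ID header without touching the payload.
class TraceIdInterceptor implements ProducerInterceptor<String, String> {

    // Assumed header name; use whatever the tracing system expects.
    static final String TRACE_HEADER = "trace-id";

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Keep an existing trace ID if the caller already set one.
        if (record.headers().lastHeader(TRACE_HEADER) == null) {
            String traceId = UUID.randomUUID().toString(); // stand-in for a real tracer
            record.headers().add(TRACE_HEADER, traceId.getBytes(StandardCharsets.UTF_8));
        }
        // Same record instance: topic, partition, key, and value are unchanged.
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Nothing to do for this example.
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}
```

Because the header travels next to the payload instead of inside it, consumers that do not care about tracing can ignore it and the value format stays unchanged.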

onAcknowledgement(RecordMetadata, Exception)

Execution thread: Producer IO thread (Sender thread)
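Execution timing: After the broker acknowledges the record, or when the send fails (the exception argument is then non-null); normally just before the business callback is triggered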

Suitable for statistics, monitoring, and resource cleanup. Avoid time-consuming operations here (such as synchronous HTTP requests); otherwise they block the IO thread and reduce overall throughput.
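One way to keep slow work off the IO thread is to hand each acknowledgement to a bounded in-memory buffer and let a separate reporter thread drain it. The sketch below is one possible shape, not a Kafka API: AckEventBuffer, AckEvent, and their methods are hypothetical names, and dropping events when the buffer is full is an assumed trade-off that favors throughput over complete metrics.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper: buffers ack events so that reporting happens off the IO thread.
class AckEventBuffer {

    // Minimal event; a real one might also carry partition, offset, or latency.
    static final class AckEvent {
        final String topic;
        final boolean success;

        AckEvent(String topic, boolean success) {
            this.topic = topic;
            this.success = success;
        }
    }

    private final BlockingQueue<AckEvent> queue;
    private final AtomicLong dropped = new AtomicLong();

    AckEventBuffer(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Intended to be called from onAcknowledgement(): offer() never blocks,
    // so a full buffer costs one dropped event instead of a stalled IO thread.
    void record(String topic, boolean success) {
        if (!queue.offer(new AckEvent(topic, success))) {
            dropped.incrementAndGet();
        }
    }

    // Intended to be called from a separate reporter thread, which can then
    // do the slow part (HTTP call, database write) at its own pace.
    List<AckEvent> drain(int maxEvents) {
        List<AckEvent> batch = new ArrayList<>();
        queue.drainTo(batch, maxEvents);
        return batch;
    }

    long droppedCount() {
        return dropped.get();
    }
}
```

Inside an interceptor, onAcknowledgement() would only call record(...), while a scheduled task owned by the interceptor calls drain(...) and is shut down in close().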

Implement Custom Interceptor

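import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Typed to String so the rewritten value needs no unchecked cast
public class TimestampInterceptor implements ProducerInterceptor<String, String> {

    private final AtomicLong successCount = new AtomicLong(0);
    private final AtomicLong failureCount = new AtomicLong(0);

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Prepend timestamp to value (example only; prefer Headers in production)
        String newValue = System.currentTimeMillis() + "|" + record.value();
        return new ProducerRecord<>(
            record.topic(), record.partition(), record.timestamp(),
            record.key(), newValue, record.headers());
    }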

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Note: this method executes in IO thread, use atomic variables for thread safety
        if (exception == null) {
            successCount.incrementAndGet();
        } else {
            failureCount.incrementAndGet();
        }
    }

    @Override
    public void close() {
        // Called when Producer closes, can print final stats or release resources here
        System.out.printf("Send stats - Success: %d, Failed: %d%n",
            successCount.get(), failureCount.get());
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // Can read custom parameters from Producer config
    }
}
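The configure() hook receives the producer's configuration map, which is how an interceptor can pick up its own settings. The sketch below shows only the lookup logic and makes two assumptions: the key timestamp.interceptor.separator is a hypothetical custom property, and custom keys added to the producer configuration are passed through to configure() unchanged. The helper class SeparatorSetting is likewise illustrative.

```java
import java.util.Map;

// Hypothetical helper: reads one custom setting from the producer config map.
class SeparatorSetting {

    // Assumed custom key; it is not a built-in Kafka property.
    static final String SEPARATOR_KEY = "timestamp.interceptor.separator";
    static final String DEFAULT_SEPARATOR = "|";

    // Returns the configured separator, or the default when the key is absent.
    static String from(Map<String, ?> configs) {
        Object raw = configs.get(SEPARATOR_KEY);
        return raw == null ? DEFAULT_SEPARATOR : raw.toString();
    }
}
```

An interceptor would call SeparatorSetting.from(configs) inside configure() and store the result in a field before the first onSend() call.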

Interceptor Chain

Multiple interceptors can be chained, processing the same message in sequence:

configs.put(
    ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
    "com.example.TimestampInterceptor,com.example.TracingInterceptor"
);

Execution order: TimestampInterceptor.onSend() → TracingInterceptor.onSend() → serialization → partition → buffer → send → TimestampInterceptor.onAcknowledgement() → TracingInterceptor.onAcknowledgement(). Both methods walk the chain in the order listed in interceptor.classes; onAcknowledgement() is not invoked in reverse order.

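Error isolation: If an interceptor’s onSend() throws an exception, the framework catches and logs it, then continues with the next interceptor in the chain, passing along the record returned by the last interceptor that succeeded. A single failing interceptor therefore does not prevent the message from being sent.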
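The isolation behavior can be pictured with a small stand-alone model. This is a simplified sketch and not Kafka's source code: records are reduced to plain strings, interceptors to UnaryOperator<String>, and the class name OnSendChain is hypothetical. It shows only the catch-log-continue loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Simplified model of an onSend() chain with per-interceptor error isolation.
class OnSendChain {

    private final List<UnaryOperator<String>> interceptors;

    // Stands in for the framework's log output.
    final List<String> loggedErrors = new ArrayList<>();

    OnSendChain(List<UnaryOperator<String>> interceptors) {
        this.interceptors = interceptors;
    }

    String onSend(String record) {
        String current = record;
        for (UnaryOperator<String> interceptor : interceptors) {
            try {
                current = interceptor.apply(current);
            } catch (RuntimeException e) {
                // Log and continue: the next interceptor receives the last
                // record that was produced without an exception.
                loggedErrors.add(e.getMessage());
            }
        }
        return current;
    }
}
```

With a chain of three interceptors where the second one throws, the result still carries the changes made by the first and third, and exactly one error is logged.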

Complete Sending Flow (with Interceptor)

KafkaProducer.send()

[Main Thread] Interceptor chain onSend() (in order)

[Main Thread] Serialize key & value

[Main Thread] Partition routing (Partitioner)

[Main Thread] Write to RecordAccumulator

[IO Thread] Sender batch send to Broker

[IO Thread] Interceptor chain onAcknowledgement() → Business callback

Thread Safety Notes

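onSend() runs on the thread that calls send(), while onAcknowledgement() runs on the IO thread, and both share the same interceptor instance. If several application threads share one producer, onSend() itself can also run concurrently. Therefore: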

  • Shared state such as counters must use thread-safe classes such as AtomicLong or ConcurrentHashMap.
  • Avoid non-thread-safe classes such as SimpleDateFormat or HashMap for state held in interceptor fields.
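The two rules above can be sketched with JDK classes only. In this illustrative example (the class SharedInterceptorState and its method names are hypothetical), DateTimeFormatter replaces SimpleDateFormat because it is immutable and thread-safe, and a ConcurrentHashMap of LongAdder values replaces a plain HashMap for per-topic counters.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical state holder that is safe to share between the sending
// thread(s) and the IO thread.
class SharedInterceptorState {

    // DateTimeFormatter is immutable and thread-safe, unlike SimpleDateFormat.
    private static final DateTimeFormatter FORMAT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneOffset.UTC);

    // ConcurrentHashMap plus LongAdder tolerates concurrent updates per topic.
    private final ConcurrentHashMap<String, LongAdder> ackCounts = new ConcurrentHashMap<>();

    String formatTimestamp(long epochMillis) {
        return FORMAT.format(Instant.ofEpochMilli(epochMillis));
    }

    void countAck(String topic) {
        ackCounts.computeIfAbsent(topic, t -> new LongAdder()).increment();
    }

    long acks(String topic) {
        LongAdder adder = ackCounts.get(topic);
        return adder == null ? 0L : adder.sum();
    }
}
```

LongAdder is chosen here over AtomicLong only because it scales better under heavy contention; AtomicLong, as used in the interceptor example earlier, is equally correct.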

Summary

Interceptors are a clean extension point for the Kafka Producer. Compared with modifying business code directly, they are minimally invasive, composable, and pluggable. Used sensibly, interceptor chains add tracing, monitoring, and encryption capabilities to a messaging system without changing its core logic.