本文是大数据系列第 59 篇,聚焦 Kafka Producer 的拦截器(Interceptor)机制,讲解其两个核心拦截点的工作原理、拦截器链的组合方式,以及常见生产用途。

完整图文版(含截图):CSDN 原文 | 掘金

什么是 Kafka 拦截器

Kafka 0.10 引入了 Producer 和 Consumer 拦截器,允许在不修改业务代码的前提下,对消息流进行监控和定制处理。这与 Servlet Filter、Spring AOP 的思想一脉相承,属于横切关注点(Cross-Cutting Concern)的标准处理方式。

常见应用场景:

  • 注入分布式追踪 ID(Trace ID)
  • 消息加密 / 解密
  • 消息格式校验或转换
  • 发送成功 / 失败指标上报
  • A/B 测试消息路由

Producer 拦截器的两个拦截点

ProducerInterceptor<K, V> 接口定义了四个方法,核心的两个拦截点如下:

onSend(ProducerRecord)

执行线程:Producer 主线程
执行时机:KafkaProducer.send() 内部,序列化和分区路由之前

可在此修改消息内容、添加 headers、注入元数据。注意:不要修改 topic 和 partition,否则会干扰后续的分区计算。

onAcknowledgement(RecordMetadata, Exception)

执行线程:Producer IO 线程(Sender 线程)
执行时机:Broker 返回 ACK 之后、触发业务回调之前

适合做统计、监控和资源清理。切忌在此执行耗时操作(如同步 HTTP 请求),否则会阻塞 IO 线程,影响整体吞吐。

实现自定义拦截器

public class TimestampInterceptor<K, V> implements ProducerInterceptor<K, V> {

    private final AtomicLong successCount = new AtomicLong(0);
    private final AtomicLong failureCount = new AtomicLong(0);

    @Override
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        // 在 value 前追加时间戳前缀(仅示例,生产中推荐用 Headers)
        String newValue = System.currentTimeMillis() + "|" + record.value();
        return new ProducerRecord<>(
            record.topic(), record.partition(), record.timestamp(),
            record.key(), (V) newValue, record.headers());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // 注意:此方法在 IO 线程执行,使用原子变量保证线程安全
        if (exception == null) {
            successCount.incrementAndGet();
        } else {
            failureCount.incrementAndGet();
        }
    }

    @Override
    public void close() {
        // Producer 关闭时调用,可在此打印最终统计或释放资源
        System.out.printf("发送统计 - 成功: %d, 失败: %d%n",
            successCount.get(), failureCount.get());
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // 可从 Producer 配置中读取自定义参数
    }
}

拦截器链

多个拦截器可以串联成链,按顺序依次处理同一条消息:

configs.put(
    ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
    "com.example.TimestampInterceptor,com.example.TracingInterceptor"
);

执行顺序TimestampInterceptor.onSend()TracingInterceptor.onSend() → 序列化 → 分区 → 缓冲 → 发送 → TracingInterceptor.onAcknowledgement()TimestampInterceptor.onAcknowledgement()onAcknowledgement 逆序执行)。

错误隔离:若某个拦截器的 onSend() 抛出异常,框架会记录日志并继续执行链中后续的拦截器,单点失败不影响消息发送流程。

完整发送流程(含拦截器)

KafkaProducer.send()

[主线程] 拦截器链 onSend() (按序)

[主线程] 序列化 key & value

[主线程] 分区路由 Partitioner

[主线程] 写入 RecordAccumulator

[IO线程] Sender 批量发送给 Broker

[IO线程] 拦截器链 onAcknowledgement() → 业务回调

线程安全注意事项

onSend() 在主线程执行,onAcknowledgement() 在 IO 线程执行,两者共享同一个拦截器实例。因此:

  • 计数器等共享状态必须使用 AtomicLongConcurrentHashMap 等线程安全类。
  • 避免在拦截器中使用非线程安全的 SimpleDateFormatHashMap 等。

小结

Kafka 拦截器是 Producer 扩展点的最佳实践,相比直接修改业务代码,具有低侵入、可组合、可插拔的优势。合理利用拦截器链,可以在不改动核心逻辑的情况下为消息系统增加追踪、监控、加密等能力。