本文是大数据系列第 59 篇,聚焦 Kafka Producer 的拦截器(Interceptor)机制,讲解其两个核心拦截点的工作原理、拦截器链的组合方式,以及常见生产用途。
什么是 Kafka 拦截器
Kafka 0.10 引入了 Producer 和 Consumer 拦截器,允许在不修改业务代码的前提下,对消息流进行监控和定制处理。这与 Servlet Filter、Spring AOP 的思想一脉相承,属于横切关注点(Cross-Cutting Concern)的标准处理方式。
常见应用场景:
- 注入分布式追踪 ID(Trace ID)
- 消息加密 / 解密
- 消息格式校验或转换
- 发送成功 / 失败指标上报
- A/B 测试消息路由
Producer 拦截器的两个拦截点
ProducerInterceptor<K, V> 接口定义了四个方法,核心的两个拦截点如下:
onSend(ProducerRecord)
执行线程:Producer 主线程
执行时机:KafkaProducer.send() 内部,序列化和分区路由之前
可在此修改消息内容、添加 headers、注入元数据。注意:不要修改 topic 和 partition,否则会干扰后续的分区计算。
onAcknowledgement(RecordMetadata, Exception)
执行线程:Producer IO 线程(Sender 线程)
执行时机:Broker 返回 ACK 之后、触发业务回调之前
适合做统计、监控和资源清理。切忌在此执行耗时操作(如同步 HTTP 请求),否则会阻塞 IO 线程,影响整体吞吐。
实现自定义拦截器
public class TimestampInterceptor<K, V> implements ProducerInterceptor<K, V> {
private final AtomicLong successCount = new AtomicLong(0);
private final AtomicLong failureCount = new AtomicLong(0);
@Override
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
// 在 value 前追加时间戳前缀(仅示例,生产中推荐用 Headers)
String newValue = System.currentTimeMillis() + "|" + record.value();
return new ProducerRecord<>(
record.topic(), record.partition(), record.timestamp(),
record.key(), (V) newValue, record.headers());
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
// 注意:此方法在 IO 线程执行,使用原子变量保证线程安全
if (exception == null) {
successCount.incrementAndGet();
} else {
failureCount.incrementAndGet();
}
}
@Override
public void close() {
// Producer 关闭时调用,可在此打印最终统计或释放资源
System.out.printf("发送统计 - 成功: %d, 失败: %d%n",
successCount.get(), failureCount.get());
}
@Override
public void configure(Map<String, ?> configs) {
// 可从 Producer 配置中读取自定义参数
}
}
拦截器链
多个拦截器可以串联成链,按顺序依次处理同一条消息:
configs.put(
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
"com.example.TimestampInterceptor,com.example.TracingInterceptor"
);
执行顺序:TimestampInterceptor.onSend() → TracingInterceptor.onSend() → 序列化 → 分区 → 缓冲 → 发送 → TracingInterceptor.onAcknowledgement() → TimestampInterceptor.onAcknowledgement()(onAcknowledgement 逆序执行)。
错误隔离:若某个拦截器的 onSend() 抛出异常,框架会记录日志并继续执行链中后续的拦截器,单点失败不影响消息发送流程。
完整发送流程(含拦截器)
KafkaProducer.send()
↓
[主线程] 拦截器链 onSend() (按序)
↓
[主线程] 序列化 key & value
↓
[主线程] 分区路由 Partitioner
↓
[主线程] 写入 RecordAccumulator
↓
[IO线程] Sender 批量发送给 Broker
↓
[IO线程] 拦截器链 onAcknowledgement() → 业务回调
线程安全注意事项
onSend() 在主线程执行,onAcknowledgement() 在 IO 线程执行,两者共享同一个拦截器实例。因此:
- 计数器等共享状态必须使用
AtomicLong、ConcurrentHashMap等线程安全类。 - 避免在拦截器中使用非线程安全的
SimpleDateFormat、HashMap等。
小结
Kafka 拦截器是 Producer 扩展点的最佳实践,相比直接修改业务代码,具有低侵入、可组合、可插拔的优势。合理利用拦截器链,可以在不改动核心逻辑的情况下为消息系统增加追踪、监控、加密等能力。