本文是大数据系列第 56 篇,介绍如何在 Spring Boot 中集成 Apache Kafka,实现同步/异步消息发送与注解驱动的消费。
Kafka 核心概念回顾
在接入 Spring Boot 之前,先梳理几个关键角色:
- Producer(生产者):消息发布者,负责将消息写入指定 Topic,支持批量发送、ACK 确认和自定义分区策略。
- Consumer(消费者):以拉取(Pull)方式从 Broker 读取消息,可加入 Consumer Group 共同消费一个 Topic。
- Broker:Kafka 集群中的单个节点,负责接收、持久化消息,并维护副本(Replica)。
- Topic / Partition:Topic 是消息分类,Partition 是 Topic 的有序、不可变分片,分布在不同 Broker 上支持并行消费。
- Consumer Group:同一 Group 内每个 Partition 只分配给一个 Consumer,天然实现负载均衡。
项目依赖
使用 Spring Boot 2.2.x + Java 8,在 pom.xml 中引入:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.2.RELEASE</version>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
application.yml 配置
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: my-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: earliest 表示当 Consumer 没有已提交 offset 时,从最早的消息开始消费。
生产者实现
Spring-Kafka 提供 KafkaTemplate 封装底层发送逻辑:
@RestController
public class ProducerController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
// 同步发送:阻塞等待 Broker 确认
@GetMapping("/sendSync/{message}")
public String sendSync(@PathVariable String message) throws Exception {
ProducerRecord<String, String> record =
new ProducerRecord<>("wzk_topic_test", message);
RecordMetadata metadata = kafkaTemplate.send(record).get(); // 阻塞
return "partition=" + metadata.partition() + ", offset=" + metadata.offset();
}
// 异步发送:通过回调处理结果
@GetMapping("/sendAsync/{message}")
public String sendAsync(@PathVariable String message) {
ProducerRecord<String, String> record =
new ProducerRecord<>("wzk_topic_test", message);
kafkaTemplate.send(record).addCallback(
result -> System.out.println("发送成功: " + result.getRecordMetadata().offset()),
ex -> System.err.println("发送失败: " + ex.getMessage())
);
return "async sent";
}
}
同步 vs 异步的选择:同步发送简单可靠,但吞吐量低;异步发送适合高并发场景,需在回调中处理失败重试。
消费者实现
使用 @KafkaListener 注解声明消费者,框架自动管理 offset 提交:
@Component
public class KafkaConsumer {
@KafkaListener(topics = {"wzk_topic_test"}, groupId = "my-group")
public void consume(ConsumerRecord<String, String> record) {
System.out.printf("topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
}
}
ConsumerRecord 包含完整的消息元数据(topic、partition、offset、key、value),便于排查消息丢失或重复消费问题。
Spring-Kafka 核心组件总结
| 组件 | 说明 |
|---|---|
KafkaTemplate | 生产者发送封装,支持同步/异步两种模式 |
@KafkaListener | 声明式消费者,自动完成订阅、反序列化、offset 提交 |
ConsumerRecord | 消费端接收到的完整消息对象 |
ProducerRecord | 生产端构造的消息对象,可指定 topic、partition、key |
验证
启动应用后,访问 /sendSync/hello,控制台输出类似:
发送成功: partition=0, offset=42
topic=wzk_topic_test, partition=0, offset=42, key=null, value=hello
说明消息从生产到消费整条链路通畅。下一篇将深入 Producer 内部发送流程与核心参数调优。