本文是大数据系列第 56 篇,介绍如何在 Spring Boot 中集成 Apache Kafka,实现同步/异步消息发送与注解驱动的消费。

完整图文版(含截图):CSDN 原文 | 掘金

Kafka 核心概念回顾

在接入 Spring Boot 之前,先梳理几个关键角色:

  • Producer(生产者):消息发布者,负责将消息写入指定 Topic,支持批量发送、ACK 确认和自定义分区策略。
  • Consumer(消费者):以拉取(Pull)方式从 Broker 读取消息,可加入 Consumer Group 共同消费一个 Topic。
  • Broker:Kafka 集群中的单个节点,负责接收、持久化消息,并维护副本(Replica)。
  • Topic / Partition:Topic 是消息分类,Partition 是 Topic 的有序、不可变分片,分布在不同 Broker 上支持并行消费。
  • Consumer Group:同一 Group 内每个 Partition 只分配给一个 Consumer,天然实现负载均衡。

项目依赖

使用 Spring Boot 2.2.x + Java 8,在 pom.xml 中引入:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.2.RELEASE</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>

application.yml 配置

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

auto-offset-reset: earliest 表示当 Consumer 没有已提交 offset 时,从最早的消息开始消费。

生产者实现

Spring-Kafka 提供 KafkaTemplate 封装底层发送逻辑:

@RestController
public class ProducerController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // 同步发送:阻塞等待 Broker 确认
    @GetMapping("/sendSync/{message}")
    public String sendSync(@PathVariable String message) throws Exception {
        ProducerRecord<String, String> record =
            new ProducerRecord<>("wzk_topic_test", message);
        RecordMetadata metadata = kafkaTemplate.send(record).get(); // 阻塞
        return "partition=" + metadata.partition() + ", offset=" + metadata.offset();
    }

    // 异步发送:通过回调处理结果
    @GetMapping("/sendAsync/{message}")
    public String sendAsync(@PathVariable String message) {
        ProducerRecord<String, String> record =
            new ProducerRecord<>("wzk_topic_test", message);
        kafkaTemplate.send(record).addCallback(
            result -> System.out.println("发送成功: " + result.getRecordMetadata().offset()),
            ex   -> System.err.println("发送失败: " + ex.getMessage())
        );
        return "async sent";
    }
}

同步 vs 异步的选择:同步发送简单可靠,但吞吐量低;异步发送适合高并发场景,需在回调中处理失败重试。

消费者实现

使用 @KafkaListener 注解声明消费者,框架自动管理 offset 提交:

@Component
public class KafkaConsumer {

    @KafkaListener(topics = {"wzk_topic_test"}, groupId = "my-group")
    public void consume(ConsumerRecord<String, String> record) {
        System.out.printf("topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
            record.topic(), record.partition(), record.offset(),
            record.key(), record.value());
    }
}

ConsumerRecord 包含完整的消息元数据(topic、partition、offset、key、value),便于排查消息丢失或重复消费问题。

Spring-Kafka 核心组件总结

组件说明
KafkaTemplate生产者发送封装,支持同步/异步两种模式
@KafkaListener声明式消费者,自动完成订阅、反序列化、offset 提交
ConsumerRecord消费端接收到的完整消息对象
ProducerRecord生产端构造的消息对象,可指定 topic、partition、key

验证

启动应用后,访问 /sendSync/hello,控制台输出类似:

发送成功: partition=0, offset=42
topic=wzk_topic_test, partition=0, offset=42, key=null, value=hello

说明消息从生产到消费整条链路通畅。下一篇将深入 Producer 内部发送流程与核心参数调优。