This is article 56 in the Big Data series, introducing how to integrate Apache Kafka with Spring Boot for synchronous/asynchronous message sending and annotation-driven consumption.
Kafka Core Concepts Review
Before integrating with Spring Boot, let's review the key roles:
- Producer: the message publisher; writes messages to a specified Topic and supports batch sending, ACK confirmation, and custom partitioning strategies.
- Consumer: pulls messages from Brokers; can join a Consumer Group to consume a Topic collectively.
- Broker: a single node in the Kafka cluster, responsible for receiving and persisting messages and maintaining replicas.
- Topic / Partition: a Topic classifies messages; a Partition is an ordered, immutable shard of a Topic, distributed across Brokers to enable parallel consumption.
- Consumer Group: within the same Group, each Partition is assigned to exactly one Consumer, which naturally implements load balancing.
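Key-based partitioning is what makes the Consumer Group assignment deterministic: messages with the same key always land on the same Partition. A simplified sketch of that selection logic (Kafka's real default partitioner uses murmur2 hashing; `partitionFor` here is an illustrative stand-in):

```java
public class KeyPartitioning {

    // Simplified stand-in for Kafka's key-based partition selection:
    // non-negative hash of the key, modulo the partition count.
    public static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        // The same key always maps to the same partition,
        // so per-key ordering is preserved within that partition.
        System.out.println(partitionFor("order-1001", 3));
        System.out.println(partitionFor("order-1001", 3));
    }
}
```

Because assignment depends on the partition count, adding partitions later remaps keys, which is why partition counts are usually chosen up front.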
Project Dependencies
Using Spring Boot 2.2.x + Java 8, add to pom.xml:
```xml
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.2.RELEASE</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>
```
application.yml Configuration
```yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
```
auto-offset-reset: earliest means that when the consumer group has no committed offset for a partition, consumption starts from the earliest available message.
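For contrast, the alternative value latest (Kafka's default) makes a group with no committed offset start at the end of the log, skipping all earlier messages. The fragment below shows the change, assuming the same application.yml:

```yaml
spring:
  kafka:
    consumer:
      # A group with no committed offset starts at the log end;
      # messages produced before the group first connects are skipped.
      auto-offset-reset: latest
```

earliest is convenient for demos and replay; latest suits consumers that only care about new traffic.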
Producer Implementation
Spring-Kafka provides KafkaTemplate, which wraps the underlying send logic:
```java
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Sync send: block until the Broker acknowledges the write
    @GetMapping("/sendSync/{message}")
    public String sendSync(@PathVariable String message) throws Exception {
        ProducerRecord<String, String> record =
                new ProducerRecord<>("wzk_topic_test", message);
        // send() returns a future of SendResult; get() blocks,
        // and the RecordMetadata is read from the SendResult
        RecordMetadata metadata = kafkaTemplate.send(record).get().getRecordMetadata();
        return "partition=" + metadata.partition() + ", offset=" + metadata.offset();
    }

    // Async send: handle the result in a callback
    @GetMapping("/sendAsync/{message}")
    public String sendAsync(@PathVariable String message) {
        ProducerRecord<String, String> record =
                new ProducerRecord<>("wzk_topic_test", message);
        kafkaTemplate.send(record).addCallback(
                result -> System.out.println("Send success: offset=" + result.getRecordMetadata().offset()),
                ex -> System.err.println("Send failed: " + ex.getMessage())
        );
        return "async sent";
    }
}
```
Choosing between sync and async: sync is simple and reliable but limits throughput, since each request blocks until the Broker confirms; async suits high-concurrency scenarios, with retries handled in the failure callback.
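One common way to "handle retry in the callback" is to resend with exponential backoff rather than immediately. A minimal sketch of such a delay schedule (the 100ms base and 5s cap are illustrative assumptions, not Spring-Kafka defaults):

```java
public class RetryBackoff {

    // Exponential backoff: 100ms, 200ms, 400ms, ... capped at 5s.
    // The failure callback would schedule the next send attempt
    // after this delay instead of retrying in a tight loop.
    public static long backoffMs(int attempt) {
        return Math.min(100L << attempt, 5000L);
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt < 8; attempt++) {
            System.out.println("attempt " + attempt + " -> wait " + backoffMs(attempt) + "ms");
        }
    }
}
```

Note that the Kafka producer's own retries setting already retries transient broker errors internally; application-level resends like this are for failures that surface all the way to the callback.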
Consumer Implementation
Use the @KafkaListener annotation to declare a consumer; the framework automatically manages subscription and offset commits:
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    @KafkaListener(topics = {"wzk_topic_test"}, groupId = "my-group")
    public void consume(ConsumerRecord<String, String> record) {
        System.out.printf("topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
                record.topic(), record.partition(), record.offset(),
                record.key(), record.value());
    }
}
```
ConsumerRecord carries the complete message metadata (topic, partition, offset, key, value), which is convenient when troubleshooting message loss or duplicate consumption.
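Since (topic, partition, offset) uniquely identifies a message, duplicate consumption can be detected by remembering which positions have already been processed. A minimal in-memory idempotency sketch (a real system would persist this, e.g. in a database keyed by the same triple):

```java
import java.util.HashSet;
import java.util.Set;

public class OffsetDedup {

    private final Set<String> seen = new HashSet<>();

    // Returns true only the first time a (topic, partition, offset)
    // triple is observed; redeliveries return false and can be skipped.
    public boolean firstTime(String topic, int partition, long offset) {
        return seen.add(topic + "-" + partition + "-" + offset);
    }

    public static void main(String[] args) {
        OffsetDedup dedup = new OffsetDedup();
        System.out.println(dedup.firstTime("wzk_topic_test", 0, 42)); // true: process it
        System.out.println(dedup.firstTime("wzk_topic_test", 0, 42)); // false: duplicate, skip
    }
}
```

A check like this inside the @KafkaListener method makes processing idempotent even when offsets are re-consumed after a rebalance or restart.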
Spring-Kafka Core Components Summary
| Component | Description |
|---|---|
| KafkaTemplate | Producer send wrapper supporting sync and async modes |
| @KafkaListener | Declarative consumer; handles subscription, deserialization, and offset commits |
| ConsumerRecord | Complete message object received by the consumer |
| ProducerRecord | Message object constructed by the producer; can specify topic, partition, and key |
Verification
After starting the application, request /sendSync/hello. The HTTP response reports the write position, e.g. partition=0, offset=42, and the consumer prints a line similar to:

topic=wzk_topic_test, partition=0, offset=42, key=null, value=hello

(The "Send success: ..." line is printed by the producer-side callback when using the async endpoint /sendAsync/hello instead.) This confirms the message flows end-to-end from production to consumption. The next article will dive into the Producer's internal send flow and core parameter tuning.