This is article 56 in the Big Data series, introducing how to integrate Apache Kafka with Spring Boot for synchronous/asynchronous message sending and annotation-driven consumption.

Kafka Core Concepts Review

Before integrating with Spring Boot, review key roles:

  • Producer: the message publisher; writes messages to a specified Topic and supports batch sending, ACK confirmation, and custom partitioning strategies.
  • Consumer: pulls messages from Brokers and can join a Consumer Group to consume a Topic collectively.
  • Broker: a single node in the Kafka cluster, responsible for receiving and persisting messages and maintaining replicas.
  • Topic / Partition: a Topic classifies messages; a Partition is an ordered, immutable shard of a Topic, distributed across Brokers to enable parallel consumption.
  • Consumer Group: within the same Group, each Partition is assigned to at most one Consumer, which naturally implements load balancing.
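Because all messages with the same key land in the same Partition, per-key ordering is preserved. The following simplified sketch illustrates that mapping; note that Kafka's real default partitioner uses murmur2 hashing, while the plain `hashCode()`-based modulo below is purely for illustration:

```java
import java.util.List;

// Simplified illustration of keyed partitioning: the same key always maps
// to the same partition, which preserves per-key message ordering.
// (Kafka's actual default partitioner hashes keys with murmur2.)
public class PartitionSketch {

    static int partitionFor(String key, int partitionCount) {
        // Mask off the sign bit so the result is always non-negative.
        return (key.hashCode() & 0x7fffffff) % partitionCount;
    }

    public static void main(String[] args) {
        int partitions = 3;
        for (String key : List.of("order-1", "order-2", "order-1")) {
            System.out.println(key + " -> partition " + partitionFor(key, partitions));
        }
        // "order-1" maps to the same partition on both sends.
    }
}
```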

Project Dependencies

Using Spring Boot 2.2.x + Java 8, add the following to pom.xml:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.2.RELEASE</version>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>

application.yml Configuration

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

auto-offset-reset: earliest means that when the Consumer has no committed offset, it starts consuming from the earliest available message.
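To guarantee the topic used in the examples below exists with a known partition count, a NewTopic bean can be declared; Spring Boot's auto-configured KafkaAdmin then creates the topic on startup if it is absent. The 3 partitions and replication factor 1 below are illustrative values for a single-broker local setup:

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaTopicConfig {

    // KafkaAdmin (auto-configured from spring.kafka.bootstrap-servers)
    // picks up NewTopic beans and creates any missing topics on startup.
    @Bean
    public NewTopic testTopic() {
        return new NewTopic("wzk_topic_test", 3, (short) 1);
    }
}
```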

Producer Implementation

Spring-Kafka provides KafkaTemplate, which wraps the underlying sending logic:

@RestController
public class ProducerController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Sync send: block waiting for Broker confirmation
    @GetMapping("/sendSync/{message}")
    public String sendSync(@PathVariable String message) throws Exception {
        ProducerRecord<String, String> record =
            new ProducerRecord<>("wzk_topic_test", message);
        // send() returns a future of SendResult; get() blocks until the
        // Broker acknowledges, then the RecordMetadata is extracted from it
        SendResult<String, String> result = kafkaTemplate.send(record).get();
        RecordMetadata metadata = result.getRecordMetadata();
        return "partition=" + metadata.partition() + ", offset=" + metadata.offset();
    }

    // Async send: handle result via callback
    @GetMapping("/sendAsync/{message}")
    public String sendAsync(@PathVariable String message) {
        ProducerRecord<String, String> record =
            new ProducerRecord<>("wzk_topic_test", message);
        kafkaTemplate.send(record).addCallback(
            result -> System.out.println("Send success: " + result.getRecordMetadata().offset()),
            ex   -> System.err.println("Send failed: " + ex.getMessage())
        );
        return "async sent";
    }
}

Choosing between sync and async: sync sending is simple and reliable but limits throughput; async sending suits high-concurrency scenarios, with retries handled in the callback.
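As one possible shape for callback-based retry, the hypothetical helper below resends on failure up to a fixed count. It is a minimal sketch (no backoff, no dead-letter handling), not a production retry policy:

```java
// Hypothetical retry helper for the controller above: on failure, resend
// the same record until retriesLeft is exhausted. Real-world code would
// add backoff and route permanently failed messages to a dead-letter topic.
private void sendWithRetry(ProducerRecord<String, String> record, int retriesLeft) {
    kafkaTemplate.send(record).addCallback(
        result -> System.out.println("Send success: " + result.getRecordMetadata().offset()),
        ex -> {
            if (retriesLeft > 0) {
                System.err.println("Send failed, retrying: " + ex.getMessage());
                sendWithRetry(record, retriesLeft - 1);
            } else {
                System.err.println("Send failed permanently: " + ex.getMessage());
            }
        }
    );
}
```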

Consumer Implementation

Use the @KafkaListener annotation to declare a consumer; the framework manages subscription and offset commits automatically:

@Component
public class KafkaConsumer {

    @KafkaListener(topics = {"wzk_topic_test"}, groupId = "my-group")
    public void consume(ConsumerRecord<String, String> record) {
        System.out.printf("topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
            record.topic(), record.partition(), record.offset(),
            record.key(), record.value());
    }
}

ConsumerRecord carries the complete message metadata (topic, partition, offset, key, value), which is convenient when troubleshooting message loss or duplicate consumption.
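When the default auto-commit is too coarse, for example while investigating duplicate consumption, offsets can also be committed manually. The sketch below assumes spring.kafka.listener.ack-mode: manual has been added to application.yml:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class ManualAckConsumer {

    // With ack-mode: manual, the listener receives an Acknowledgment and the
    // offset is committed only when acknowledge() is called, i.e. only after
    // processing has actually succeeded.
    @KafkaListener(topics = "wzk_topic_test", groupId = "my-group")
    public void consumeWithAck(ConsumerRecord<String, String> record, Acknowledgment ack) {
        System.out.println("value=" + record.value());
        ack.acknowledge(); // commit the offset after successful processing
    }
}
```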

Spring-Kafka Core Components Summary

  • KafkaTemplate: producer send wrapper, supports sync/async modes.
  • @KafkaListener: declarative consumer; automatically handles subscription, deserialization, and offset commits.
  • ConsumerRecord: the complete message object received by the consumer.
  • ProducerRecord: the message object constructed by the producer; can specify topic, partition, and key.

Verification

After starting the application, access /sendSync/hello: the HTTP response contains the send result (e.g. partition=0, offset=42), and the consumer logs the message to the console:

topic=wzk_topic_test, partition=0, offset=42, key=null, value=hello

Accessing /sendAsync/hello additionally prints the callback line Send success: <offset>.

This confirms the message flows successfully from production to consumption. The next article will dive into the Producer's internal sending flow and core parameter tuning.