本文是大数据系列第 55 篇,介绍 Kafka 的日常运维操作:守护进程启动、Shell 主题管理命令,以及 Java 客户端 Producer/Consumer 完整示例。

完整图文版(含截图):CSDN 原文 | 掘金

Kafka 服务启动

守护进程模式

生产环境必须以守护进程方式启动,防止 SSH 断连后服务终止:

kafka-server-start.sh -daemon /opt/servers/kafka_2.12-2.7.2/config/server.properties

验证服务状态

# 查看 Kafka 进程
jps
# 输出包含 Kafka 表示启动成功

# 通过系统进程确认
ps aux | grep kafka

停止服务

kafka-server-stop.sh

Shell 主题管理

Kafka 提供 kafka-topics.sh 脚本进行主题的增删改查:

查看所有主题

kafka-topics.sh --list --zookeeper h121.wzk.icu:2181

创建主题

kafka-topics.sh --zookeeper h121.wzk.icu:2181 \
  --create \
  --topic wzk_topic_1 \
  --partitions 3 \
  --replication-factor 2

参数说明:

参数说明
--partitions分区数,决定并发消费能力
--replication-factor副本数,不能超过 Broker 总数

查看主题详情

kafka-topics.sh --zookeeper h121.wzk.icu:2181 \
  --describe \
  --topic wzk_topic_1

输出示例:

Topic: wzk_topic_1  PartitionCount: 3  ReplicationFactor: 2  Configs:
  Topic: wzk_topic_1  Partition: 0  Leader: 1  Replicas: 1,2  Isr: 1,2
  Topic: wzk_topic_1  Partition: 1  Leader: 2  Replicas: 2,3  Isr: 2,3
  Topic: wzk_topic_1  Partition: 2  Leader: 3  Replicas: 3,1  Isr: 3,1

修改主题

# 增加分区数(只能增加,不能减少)
kafka-topics.sh --zookeeper h121.wzk.icu:2181 \
  --alter \
  --topic wzk_topic_1 \
  --partitions 6

删除主题

kafka-topics.sh --zookeeper h121.wzk.icu:2181 \
  --delete \
  --topic wzk_topic_1

注意:删除需要 server.properties 中配置 delete.topic.enable=true(默认为 true)。

Shell 消息收发

控制台 Producer

kafka-console-producer.sh \
  --topic wzk_topic_test \
  --broker-list h121.wzk.icu:9092

启动后每行输入即为一条消息,Ctrl+C 退出。

控制台 Consumer

# 从最新消息开始消费
kafka-console-consumer.sh \
  --bootstrap-server h121.wzk.icu:9092 \
  --topic wzk_topic_test

# 从头开始消费(重放所有历史消息)
kafka-console-consumer.sh \
  --bootstrap-server h121.wzk.icu:9092 \
  --topic wzk_topic_test \
  --from-beginning

Java 客户端开发

Maven 依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.7.2</version>
</dependency>

关键配置参数速查

参数说明示例值
bootstrap.serversBroker 地址列表h121.wzk.icu:9092
key.serializerKey 序列化器IntegerSerializer
value.serializerValue 序列化器StringSerializer
acks生产者可靠性级别0 / 1 / all
key.deserializerKey 反序列化器IntegerDeserializer
value.deserializerValue 反序列化器StringDeserializer
group.id消费者组 IDwzk-test
enable.auto.commit自动提交 Offsettrue / false
auto.offset.resetOffset 重置策略earliest / latest

Producer 完整示例

import org.apache.kafka.clients.producer.*;
import java.util.*;
import java.util.concurrent.*;

public class TestProducer01 {
    public static void main(String[] args) throws Exception {
        // 配置参数
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "h121.wzk.icu:9092");
        configs.put("key.serializer",
            "org.apache.kafka.common.serialization.IntegerSerializer");
        configs.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        configs.put("acks", "1");

        KafkaProducer<Integer, String> producer = new KafkaProducer<>(configs);

        // 构造消息:(topic, partition, key, value)
        // partition=0 表示指定写入 Partition 0;不指定则按分区策略路由
        ProducerRecord<Integer, String> record =
            new ProducerRecord<>("wzk_topic_test", 0, 0, "hello world by java!");

        // 同步等待发送结果(超时 3 秒)
        Future<RecordMetadata> future = producer.send(record);
        RecordMetadata metadata = future.get(3_000, TimeUnit.MILLISECONDS);
        System.out.println("Sent to partition " + metadata.partition()
            + " at offset " + metadata.offset());

        producer.close();
    }
}

异步发送(推荐生产使用):

producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // 记录错误日志,按需重试
        exception.printStackTrace();
    } else {
        System.out.println("Partition: " + metadata.partition()
            + ", Offset: " + metadata.offset());
    }
});
producer.flush(); // 确保缓冲区消息全部发出
producer.close();

Consumer 完整示例

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;

public class TestConsumer01 {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "h121.wzk.icu:9092");
        configs.put("key.deserializer",
            "org.apache.kafka.common.serialization.IntegerDeserializer");
        configs.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put("group.id", "wzk-test");
        configs.put("enable.auto.commit", "true");
        configs.put("auto.offset.reset", "latest");

        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(configs);

        // 订阅 Topic,并注册 Rebalance 监听器
        consumer.subscribe(
            Collections.singletonList("wzk_topic_test"),
            new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    // Rebalance 前:可在此提交当前 Offset,避免重复消费
                    partitions.forEach(p ->
                        System.out.println("Partition revoked: " + p.partition()));
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    // Rebalance 后:新分配的 Partition,可在此重置消费位置
                    partitions.forEach(p ->
                        System.out.println("Partition assigned: " + p.partition()));
                }
            }
        );

        // 拉取消息(超时 3 秒)
        ConsumerRecords<Integer, String> records =
            consumer.poll(java.time.Duration.ofMillis(3_000));

        records.forEach(record -> {
            System.out.println("Key:       " + record.key());
            System.out.println("Value:     " + record.value());
            System.out.println("Partition: " + record.partition());
            System.out.println("Offset:    " + record.offset());
            System.out.println("---");
        });

        consumer.close();
    }
}

持续消费循环模式

生产环境 Consumer 通常以无限循环方式运行:

try {
    while (true) {
        ConsumerRecords<Integer, String> records =
            consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<Integer, String> record : records) {
            // 处理业务逻辑
            processRecord(record);
        }
        // 手动提交 Offset(enable.auto.commit=false 时使用)
        consumer.commitSync();
    }
} finally {
    consumer.close();
}

常见问题排查

现象可能原因排查方向
Consumer 不消费新消息auto.offset.reset=latest 且已有历史 Offset检查 __consumer_offsets 中的提交记录
Rebalance 频繁触发消费耗时过长超过 max.poll.interval.ms增大超时或减小 max.poll.records
消息重复消费自动提交 + 消费异常改为手动提交,消费成功后再 commitSync
分区 Leader 不均衡Broker 重启后未触发优先副本选举执行 kafka-preferred-replica-election.sh

小结

  • 生产环境始终使用 -daemon 守护进程启动 Kafka
  • kafka-topics.sh 是日常运维的核心工具,创建主题时合理规划分区数和副本数
  • Java 客户端通过 KafkaProducerKafkaConsumer 实现消息收发,acksenable.auto.commit 是影响可靠性的关键参数
  • ConsumerRebalanceListener 用于在分区重分配时执行 Offset 提交等清理操作,保障消息处理的幂等性