本文是大数据系列第 62 篇,系统介绍 Kafka Topic 的命令行管理、副本分配策略原理与 Java API 编程实践。

完整图文版(含截图):CSDN 原文 | 掘金

kafka-topics.sh 命令详解

kafka-topics.sh 是 Kafka 集群中 Topic 管理的核心命令行工具,位于 Kafka 安装目录的 bin/ 下。

创建 Topic

kafka-topics.sh --bootstrap-server localhost:9092 \
  --create \
  --topic my-topic \
  --partitions 3 \
  --replication-factor 2

关键参数说明:

  • --partitions:分区数,影响并发消费能力
  • --replication-factor:副本数,不能超过 Broker 数量

查看 Topic 列表

kafka-topics.sh --bootstrap-server localhost:9092 --list

查看 Topic 详情

kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my-topic

输出示例:

Topic: my-topic  PartitionCount: 3  ReplicationFactor: 2
  Partition: 0  Leader: 1  Replicas: 1,2  Isr: 1,2
  Partition: 1  Leader: 2  Replicas: 2,0  Isr: 2,0
  Partition: 2  Leader: 0  Replicas: 0,1  Isr: 0,1

修改 Topic

分区数只能增加,不能减少(减少分区会导致数据丢失):

kafka-topics.sh --bootstrap-server localhost:9092 \
  --alter \
  --topic my-topic \
  --partitions 6

删除 Topic

kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic my-topic

副本分配策略

Kafka 在创建 Topic 时会按照特定算法将分区副本分配到各个 Broker,核心原则如下:

三大原则

  1. 均匀分布:副本尽可能均匀地分布在所有可用的 Broker 上,避免热点
  2. 跨 Broker 冗余:同一分区的不同副本必须位于不同的 Broker,防止单点故障
  3. 机架感知:开启机架感知后,同一分区的副本会分散到不同机架,防止机架级别的故障

分配算法

以 3 个 Broker(ID: 0, 1, 2)、3 个分区、2 副本为例:

  1. 从 Broker 列表中随机选择起始位置,假设从 Broker 1 开始
  2. 第一个分区的 Leader 副本放在 Broker 1,Follower 副本放在 Broker 2
  3. 第二个分区的 Leader 副本放在 Broker 2,Follower 副本放在 Broker 0
  4. 第三个分区的 Leader 副本放在 Broker 0,Follower 副本放在 Broker 1

后续副本按质数步长递增分配,确保副本均匀分散。

手动指定副本分配

生产环境中可通过 JSON 文件手动指定副本分配方案:

kafka-topics.sh --bootstrap-server localhost:9092 \
  --create \
  --topic custom-topic \
  --replica-assignment 0:1,1:2,2:0

格式为 分区0的副本列表:分区1的副本列表,列表中第一个 Broker 为 Leader。

KafkaAdminClient Java API

Kafka 0.11+ 引入 KafkaAdminClient,完全基于 Java 实现,不再依赖 Scala,提供与命令行等价的管理能力。

初始化客户端

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient adminClient = AdminClient.create(props);

创建 Topic

NewTopic newTopic = new NewTopic("api-topic", 3, (short) 2);
// 可选:设置 Topic 级别配置
newTopic.configs(Map.of(
    "retention.ms", "86400000",    // 保留 1 天
    "cleanup.policy", "delete"
));

CreateTopicsResult result = adminClient.createTopics(List.of(newTopic));
// 等待异步操作完成
result.all().get();
System.out.println("Topic 创建成功");

查询 Topic 详情

DescribeTopicsResult describeResult = adminClient.describeTopics(List.of("api-topic"));
Map<String, TopicDescription> topics = describeResult.allTopicNames().get();

topics.forEach((name, desc) -> {
    System.out.println("Topic: " + name);
    desc.partitions().forEach(partition -> {
        System.out.printf("  分区 %d: Leader=%s, 副本=%s, ISR=%s%n",
            partition.partition(),
            partition.leader().id(),
            partition.replicas(),
            partition.isr());
    });
});

增加分区数

Map<String, NewPartitions> newPartitions = Map.of(
    "api-topic", NewPartitions.increaseTo(6)
);
adminClient.createPartitions(newPartitions).all().get();

删除 Topic

adminClient.deleteTopics(List.of("api-topic")).all().get();

消费组管理

KafkaAdminClient 还支持消费组的查看和偏移量重置:

// 查看消费组列表
ListConsumerGroupsResult groups = adminClient.listConsumerGroups();
groups.all().get().forEach(g -> System.out.println(g.groupId()));

// 查看消费组详情(含 Lag)
DescribeConsumerGroupsResult desc = adminClient.describeConsumerGroups(List.of("my-group"));
desc.all().get().forEach((groupId, description) -> {
    description.members().forEach(member -> {
        System.out.printf("成员 %s 分配了 %d 个分区%n",
            member.clientId(), member.assignment().topicPartitions().size());
    });
});

偏移量管理:__consumer_offsets

Kafka 1.0.2+ 使用内置 Topic __consumer_offsets 存储消费组的偏移量,取代了早期依赖 ZooKeeper 的方式。

通过命令行查看消费组消费情况(含 Lag):

kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe \
  --group my-group

重置消费组偏移量(需先停止消费者):

kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-group \
  --topic my-topic \
  --reset-offsets \
  --to-earliest \
  --execute

合理的 Topic 管理是 Kafka 集群稳定运行的基础,掌握命令行工具和 Java API 两种方式,可以灵活应对运维和开发场景的不同需求。