本文是大数据系列第 62 篇,系统介绍 Kafka Topic 的命令行管理、副本分配策略原理与 Java API 编程实践。
kafka-topics.sh 命令详解
kafka-topics.sh 是 Kafka 集群中 Topic 管理的核心命令行工具,位于 Kafka 安装目录的 bin/ 下。
创建 Topic
kafka-topics.sh --bootstrap-server localhost:9092 \
--create \
--topic my-topic \
--partitions 3 \
--replication-factor 2
关键参数说明:
--partitions:分区数,影响并发消费能力--replication-factor:副本数,不能超过 Broker 数量
查看 Topic 列表
kafka-topics.sh --bootstrap-server localhost:9092 --list
查看 Topic 详情
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my-topic
输出示例:
Topic: my-topic PartitionCount: 3 ReplicationFactor: 2
Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
Partition: 1 Leader: 2 Replicas: 2,0 Isr: 2,0
Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
修改 Topic
分区数只能增加,不能减少(减少分区会导致数据丢失):
kafka-topics.sh --bootstrap-server localhost:9092 \
--alter \
--topic my-topic \
--partitions 6
删除 Topic
kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic my-topic
副本分配策略
Kafka 在创建 Topic 时会按照特定算法将分区副本分配到各个 Broker,核心原则如下:
三大原则
- 均匀分布:副本尽可能均匀地分布在所有可用的 Broker 上,避免热点
- 跨 Broker 冗余:同一分区的不同副本必须位于不同的 Broker,防止单点故障
- 机架感知:开启机架感知后,同一分区的副本会分散到不同机架,防止机架级别的故障
分配算法
以 3 个 Broker(ID: 0, 1, 2)、3 个分区、2 副本为例:
- 从 Broker 列表中随机选择起始位置,假设从 Broker 1 开始
- 第一个分区的 Leader 副本放在 Broker 1,Follower 副本放在 Broker 2
- 第二个分区的 Leader 副本放在 Broker 2,Follower 副本放在 Broker 0
- 第三个分区的 Leader 副本放在 Broker 0,Follower 副本放在 Broker 1
后续副本按质数步长递增分配,确保副本均匀分散。
手动指定副本分配
生产环境中可通过 JSON 文件手动指定副本分配方案:
kafka-topics.sh --bootstrap-server localhost:9092 \
--create \
--topic custom-topic \
--replica-assignment 0:1,1:2,2:0
格式为 分区0的副本列表:分区1的副本列表,列表中第一个 Broker 为 Leader。
KafkaAdminClient Java API
Kafka 0.11+ 引入 KafkaAdminClient,完全基于 Java 实现,不再依赖 Scala,提供与命令行等价的管理能力。
初始化客户端
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient adminClient = AdminClient.create(props);
创建 Topic
NewTopic newTopic = new NewTopic("api-topic", 3, (short) 2);
// 可选:设置 Topic 级别配置
newTopic.configs(Map.of(
"retention.ms", "86400000", // 保留 1 天
"cleanup.policy", "delete"
));
CreateTopicsResult result = adminClient.createTopics(List.of(newTopic));
// 等待异步操作完成
result.all().get();
System.out.println("Topic 创建成功");
查询 Topic 详情
DescribeTopicsResult describeResult = adminClient.describeTopics(List.of("api-topic"));
Map<String, TopicDescription> topics = describeResult.allTopicNames().get();
topics.forEach((name, desc) -> {
System.out.println("Topic: " + name);
desc.partitions().forEach(partition -> {
System.out.printf(" 分区 %d: Leader=%s, 副本=%s, ISR=%s%n",
partition.partition(),
partition.leader().id(),
partition.replicas(),
partition.isr());
});
});
增加分区数
Map<String, NewPartitions> newPartitions = Map.of(
"api-topic", NewPartitions.increaseTo(6)
);
adminClient.createPartitions(newPartitions).all().get();
删除 Topic
adminClient.deleteTopics(List.of("api-topic")).all().get();
消费组管理
KafkaAdminClient 还支持消费组的查看和偏移量重置:
// 查看消费组列表
ListConsumerGroupsResult groups = adminClient.listConsumerGroups();
groups.all().get().forEach(g -> System.out.println(g.groupId()));
// 查看消费组详情(含 Lag)
DescribeConsumerGroupsResult desc = adminClient.describeConsumerGroups(List.of("my-group"));
desc.all().get().forEach((groupId, description) -> {
description.members().forEach(member -> {
System.out.printf("成员 %s 分配了 %d 个分区%n",
member.clientId(), member.assignment().topicPartitions().size());
});
});
偏移量管理:__consumer_offsets
Kafka 1.0.2+ 使用内置 Topic __consumer_offsets 存储消费组的偏移量,取代了早期依赖 ZooKeeper 的方式。
通过命令行查看消费组消费情况(含 Lag):
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe \
--group my-group
重置消费组偏移量(需先停止消费者):
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group my-group \
--topic my-topic \
--reset-offsets \
--to-earliest \
--execute
合理的 Topic 管理是 Kafka 集群稳定运行的基础,掌握命令行工具和 Java API 两种方式,可以灵活应对运维和开发场景的不同需求。