This is the 62nd article in the Big Data series. It covers Kafka Topic command line management, the principles behind the replica assignment strategy, and Java API programming practice.
kafka-topics.sh Command Details
kafka-topics.sh is the core command line tool for Topic management in a Kafka cluster, located in the bin/ directory under the Kafka installation directory.
Create Topic
kafka-topics.sh --bootstrap-server localhost:9092 \
--create \
--topic my-topic \
--partitions 3 \
--replication-factor 2
Key parameter descriptions:
- --partitions: Partition count, which bounds concurrent consumption capability
- --replication-factor: Replica count, which cannot exceed the number of Brokers
List Topics
kafka-topics.sh --bootstrap-server localhost:9092 --list
Describe Topic
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my-topic
Example output:
Topic: my-topic PartitionCount: 3 ReplicationFactor: 2
Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
Partition: 1 Leader: 2 Replicas: 2,0 Isr: 2,0
Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
Alter Topic
Partition count can only be increased; Kafka does not support decreasing it, because removing partitions would discard their data:
kafka-topics.sh --bootstrap-server localhost:9092 \
--alter \
--topic my-topic \
--partitions 6
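Even increasing partitions deserves care with keyed messages: the default partitioner picks a partition from the key hash modulo the partition count (real Kafka hashes keys with murmur2; String.hashCode below is a stand-in), so adding partitions can re-route a key and break per-key ordering:

```java
public class PartitionMapping {
    // Default-partitioner sketch: partition = hash(key) mod partition count.
    // (Real Kafka uses murmur2; String.hashCode here is only a stand-in.)
    static int partitionFor(String key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        String key = "order-42";
        // The same key may land on a different partition once the count changes,
        // so per-key ordering is only guaranteed within one partition count.
        System.out.println(partitionFor(key, 3)); // before --alter
        System.out.println(partitionFor(key, 6)); // after --alter: may differ
    }
}
```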
Delete Topic
kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic my-topic
Replica Assignment Strategy
When creating a Topic, Kafka assigns partition replicas to Brokers using a specific algorithm. The core principles:
Three Principles
- Even distribution: Replicas evenly distributed across all available Brokers, avoiding hot spots
- Cross-Broker redundancy: Different replicas of same partition must be on different Brokers, preventing single point of failure
- Rack awareness: When enabled, replicas of same partition spread across different racks, preventing rack-level failures
Assignment Algorithm
Example with 3 Brokers (IDs 0, 1, 2), 3 partitions, 2 replicas:
- A starting position in the Broker list is chosen at random; assume it is Broker 1
- Partition 0: Leader replica on Broker 1, Follower replica on Broker 2
- Partition 1: Leader replica on Broker 2, Follower replica on Broker 0
- Partition 2: Leader replica on Broker 0, Follower replica on Broker 1
Follower replicas are placed at an increasing offset from the Leader along the Broker list, which keeps the overall distribution even.
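The round-robin placement can be sketched in a few lines. This is a simplified model: real Kafka also randomizes and periodically increments the Follower shift, and adds rack-awareness handling.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ReplicaAssignment {
    // Simplified model of rack-unaware assignment: Leaders go round-robin
    // from a starting position, and each Follower sits at a further
    // offset along the Broker list.
    static Map<Integer, List<Integer>> assign(List<Integer> brokers,
                                              int partitions,
                                              int replicationFactor,
                                              int startIndex) {
        Map<Integer, List<Integer>> result = new LinkedHashMap<>();
        int n = brokers.size();
        for (int p = 0; p < partitions; p++) {
            List<Integer> replicas = new ArrayList<>();
            int leaderIdx = (startIndex + p) % n;
            for (int r = 0; r < replicationFactor; r++) {
                replicas.add(brokers.get((leaderIdx + r) % n));
            }
            result.put(p, replicas);
        }
        return result;
    }

    public static void main(String[] args) {
        // 3 Brokers, 3 partitions, 2 replicas, starting from Broker 1
        System.out.println(assign(List.of(0, 1, 2), 3, 2, 1));
        // {0=[1, 2], 1=[2, 0], 2=[0, 1]} (same pattern as the example above)
    }
}
```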
Manual Replica Assignment
In production, can manually specify replica assignment via JSON file:
kafka-topics.sh --bootstrap-server localhost:9092 \
--create \
--topic custom-topic \
--replica-assignment 0:1,1:2,2:0
The comma-separated entries correspond to partitions 0, 1, 2, and so on; within each entry, the colon-separated Broker IDs are that partition's replicas, and the first Broker listed is the preferred Leader.
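A small parser makes the format concrete (illustrative only; kafka-topics.sh does this parsing itself):

```java
import java.util.ArrayList;
import java.util.List;

public class AssignmentParser {
    // Parses a --replica-assignment string such as "0:1,1:2,2:0":
    // comma-separated entries map to partitions 0, 1, 2, ...; the
    // colon-separated Broker IDs in an entry are that partition's
    // replicas, with the first one as preferred Leader.
    static List<List<Integer>> parse(String spec) {
        List<List<Integer>> partitions = new ArrayList<>();
        for (String entry : spec.split(",")) {
            List<Integer> replicas = new ArrayList<>();
            for (String id : entry.split(":")) {
                replicas.add(Integer.parseInt(id.trim()));
            }
            partitions.add(replicas);
        }
        return partitions;
    }

    public static void main(String[] args) {
        // Partition 0 gets replicas [0, 1] with Broker 0 as preferred Leader
        System.out.println(parse("0:1,1:2,2:0"));
    }
}
```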
KafkaAdminClient Java API
Kafka 0.11+ provides KafkaAdminClient, a pure-Java client that no longer depends on Scala and offers management capabilities equivalent to the command line tools.
Initialize Client
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient adminClient = AdminClient.create(props); // call close() when done
Create Topic
NewTopic newTopic = new NewTopic("api-topic", 3, (short) 2);
// Optional: set Topic-level config
newTopic.configs(Map.of(
"retention.ms", "86400000", // Retain for 1 day
"cleanup.policy", "delete"
));
CreateTopicsResult result = adminClient.createTopics(List.of(newTopic));
// Wait for async operation to complete
result.all().get();
System.out.println("Topic created successfully");
Describe Topic
DescribeTopicsResult describeResult = adminClient.describeTopics(List.of("api-topic"));
Map<String, TopicDescription> topics = describeResult.allTopicNames().get();
topics.forEach((name, desc) -> {
System.out.println("Topic: " + name);
desc.partitions().forEach(partition -> {
System.out.printf(" Partition %d: Leader=%s, Replicas=%s, ISR=%s%n",
partition.partition(),
partition.leader().id(),
partition.replicas(),
partition.isr());
});
});
Increase Partition Count
Map<String, NewPartitions> newPartitions = Map.of(
"api-topic", NewPartitions.increaseTo(6)
);
adminClient.createPartitions(newPartitions).all().get();
Delete Topic
adminClient.deleteTopics(List.of("api-topic")).all().get();
Consumer Group Management
KafkaAdminClient also supports viewing consumer groups and offset reset:
// View consumer group list
ListConsumerGroupsResult groups = adminClient.listConsumerGroups();
groups.all().get().forEach(g -> System.out.println(g.groupId()));
// View consumer group details (members and their partition assignments)
DescribeConsumerGroupsResult desc = adminClient.describeConsumerGroups(List.of("my-group"));
desc.all().get().forEach((groupId, description) -> {
description.members().forEach(member -> {
System.out.printf("Member %s assigned %d partitions%n",
member.clientId(), member.assignment().topicPartitions().size());
});
});
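Lag itself is simply the log end offset minus the committed offset, per partition. A minimal sketch of the calculation, using plain maps as stand-ins for what AdminClient#listConsumerGroupOffsets and AdminClient#listOffsets would return from a live cluster:

```java
import java.util.Map;
import java.util.TreeMap;

public class LagCalculator {
    // Lag per partition = log end offset - committed offset.
    // The input maps stand in for AdminClient#listConsumerGroupOffsets
    // (committed) and AdminClient#listOffsets (endOffsets) results.
    static Map<String, Long> lag(Map<String, Long> committed,
                                 Map<String, Long> endOffsets) {
        Map<String, Long> result = new TreeMap<>();
        committed.forEach((tp, offset) ->
            result.put(tp, endOffsets.get(tp) - offset));
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> committed = Map.of("my-topic-0", 90L, "my-topic-1", 120L);
        Map<String, Long> end = Map.of("my-topic-0", 100L, "my-topic-1", 120L);
        System.out.println(lag(committed, end));
        // {my-topic-0=10, my-topic-1=0}
    }
}
```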
Offset Management: __consumer_offsets
Since Kafka 0.9, consumer group offsets have been stored in the internal Topic __consumer_offsets, replacing the early approach of keeping them in ZooKeeper.
View consumer group consumption (including Lag) via command line:
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe \
--group my-group
Reset a consumer group's offsets (the group's Consumers must be stopped first; replace --execute with --dry-run to preview the result):
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group my-group \
--topic my-topic \
--reset-offsets \
--to-earliest \
--execute
Proper Topic management is the foundation of stable Kafka cluster operation. Mastering both the command line tools and the Java API provides the flexibility to meet different operational and development needs.