This is article 62 in the Big Data series. It covers Kafka Topic command line management, the principles behind the replica assignment strategy, and Java API programming practice.

kafka-topics.sh Command Details

kafka-topics.sh is the core command line tool for Topic management in a Kafka cluster, located in the bin/ directory of the Kafka installation.

Create Topic

kafka-topics.sh --bootstrap-server localhost:9092 \
  --create \
  --topic my-topic \
  --partitions 3 \
  --replication-factor 2

Key parameter descriptions:

  • --partitions: Partition count; within a consumer group each partition is read by at most one member, so this caps consumption parallelism
  • --replication-factor: Replica count per partition; cannot exceed the number of available Brokers
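The reason --partitions bounds concurrency: within one consumer group, each partition is assigned to at most one member, so consumers beyond the partition count sit idle. A minimal sketch (method name is ours, not a Kafka API):

```java
public class ParallelismSketch {
    // Within a consumer group, a partition is consumed by at most one member,
    // so effective parallelism is the smaller of the two counts.
    static int effectiveParallelism(int partitions, int consumers) {
        return Math.min(partitions, consumers);
    }

    public static void main(String[] args) {
        System.out.println(effectiveParallelism(3, 5)); // 5 consumers, but only 3 can work
        System.out.println(effectiveParallelism(6, 4)); // 4 consumers share 6 partitions
    }
}
```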

List Topics

kafka-topics.sh --bootstrap-server localhost:9092 --list

Describe Topic

kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my-topic

Example output:

Topic: my-topic  PartitionCount: 3  ReplicationFactor: 2
  Partition: 0  Leader: 1  Replicas: 1,2  Isr: 1,2
  Partition: 1  Leader: 2  Replicas: 2,0  Isr: 2,0
  Partition: 2  Leader: 0  Replicas: 0,1  Isr: 0,1

Alter Topic

The partition count can only be increased, never decreased (Kafka rejects a decrease, since it would require discarding data). Note that increasing the count also changes which partition keyed records are routed to:

kafka-topics.sh --bootstrap-server localhost:9092 \
  --alter \
  --topic my-topic \
  --partitions 6
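Why the key routing changes: producers place keyed records by hashing the key modulo the partition count, so the same key can land on a different partition after the increase. A sketch of the idea (illustrative only: it uses String.hashCode, while Kafka's default partitioner actually uses murmur2):

```java
public class KeyRoutingSketch {
    // Simplified stand-in for the default partitioner: hash the key,
    // force the hash non-negative, then take it modulo the partition count.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        String key = "order-1001"; // hypothetical record key
        // The same key may map to a different partition before and after --alter.
        System.out.println("3 partitions -> " + partitionFor(key, 3));
        System.out.println("6 partitions -> " + partitionFor(key, 6));
    }
}
```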

Delete Topic

kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic my-topic

Replica Assignment Strategy

When a Topic is created, Kafka assigns partition replicas to Brokers using an algorithm built on three core principles:

Three Principles

  1. Even distribution: Replicas evenly distributed across all available Brokers, avoiding hot spots
  2. Cross-Broker redundancy: Different replicas of same partition must be on different Brokers, preventing single point of failure
  3. Rack awareness: When enabled, replicas of same partition spread across different racks, preventing rack-level failures
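Rack awareness only takes effect when every Broker declares its rack in server.properties; a minimal fragment (the rack names here are examples):

```
# server.properties on each Broker -- rack IDs are illustrative
broker.id=0
broker.rack=rack-a
```

Once broker.rack is set on all Brokers, newly created Topics spread the replicas of each partition across racks automatically.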

Assignment Algorithm

Example with 3 Brokers (ID: 0, 1, 2), 3 partitions, 2 replicas:

  1. A starting position is chosen at random from the Broker list; assume it is Broker 1
  2. Partition 0: Leader replica on Broker 1, Follower replica on Broker 2
  3. Partition 1: Leader replica on Broker 2, Follower replica on Broker 0
  4. Partition 2: Leader replica on Broker 0, Follower replica on Broker 1

Follower replicas are placed at an increasing shift from their Leader, which keeps the assignment evenly distributed as partitions are added.
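The round-robin placement above can be sketched in a few lines (a simplified model: the Leader of partition p goes to Broker (start + p) % n and each Follower sits one Broker further right; Kafka's real algorithm additionally randomizes the Follower shift per Topic):

```java
import java.util.ArrayList;
import java.util.List;

public class ReplicaAssignmentSketch {
    // Returns one replica list per partition; the first Broker ID in each
    // list is the Leader.
    static List<List<Integer>> assign(int brokers, int partitions, int replicas, int start) {
        List<List<Integer>> result = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            List<Integer> replicaList = new ArrayList<>();
            for (int r = 0; r < replicas; r++) {
                // Leader at (start + p) % brokers, Followers shifted right by r
                replicaList.add((start + p + r) % brokers);
            }
            result.add(replicaList);
        }
        return result;
    }

    public static void main(String[] args) {
        // 3 Brokers, 3 partitions, 2 replicas, starting from Broker 1,
        // reproducing the example above.
        System.out.println(assign(3, 3, 2, 1)); // [[1, 2], [2, 0], [0, 1]]
    }
}
```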

Manual Replica Assignment

In production, can manually specify replica assignment via JSON file:

kafka-topics.sh --bootstrap-server localhost:9092 \
  --create \
  --topic custom-topic \
  --replica-assignment 0:1,1:2,2:0

In this format, the replica lists of the individual partitions are separated by commas, and the Broker IDs within one partition's list are separated by colons; the first Broker in each list becomes the preferred Leader.
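The format can be made concrete with a small parser (illustrative only, not a Kafka API):

```java
import java.util.ArrayList;
import java.util.List;

public class AssignmentParser {
    // "0:1,1:2,2:0" -> partition 0 on Brokers [0,1], partition 1 on [1,2],
    // partition 2 on [2,0]; the first Broker of each list is the Leader.
    static List<List<Integer>> parse(String spec) {
        List<List<Integer>> partitions = new ArrayList<>();
        for (String part : spec.split(",")) {       // one entry per partition
            List<Integer> brokers = new ArrayList<>();
            for (String id : part.split(":")) {     // Broker IDs of that partition
                brokers.add(Integer.parseInt(id));
            }
            partitions.add(brokers);
        }
        return partitions;
    }

    public static void main(String[] args) {
        System.out.println(parse("0:1,1:2,2:0")); // [[0, 1], [1, 2], [2, 0]]
    }
}
```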

KafkaAdminClient Java API

Kafka 0.11 introduced KafkaAdminClient, a pure-Java client with no Scala dependency that provides management capabilities equivalent to the command line tools.

Initialize Client

import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
AdminClient adminClient = AdminClient.create(props);

Create Topic

NewTopic newTopic = new NewTopic("api-topic", 3, (short) 2);
// Optional: set Topic-level config
newTopic.configs(Map.of(
    "retention.ms", "86400000",    // Retain for 1 day
    "cleanup.policy", "delete"
));

CreateTopicsResult result = adminClient.createTopics(List.of(newTopic));
// Wait for async operation to complete
result.all().get();
System.out.println("Topic created successfully");

Describe Topic

DescribeTopicsResult describeResult = adminClient.describeTopics(List.of("api-topic"));
Map<String, TopicDescription> topics = describeResult.allTopicNames().get();

topics.forEach((name, desc) -> {
    System.out.println("Topic: " + name);
    desc.partitions().forEach(partition -> {
        System.out.printf("  Partition %d: Leader=%s, Replicas=%s, ISR=%s%n",
            partition.partition(),
            partition.leader().id(),
            partition.replicas(),
            partition.isr());
    });
});

Increase Partition Count

Map<String, NewPartitions> newPartitions = Map.of(
    "api-topic", NewPartitions.increaseTo(6)
);
adminClient.createPartitions(newPartitions).all().get();

Delete Topic

adminClient.deleteTopics(List.of("api-topic")).all().get();

Consumer Group Management

KafkaAdminClient also supports viewing consumer groups and offset reset:

// View consumer group list
ListConsumerGroupsResult groups = adminClient.listConsumerGroups();
groups.all().get().forEach(g -> System.out.println(g.groupId()));

// View consumer group details (member assignments; committed offsets for Lag
// come from adminClient.listConsumerGroupOffsets)
DescribeConsumerGroupsResult desc = adminClient.describeConsumerGroups(List.of("my-group"));
desc.all().get().forEach((groupId, description) -> {
    description.members().forEach(member -> {
        System.out.printf("Member %s assigned %d partitions%n",
            member.clientId(), member.assignment().topicPartitions().size());
    });
});

Offset Management: __consumer_offsets

Since version 0.9, Kafka stores consumer group offsets in the internal Topic __consumer_offsets by default, replacing the early approach that relied on ZooKeeper.

View consumer group consumption (including Lag) via command line:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe \
  --group my-group
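The Lag column reported by the command is simply the partition's log-end offset minus the group's committed offset; the arithmetic can be sketched as follows (all offset values are made up):

```java
import java.util.Map;

public class LagSketch {
    // Lag = how far the group's committed position trails the partition's end.
    static long lag(long logEndOffset, long committedOffset) {
        return logEndOffset - committedOffset;
    }

    public static void main(String[] args) {
        // Hypothetical {log-end, committed} offsets for three partitions.
        Map<Integer, long[]> offsets = Map.of(
            0, new long[]{1500, 1500},  // fully caught up
            1, new long[]{2300, 2100},  // 200 records behind
            2, new long[]{900, 850});   // 50 records behind
        offsets.forEach((partition, o) ->
            System.out.printf("Partition %d: Lag=%d%n", partition, lag(o[0], o[1])));
    }
}
```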

Reset consumer group offset (must stop Consumer first):

kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-group \
  --topic my-topic \
  --reset-offsets \
  --to-earliest \
  --execute

Proper Topic management is the foundation of stable Kafka cluster operation. Mastering both the command line tools and the Java API provides the flexibility to meet different operational and development needs.