This is article 55 in the Big Data series, introducing Kafka daily operations: daemon startup, Shell topic management commands, and complete Java client Producer/Consumer examples.

Kafka Service Startup

Daemon Mode

In production, start Kafka in daemon mode so the broker keeps running after the SSH session disconnects:

kafka-server-start.sh -daemon /opt/servers/kafka_2.12-2.7.2/config/server.properties

Verify Service Status

# Check Kafka process
jps
# Output containing Kafka indicates successful start

# Confirm via system process
ps aux | grep kafka

Stop Service

kafka-server-stop.sh

Shell Topic Management

Kafka provides the kafka-topics.sh script for creating, listing, describing, altering, and deleting Topics:

List All Topics

kafka-topics.sh --list --zookeeper h121.wzk.icu:2181

Create Topic

kafka-topics.sh --zookeeper h121.wzk.icu:2181 \
  --create \
  --topic wzk_topic_1 \
  --partitions 3 \
  --replication-factor 2

Parameter description:

Parameter | Description
--partitions | Partition count; determines the upper bound on concurrent consumption
--replication-factor | Replica count; cannot exceed the total number of Brokers

Describe Topic

kafka-topics.sh --zookeeper h121.wzk.icu:2181 \
  --describe \
  --topic wzk_topic_1

Example output:

Topic: wzk_topic_1  PartitionCount: 3  ReplicationFactor: 2  Configs:
  Topic: wzk_topic_1  Partition: 0  Leader: 1  Replicas: 1,2  Isr: 1,2
  Topic: wzk_topic_1  Partition: 1  Leader: 2  Replicas: 2,3  Isr: 2,3
  Topic: wzk_topic_1  Partition: 2  Leader: 3  Replicas: 3,1  Isr: 3,1

Alter Topic

# Increase partition count (can only increase, cannot decrease)
kafka-topics.sh --zookeeper h121.wzk.icu:2181 \
  --alter \
  --topic wzk_topic_1 \
  --partitions 6

Delete Topic

kafka-topics.sh --zookeeper h121.wzk.icu:2181 \
  --delete \
  --topic wzk_topic_1

Note: Deletion requires delete.topic.enable=true in server.properties (default is true).

Shell Message Production/Consumption

Console Producer

kafka-console-producer.sh \
  --topic wzk_topic_test \
  --broker-list h121.wzk.icu:9092

After startup, each line of input is sent as one message; press Ctrl+C to exit.

Console Consumer

# Consume from latest messages
kafka-console-consumer.sh \
  --bootstrap-server h121.wzk.icu:9092 \
  --topic wzk_topic_test

# Consume from beginning (replay all historical messages)
kafka-console-consumer.sh \
  --bootstrap-server h121.wzk.icu:9092 \
  --topic wzk_topic_test \
  --from-beginning

Java Client Development

Maven Dependency

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.7.2</version>
</dependency>

Key Configuration Parameters

Parameter | Description | Example Value
bootstrap.servers | Broker address list | h121.wzk.icu:9092
key.serializer | Key serializer | IntegerSerializer
value.serializer | Value serializer | StringSerializer
acks | Producer reliability level | 0 / 1 / all
key.deserializer | Key deserializer | IntegerDeserializer
value.deserializer | Value deserializer | StringDeserializer
group.id | Consumer group ID | wzk-test
enable.auto.commit | Auto-commit Offset | true / false
auto.offset.reset | Offset reset strategy | earliest / latest
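
The table above can be consolidated into a single configuration sketch. Hostnames and the group ID follow the article's examples; note that here acks is set to all (the strongest of 0 / 1 / all) and auto commit is disabled to pair with a manual-commit loop, which differs from the simpler values used in the examples below:

```java
import java.util.Properties;

public class KafkaConfigSketch {
    // Producer-side configuration, using plain string keys matching the table above
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "h121.wzk.icu:9092");
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all"); // wait for all in-sync replicas to acknowledge
        return props;
    }

    // Consumer-side configuration
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "h121.wzk.icu:9092");
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", "wzk-test");
        props.put("enable.auto.commit", "false"); // manual commit for at-least-once
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().getProperty("acks"));
        System.out.println(consumerProps().getProperty("group.id"));
    }
}
```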

Complete Producer Example

import org.apache.kafka.clients.producer.*;
import java.util.*;
import java.util.concurrent.*;

public class TestProducer01 {
    public static void main(String[] args) throws Exception {
        // Configuration
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "h121.wzk.icu:9092");
        configs.put("key.serializer",
            "org.apache.kafka.common.serialization.IntegerSerializer");
        configs.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        configs.put("acks", "1");

        KafkaProducer<Integer, String> producer = new KafkaProducer<>(configs);

        // Construct message: (topic, partition, key, value)
        // partition=0 writes specifically to Partition 0; if omitted, the default partitioning strategy applies
        ProducerRecord<Integer, String> record =
            new ProducerRecord<>("wzk_topic_test", 0, 0, "hello world by java!");

        // Synchronously wait for the send result (3-second timeout)
        Future<RecordMetadata> future = producer.send(record);
        RecordMetadata metadata = future.get(3_000, TimeUnit.MILLISECONDS);
        System.out.println("Sent to partition " + metadata.partition()
            + " at offset " + metadata.offset());

        producer.close();
    }
}

Async Send (Recommended for production):

producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // Log error, retry as needed
        exception.printStackTrace();
    } else {
        System.out.println("Partition: " + metadata.partition()
            + ", Offset: " + metadata.offset());
    }
});
producer.flush(); // Ensure all buffered messages are sent
producer.close();
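
When no partition is specified in the ProducerRecord, Kafka's default partitioner hashes the serialized key (murmur2) modulo the partition count, so the same key always routes to the same partition. A simplified illustration of that idea follows; it uses Java's hashCode rather than Kafka's actual murmur2, so the resulting partition numbers will differ from the real partitioner:

```java
public class PartitionSketch {
    // Simplified stand-in for the default partitioner's key-hash routing:
    // identical keys always land on the same partition, while different
    // keys spread across partitions. Kafka's real implementation hashes
    // the serialized key bytes with murmur2.
    static int partitionFor(Object key, int numPartitions) {
        // mask off the sign bit so the result is non-negative
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int p1 = partitionFor(42, 3);
        int p2 = partitionFor(42, 3);
        System.out.println("key 42 -> partition " + p1 + " (stable: " + (p1 == p2) + ")");
    }
}
```

This stability is what makes per-key ordering possible: Kafka only guarantees ordering within a single partition.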

Complete Consumer Example

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;

public class TestConsumer01 {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "h121.wzk.icu:9092");
        configs.put("key.deserializer",
            "org.apache.kafka.common.serialization.IntegerDeserializer");
        configs.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put("group.id", "wzk-test");
        configs.put("enable.auto.commit", "true");
        configs.put("auto.offset.reset", "latest");

        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(configs);

        // Subscribe to Topic, register Rebalance listener
        consumer.subscribe(
            Collections.singletonList("wzk_topic_test"),
            new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    // Called before a rebalance: commit current offsets here to avoid duplicate consumption
                    partitions.forEach(p ->
                        System.out.println("Partition revoked: " + p.partition()));
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    // Called after a rebalance: the consumption position of newly assigned Partitions can be reset here
                    partitions.forEach(p ->
                        System.out.println("Partition assigned: " + p.partition()));
                }
            }
        );

        // Poll messages (timeout 3 seconds)
        ConsumerRecords<Integer, String> records =
            consumer.poll(java.time.Duration.ofMillis(3_000));

        records.forEach(record -> {
            System.out.println("Key:       " + record.key());
            System.out.println("Value:     " + record.value());
            System.out.println("Partition: " + record.partition());
            System.out.println("Offset:    " + record.offset());
            System.out.println("---");
        });

        consumer.close();
    }
}

Continuous Consumption Loop Pattern

Production consumers typically run in an infinite poll loop:

try {
    while (true) {
        ConsumerRecords<Integer, String> records =
            consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<Integer, String> record : records) {
            // Process business logic
            processRecord(record);
        }
        // Manual commit Offset (when enable.auto.commit=false)
        consumer.commitSync();
    }
} finally {
    consumer.close();
}
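
One subtlety when committing manually: the offset Kafka expects is the offset of the *next* record to read, i.e. last processed offset + 1. A minimal sketch of that arithmetic, using plain integers for partitions rather than Kafka's TopicPartition class (the helper name and sample values are illustrative, not a Kafka API):

```java
import java.util.HashMap;
import java.util.Map;

public class CommitOffsetSketch {
    // Given the last processed offset per partition, compute what to commit.
    // Committing lastProcessed + 1 means "resume after this record";
    // committing lastProcessed itself would re-deliver that record on restart.
    static Map<Integer, Long> commitPositions(Map<Integer, Long> lastProcessed) {
        Map<Integer, Long> toCommit = new HashMap<>();
        lastProcessed.forEach((partition, offset) -> toCommit.put(partition, offset + 1));
        return toCommit;
    }

    public static void main(String[] args) {
        Map<Integer, Long> processed = new HashMap<>();
        processed.put(0, 41L); // last record handled on partition 0 had offset 41
        System.out.println(commitPositions(processed)); // commit 42 for partition 0
    }
}
```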

Common Issues Troubleshooting

Symptom | Possible Cause | Investigation Direction
Consumer not consuming new messages | auto.offset.reset=latest with an existing committed Offset | Check committed records in __consumer_offsets
Frequent Rebalance | Processing time exceeds max.poll.interval.ms | Increase the timeout or decrease max.poll.records
Duplicate message consumption | Auto commit combined with a consumption exception | Switch to manual commit; commitSync only after successful processing
Unbalanced Partition Leaders | Broker restart did not trigger preferred replica election | Run kafka-preferred-replica-election.sh
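
For the first symptom, a useful number is consumer lag: the log end offset (next offset to be written) minus the committed offset (next offset to be read). Zero lag with no new output simply means the group is caught up. A trivial helper for reasoning about the numbers reported by tooling (the method name is ours, not a Kafka API):

```java
public class LagSketch {
    // Lag per partition = log end offset - committed offset.
    // 0 means the consumer group is fully caught up on that partition.
    static long lag(long logEndOffset, long committedOffset) {
        return logEndOffset - committedOffset;
    }

    public static void main(String[] args) {
        System.out.println(lag(100L, 95L)); // 5 records behind
        System.out.println(lag(100L, 100L)); // caught up
    }
}
```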

Summary

  • In production, always start Kafka with the -daemon flag
  • kafka-topics.sh is the core tool for daily Topic operations; plan partition and replica counts carefully at creation time
  • The Java client sends and receives messages via KafkaProducer and KafkaConsumer; acks and enable.auto.commit are the key parameters affecting reliability
  • ConsumerRebalanceListener handles Offset commits and cleanup during partition reassignment, helping ensure idempotent message processing