This is article 55 in the Big Data series, introducing Kafka daily operations: daemon startup, Shell topic management commands, and complete Java client Producer/Consumer examples.
Kafka Service Startup
Daemon Mode
In production, start Kafka in daemon mode so the broker keeps running after the SSH session disconnects:
kafka-server-start.sh -daemon /opt/servers/kafka_2.12-2.7.2/config/server.properties
Verify Service Status
# Check Kafka process
jps
# Output containing Kafka indicates successful start
# Confirm via system process
ps aux | grep kafka
Stop Service
kafka-server-stop.sh
Shell Topic Management
Kafka provides the kafka-topics.sh script for creating, listing, describing, altering, and deleting Topics:
List All Topics
kafka-topics.sh --list --zookeeper h121.wzk.icu:2181
Create Topic
kafka-topics.sh --zookeeper h121.wzk.icu:2181 \
--create \
--topic wzk_topic_1 \
--partitions 3 \
--replication-factor 2
Parameter description:
| Parameter | Description |
|---|---|
| --partitions | Partition count; sets the upper bound on parallel consumption within one consumer group |
| --replication-factor | Replica count; cannot exceed the number of available Brokers |
Describe Topic
kafka-topics.sh --zookeeper h121.wzk.icu:2181 \
--describe \
--topic wzk_topic_1
Example output:
Topic: wzk_topic_1 PartitionCount: 3 ReplicationFactor: 2 Configs:
Topic: wzk_topic_1 Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: wzk_topic_1 Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: wzk_topic_1 Partition: 2 Leader: 3 Replicas: 3,1 Isr: 3,1
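The Replicas and Isr columns in this output are what you usually watch: a partition whose Isr list is shorter than its Replicas list is under-replicated. The following is a minimal sketch, not part of Kafka, that scans text shaped like the sample above and reports such partitions. The class name DescribeOutputCheck and the regular expression are assumptions made for illustration; real output is tab-separated, which the whitespace-tolerant pattern should also accept.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class DescribeOutputCheck {
    // Matches per-partition lines such as:
    // "Topic: wzk_topic_1 Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2"
    private static final Pattern PARTITION_LINE = Pattern.compile(
            "Partition:\\s*(\\d+)\\s+Leader:\\s*(-?\\d+)\\s+Replicas:\\s*([\\d,]+)\\s+Isr:\\s*([\\d,]*)");

    /** Returns the partition numbers whose ISR is smaller than the replica list. */
    static List<Integer> underReplicated(String describeOutput) {
        List<Integer> result = new ArrayList<>();
        for (String line : describeOutput.split("\\R")) {
            Matcher m = PARTITION_LINE.matcher(line);
            if (!m.find()) {
                continue; // the summary line (PartitionCount ...) does not match
            }
            int partition = Integer.parseInt(m.group(1));
            int replicaCount = m.group(3).split(",").length;
            String isr = m.group(4);
            int inSyncCount = isr.isEmpty() ? 0 : isr.split(",").length;
            if (inSyncCount < replicaCount) {
                result.add(partition);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical degraded state: Broker 3 has dropped out of the ISR of partition 1.
        String output = String.join("\n",
                "Topic: wzk_topic_1 PartitionCount: 3 ReplicationFactor: 2 Configs:",
                "Topic: wzk_topic_1 Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2",
                "Topic: wzk_topic_1 Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2",
                "Topic: wzk_topic_1 Partition: 2 Leader: 3 Replicas: 3,1 Isr: 3,1");
        System.out.println("Under-replicated partitions: " + underReplicated(output));
    }
}
```

For the healthy sample output shown above, the returned list would be empty.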
Alter Topic
# Increase partition count (can only increase, cannot decrease)
kafka-topics.sh --zookeeper h121.wzk.icu:2181 \
--alter \
--topic wzk_topic_1 \
--partitions 6
Delete Topic
kafka-topics.sh --zookeeper h121.wzk.icu:2181 \
--delete \
--topic wzk_topic_1
Note: Deletion requires delete.topic.enable=true in server.properties (default is true).
Shell Message Production/Consumption
Console Producer
kafka-console-producer.sh \
--topic wzk_topic_test \
--broker-list h121.wzk.icu:9092
After startup, each line you type is sent as one message. Press Ctrl+C to exit.
Console Consumer
# Consume from latest messages
kafka-console-consumer.sh \
--bootstrap-server h121.wzk.icu:9092 \
--topic wzk_topic_test
# Consume from beginning (replay all historical messages)
kafka-console-consumer.sh \
--bootstrap-server h121.wzk.icu:9092 \
--topic wzk_topic_test \
--from-beginning
Java Client Development
Maven Dependency
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.7.2</version>
</dependency>
Key Configuration Parameters
| Parameter | Description | Example Value |
|---|---|---|
| bootstrap.servers | Broker address list | h121.wzk.icu:9092 |
| key.serializer | Key serializer | IntegerSerializer |
| value.serializer | Value serializer | StringSerializer |
| acks | Producer reliability level | 0 / 1 / all |
| key.deserializer | Key deserializer | IntegerDeserializer |
| value.deserializer | Value deserializer | StringDeserializer |
| group.id | Consumer group ID | wzk-test |
| enable.auto.commit | Auto commit Offset | true / false |
| auto.offset.reset | Offset reset strategy | earliest / latest |
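To keep these settings in one place, the table can be turned into two small factory methods. This is only a sketch using java.util.Properties, which the KafkaProducer and KafkaConsumer constructors also accept. The class name ClientConfigSketch and the specific values chosen here (acks=all, manual commit, earliest) are illustrative assumptions, not recommendations from the table.

```java
import java.util.Properties;

class ClientConfigSketch {
    /** Producer settings from the table; acks=all waits for all in-sync replicas. */
    static Properties producerProps(String bootstrapServers) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);
        p.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.IntegerSerializer");
        p.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // 0 = do not wait, 1 = wait for the leader only, all = wait for all in-sync replicas
        p.setProperty("acks", "all");
        return p;
    }

    /** Consumer settings from the table, with auto commit switched off. */
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);
        p.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        p.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        p.setProperty("group.id", groupId);
        p.setProperty("enable.auto.commit", "false"); // commit manually after processing
        p.setProperty("auto.offset.reset", "earliest"); // used only when no committed Offset exists
        return p;
    }

    public static void main(String[] args) {
        Properties producer = producerProps("h121.wzk.icu:9092");
        Properties consumer = consumerProps("h121.wzk.icu:9092", "wzk-test");
        System.out.println("acks = " + producer.getProperty("acks"));
        System.out.println("group.id = " + consumer.getProperty("group.id"));
    }
}
```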
Complete Producer Example
import org.apache.kafka.clients.producer.*;
import java.util.*;
import java.util.concurrent.*;
public class TestProducer01 {
    public static void main(String[] args) throws Exception {
        // Configuration
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "h121.wzk.icu:9092");
        configs.put("key.serializer",
                "org.apache.kafka.common.serialization.IntegerSerializer");
        configs.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        configs.put("acks", "1");
        KafkaProducer<Integer, String> producer = new KafkaProducer<>(configs);
        // Construct message: (topic, partition, key, value)
        // partition=0 writes explicitly to Partition 0; if the partition is omitted, the partitioner chooses one
        ProducerRecord<Integer, String> record =
                new ProducerRecord<>("wzk_topic_test", 0, 0, "hello world by java!");
        // Send, then block until the result arrives (timeout: 3 seconds)
        Future<RecordMetadata> future = producer.send(record);
        RecordMetadata metadata = future.get(3_000, TimeUnit.MILLISECONDS);
        System.out.println("Sent to partition " + metadata.partition()
                + " at offset " + metadata.offset());
        producer.close();
    }
}
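The comment in the example mentions that a record without an explicit partition is placed by the partition strategy. For keyed records the idea is roughly "hash the key, then take it modulo the partition count". The sketch below illustrates only that idea. It is a simplified stand-in, assuming Integer.hashCode as the hash, whereas the real default partitioner hashes the serialized key bytes, so the partition numbers printed here will not match a real cluster. The class name KeyPartitionSketch is hypothetical.

```java
class KeyPartitionSketch {
    /** Simplified stand-in for key-based partitioning: hash(key) mod partition count. */
    static int partitionFor(int key, int numPartitions) {
        // floorMod keeps the result non-negative even for negative hashes
        return Math.floorMod(Integer.hashCode(key), numPartitions);
    }

    public static void main(String[] args) {
        // Compare the mapping before and after growing a Topic from 3 to 6 partitions.
        for (int key = 0; key < 6; key++) {
            System.out.println("key " + key
                    + ": 3 partitions -> " + partitionFor(key, 3)
                    + ", 6 partitions -> " + partitionFor(key, 6));
        }
    }
}
```

One consequence worth keeping in mind: because the partition count is part of the calculation, increasing partitions with --alter can move existing keys to a different partition, which matters if consumers rely on per-key ordering.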
Asynchronous send (recommended for production):
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // Log the error and retry as needed
        exception.printStackTrace();
    } else {
        System.out.println("Partition: " + metadata.partition()
                + ", Offset: " + metadata.offset());
    }
});
producer.flush(); // Block until all buffered messages have been sent
producer.close();
Complete Consumer Example
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
public class TestConsumer01 {
    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "h121.wzk.icu:9092");
        configs.put("key.deserializer",
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        configs.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        configs.put("group.id", "wzk-test");
        configs.put("enable.auto.commit", "true");
        configs.put("auto.offset.reset", "latest");
        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(configs);
        // Subscribe to the Topic and register a Rebalance listener
        consumer.subscribe(
                Collections.singletonList("wzk_topic_test"),
                new ConsumerRebalanceListener() {
                    @Override
                    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                        // Called before a Rebalance: commit current Offsets here to reduce duplicate consumption
                        partitions.forEach(p ->
                                System.out.println("Partition revoked: " + p.partition()));
                    }
                    @Override
                    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                        // Called after a Rebalance: the consumption position of newly assigned Partitions can be reset here
                        partitions.forEach(p ->
                                System.out.println("Partition assigned: " + p.partition()));
                    }
                }
        );
        // Poll once for demonstration (timeout: 3 seconds)
        ConsumerRecords<Integer, String> records =
                consumer.poll(java.time.Duration.ofMillis(3_000));
        records.forEach(record -> {
            System.out.println("Key: " + record.key());
            System.out.println("Value: " + record.value());
            System.out.println("Partition: " + record.partition());
            System.out.println("Offset: " + record.offset());
            System.out.println("---");
        });
        consumer.close();
    }
}
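The example sets auto.offset.reset to latest, and it is easy to misread when that setting applies. The sketch below models the decision in plain Java, under the simplifying assumption that only two cases exist: a committed Offset is present, or it is not. The real client also supports the value none and resets when a committed Offset is out of range; both are left out here. The class and parameter names are hypothetical.

```java
import java.util.OptionalLong;

class StartOffsetSketch {
    /**
     * Simplified model of where a consumer group starts reading a partition.
     * A committed Offset always wins; auto.offset.reset is only a fallback.
     */
    static long startOffset(OptionalLong committed, String autoOffsetReset,
                            long logStartOffset, long logEndOffset) {
        if (committed.isPresent()) {
            return committed.getAsLong(); // auto.offset.reset is not consulted
        }
        switch (autoOffsetReset) {
            case "earliest":
                return logStartOffset; // replay everything still retained
            case "latest":
                return logEndOffset;   // only messages produced from now on
            default:
                throw new IllegalArgumentException(
                        "unsupported auto.offset.reset in this sketch: " + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        // A partition currently holding offsets 0..99 (log end offset = 100).
        System.out.println("new group, latest:   "
                + startOffset(OptionalLong.empty(), "latest", 0, 100));
        System.out.println("new group, earliest: "
                + startOffset(OptionalLong.empty(), "earliest", 0, 100));
        System.out.println("committed at 42:     "
                + startOffset(OptionalLong.of(42), "latest", 0, 100));
    }
}
```

Read this way, a brand-new group with latest sees none of the 100 existing messages, while a group that already committed Offset 42 resumes there regardless of the setting.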
Continuous Consumption Loop Pattern
Production Consumers typically poll in an infinite loop:
// Fragment: assumes the consumer from the example above and import java.time.Duration
try {
    while (true) {
        ConsumerRecords<Integer, String> records =
                consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<Integer, String> record : records) {
            // Process business logic (processRecord is your own method)
            processRecord(record);
        }
        // Commit Offsets manually after the batch succeeds (requires enable.auto.commit=false)
        consumer.commitSync();
    }
} finally {
    consumer.close();
}
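Committing after processing gives at-least-once delivery: if the process dies between processRecord and commitSync, the same records are delivered again after restart. The following in-memory simulation illustrates that effect and one possible way to tolerate it. It does not use the Kafka client; FakePartition, runOnce, and the offset-based deduplication set are hypothetical stand-ins. A real system would need a durable store for the deduplication state, keyed by topic, partition, and offset or by a business key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class AtLeastOnceSketch {
    // Hypothetical in-memory stand-in for one partition and its committed Offset.
    static final class FakePartition {
        final List<String> log;
        long committed = 0L; // next offset to read after a restart

        FakePartition(List<String> log) {
            this.log = log;
        }
    }

    /**
     * Delivers every record from the committed Offset to the end of the log.
     * If crashBeforeCommit is true, the run ends without committing.
     * Returns the offsets delivered to the business logic in this run.
     */
    static List<Long> runOnce(FakePartition p, boolean crashBeforeCommit,
                              Set<Long> applied, List<String> sink) {
        List<Long> delivered = new ArrayList<>();
        long offset = p.committed;
        while (offset < p.log.size()) {
            delivered.add(offset);
            // Idempotent handling: apply each offset at most once.
            if (applied.add(offset)) {
                sink.add(p.log.get((int) offset));
            }
            offset++;
        }
        if (!crashBeforeCommit) {
            p.committed = offset; // plays the role of commitSync()
        }
        return delivered;
    }

    public static void main(String[] args) {
        FakePartition p = new FakePartition(Arrays.asList("a", "b", "c"));
        Set<Long> applied = new HashSet<>();
        List<String> sink = new ArrayList<>();

        List<Long> first = runOnce(p, true, applied, sink);   // crash: nothing committed
        List<Long> second = runOnce(p, false, applied, sink); // restart: same offsets again

        System.out.println("first run delivered:  " + first);
        System.out.println("second run delivered: " + second);
        System.out.println("applied once each:    " + sink);
    }
}
```

Both runs deliver offsets 0 to 2, yet the sink ends up with each value only once, because the second delivery is recognized and skipped.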
Common Issues Troubleshooting
| Symptom | Possible Cause | Investigation Direction |
|---|---|---|
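| Consumer not receiving expected messages | With auto.offset.reset=latest and no committed Offset, the group starts at the log end; if a committed Offset exists, auto.offset.reset is ignored | Check the group's committed Offsets in __consumer_offsets (for example with kafka-consumer-groups.sh --describe) |
| Frequent Rebalance | Processing time between two poll() calls exceeds max.poll.interval.ms | Increase max.poll.interval.ms or decrease max.poll.records |
| Duplicate message consumption | Auto commit combined with a processing exception | Switch to manual commit and call commitSync only after successful processing |
| Unbalanced Partition Leaders | Broker restart didn’t trigger preferred replica election | Execute kafka-preferred-replica-election.sh (deprecated since Kafka 2.4 in favor of kafka-leader-election.sh) |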
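For the Frequent Rebalance row, a quick back-of-the-envelope check is often enough: the worst-case time to process one poll() batch is roughly max.poll.records multiplied by the per-record processing time, and it has to stay below max.poll.interval.ms. The sketch below does that arithmetic. The constants are the client defaults (500 records, 300000 ms), while the 800 ms per-record cost is an assumed figure for illustration.

```java
class PollBudgetSketch {
    static final int DEFAULT_MAX_POLL_RECORDS = 500;
    static final long DEFAULT_MAX_POLL_INTERVAL_MS = 300_000L; // 5 minutes

    /** Worst-case time to process one full batch, in milliseconds. */
    static long worstCaseBatchMs(int maxPollRecords, long perRecordMs) {
        return (long) maxPollRecords * perRecordMs;
    }

    /** True if a full batch can be processed before the poll interval expires. */
    static boolean fitsInInterval(int maxPollRecords, long perRecordMs, long maxPollIntervalMs) {
        return worstCaseBatchMs(maxPollRecords, perRecordMs) < maxPollIntervalMs;
    }

    public static void main(String[] args) {
        long perRecordMs = 800L; // assumed processing cost per record

        System.out.println("defaults: batch takes up to "
                + worstCaseBatchMs(DEFAULT_MAX_POLL_RECORDS, perRecordMs) + " ms, fits = "
                + fitsInInterval(DEFAULT_MAX_POLL_RECORDS, perRecordMs, DEFAULT_MAX_POLL_INTERVAL_MS));

        System.out.println("max.poll.records=200: batch takes up to "
                + worstCaseBatchMs(200, perRecordMs) + " ms, fits = "
                + fitsInInterval(200, perRecordMs, DEFAULT_MAX_POLL_INTERVAL_MS));
    }
}
```

With these numbers the default batch could take up to 400000 ms and would exceed the interval, while 200 records per poll stays at 160000 ms and leaves headroom.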
Summary
- In production, always start Kafka in daemon mode with -daemon
- kafka-topics.sh is the core tool for daily operations; plan partition and replica counts carefully when creating Topics
- The Java client sends and receives messages via KafkaProducer and KafkaConsumer; acks and enable.auto.commit are the key parameters affecting reliability
- ConsumerRebalanceListener is used to commit Offsets and clean up during partition reassignment, which helps reduce duplicate processing