本文是大数据系列第 55 篇,介绍 Kafka 的日常运维操作:守护进程启动、Shell 主题管理命令,以及 Java 客户端 Producer/Consumer 完整示例。
Kafka 服务启动
守护进程模式
生产环境必须以守护进程方式启动,防止 SSH 断连后服务终止:
kafka-server-start.sh -daemon /opt/servers/kafka_2.12-2.7.2/config/server.properties
验证服务状态
# 查看 Kafka 进程
jps
# 输出包含 Kafka 表示启动成功
# 通过系统进程确认
ps aux | grep kafka
停止服务
kafka-server-stop.sh
Shell 主题管理
Kafka 提供 kafka-topics.sh 脚本进行主题的增删改查:
查看所有主题
kafka-topics.sh --list --zookeeper h121.wzk.icu:2181
创建主题
kafka-topics.sh --zookeeper h121.wzk.icu:2181 \
--create \
--topic wzk_topic_1 \
--partitions 3 \
--replication-factor 2
参数说明:
| 参数 | 说明 |
|---|---|
--partitions | 分区数,决定并发消费能力 |
--replication-factor | 副本数,不能超过 Broker 总数 |
查看主题详情
kafka-topics.sh --zookeeper h121.wzk.icu:2181 \
--describe \
--topic wzk_topic_1
输出示例:
Topic: wzk_topic_1 PartitionCount: 3 ReplicationFactor: 2 Configs:
Topic: wzk_topic_1 Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: wzk_topic_1 Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: wzk_topic_1 Partition: 2 Leader: 3 Replicas: 3,1 Isr: 3,1
修改主题
# 增加分区数(只能增加,不能减少)
kafka-topics.sh --zookeeper h121.wzk.icu:2181 \
--alter \
--topic wzk_topic_1 \
--partitions 6
删除主题
kafka-topics.sh --zookeeper h121.wzk.icu:2181 \
--delete \
--topic wzk_topic_1
注意:删除需要
server.properties中配置delete.topic.enable=true(默认为 true)。
Shell 消息收发
控制台 Producer
kafka-console-producer.sh \
--topic wzk_topic_test \
--broker-list h121.wzk.icu:9092
启动后每行输入即为一条消息,Ctrl+C 退出。
控制台 Consumer
# 从最新消息开始消费
kafka-console-consumer.sh \
--bootstrap-server h121.wzk.icu:9092 \
--topic wzk_topic_test
# 从头开始消费(重放所有历史消息)
kafka-console-consumer.sh \
--bootstrap-server h121.wzk.icu:9092 \
--topic wzk_topic_test \
--from-beginning
Java 客户端开发
Maven 依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.7.2</version>
</dependency>
关键配置参数速查
| 参数 | 说明 | 示例值 |
|---|---|---|
bootstrap.servers | Broker 地址列表 | h121.wzk.icu:9092 |
key.serializer | Key 序列化器 | IntegerSerializer |
value.serializer | Value 序列化器 | StringSerializer |
acks | 生产者可靠性级别 | 0 / 1 / all |
key.deserializer | Key 反序列化器 | IntegerDeserializer |
value.deserializer | Value 反序列化器 | StringDeserializer |
group.id | 消费者组 ID | wzk-test |
enable.auto.commit | 自动提交 Offset | true / false |
auto.offset.reset | Offset 重置策略 | earliest / latest |
Producer 完整示例
import org.apache.kafka.clients.producer.*;
import java.util.*;
import java.util.concurrent.*;
public class TestProducer01 {
public static void main(String[] args) throws Exception {
// 配置参数
Map<String, Object> configs = new HashMap<>();
configs.put("bootstrap.servers", "h121.wzk.icu:9092");
configs.put("key.serializer",
"org.apache.kafka.common.serialization.IntegerSerializer");
configs.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
configs.put("acks", "1");
KafkaProducer<Integer, String> producer = new KafkaProducer<>(configs);
// 构造消息:(topic, partition, key, value)
// partition=0 表示指定写入 Partition 0;不指定则按分区策略路由
ProducerRecord<Integer, String> record =
new ProducerRecord<>("wzk_topic_test", 0, 0, "hello world by java!");
// 同步等待发送结果(超时 3 秒)
Future<RecordMetadata> future = producer.send(record);
RecordMetadata metadata = future.get(3_000, TimeUnit.MILLISECONDS);
System.out.println("Sent to partition " + metadata.partition()
+ " at offset " + metadata.offset());
producer.close();
}
}
异步发送(推荐生产使用):
producer.send(record, (metadata, exception) -> {
if (exception != null) {
// 记录错误日志,按需重试
exception.printStackTrace();
} else {
System.out.println("Partition: " + metadata.partition()
+ ", Offset: " + metadata.offset());
}
});
producer.flush(); // 确保缓冲区消息全部发出
producer.close();
Consumer 完整示例
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
public class TestConsumer01 {
public static void main(String[] args) {
Map<String, Object> configs = new HashMap<>();
configs.put("bootstrap.servers", "h121.wzk.icu:9092");
configs.put("key.deserializer",
"org.apache.kafka.common.serialization.IntegerDeserializer");
configs.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
configs.put("group.id", "wzk-test");
configs.put("enable.auto.commit", "true");
configs.put("auto.offset.reset", "latest");
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(configs);
// 订阅 Topic,并注册 Rebalance 监听器
consumer.subscribe(
Collections.singletonList("wzk_topic_test"),
new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// Rebalance 前:可在此提交当前 Offset,避免重复消费
partitions.forEach(p ->
System.out.println("Partition revoked: " + p.partition()));
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// Rebalance 后:新分配的 Partition,可在此重置消费位置
partitions.forEach(p ->
System.out.println("Partition assigned: " + p.partition()));
}
}
);
// 拉取消息(超时 3 秒)
ConsumerRecords<Integer, String> records =
consumer.poll(java.time.Duration.ofMillis(3_000));
records.forEach(record -> {
System.out.println("Key: " + record.key());
System.out.println("Value: " + record.value());
System.out.println("Partition: " + record.partition());
System.out.println("Offset: " + record.offset());
System.out.println("---");
});
consumer.close();
}
}
持续消费循环模式
生产环境 Consumer 通常以无限循环方式运行:
try {
while (true) {
ConsumerRecords<Integer, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<Integer, String> record : records) {
// 处理业务逻辑
processRecord(record);
}
// 手动提交 Offset(enable.auto.commit=false 时使用)
consumer.commitSync();
}
} finally {
consumer.close();
}
常见问题排查
| 现象 | 可能原因 | 排查方向 |
|---|---|---|
| Consumer 不消费新消息 | auto.offset.reset=latest 且已有历史 Offset | 检查 __consumer_offsets 中的提交记录 |
| Rebalance 频繁触发 | 消费耗时过长超过 max.poll.interval.ms | 增大超时或减小 max.poll.records |
| 消息重复消费 | 自动提交 + 消费异常 | 改为手动提交,消费成功后再 commitSync |
| 分区 Leader 不均衡 | Broker 重启后未触发优先副本选举 | 执行 kafka-preferred-replica-election.sh |
小结
- 生产环境始终使用
-daemon守护进程启动 Kafka kafka-topics.sh是日常运维的核心工具,创建主题时合理规划分区数和副本数- Java 客户端通过
KafkaProducer和KafkaConsumer实现消息收发,acks和enable.auto.commit是影响可靠性的关键参数 ConsumerRebalanceListener用于在分区重分配时执行 Offset 提交等清理操作,保障消息处理的幂等性