本文是大数据系列第 61 篇,深入解析 Kafka 主题、分区与消费者机制,并重点介绍重平衡优化策略。
Topic(主题)
Topic 是 Kafka 中消息的逻辑分类单元,生产者向特定 Topic 发布消息,消费者从 Topic 订阅消息。创建 Topic 时需关注以下核心参数:
- replication-factor:副本数,决定数据冗余备份量,生产环境建议设为 3
- retention.ms:消息保留时长,默认 7 天(604800000ms)
- cleanup.policy:清理策略,
delete(按时间/大小删除)或compact(日志压缩,保留最新值)
Partition(分区)
分区是 Kafka 物理存储的基本单位,也是实现水平扩展和高吞吐的关键:
- 每个 Topic 的数据分布在多个 Partition 上,分区之间相互独立
- 分区分散在不同的 Broker 上,实现负载均衡
- 分区数量建议设置为 Broker 数量的整数倍,充分利用集群资源
- 消费者并发度受分区数限制,同一 Consumer Group 中每个分区只能由一个消费者处理
Consumer Group(消费组)
消费组是 Kafka 实现不同消费模式的核心设计:
- 单播模式:同一消息只被某个消费组中的一个消费者处理,适合任务分发场景
- 广播模式:多个消费组订阅同一 Topic,每个组都能收到完整消息,适合事件通知场景
- 消费者数量超过分区数时,多余的消费者处于空闲状态
Consumer(消费者)
Kafka 消费者采用 PULL 模式 主动拉取消息,而非 Broker 推送。这种设计的优势在于消费者可以按自身处理能力控制拉取速率,避免被压垮。
自定义反序列化
Broker 存储的消息是字节数组,消费者必须执行反序列化才能处理。实现 Deserializer<T> 接口即可自定义:
public class UserDeserializer implements Deserializer<User> {
@Override
public User deserialize(String topic, byte[] data) {
ByteBuffer buffer = ByteBuffer.wrap(data);
int userId = buffer.getInt();
int usernameLen = buffer.getInt();
byte[] usernameBytes = new byte[usernameLen];
buffer.get(usernameBytes);
String username = new String(usernameBytes, StandardCharsets.UTF_8);
int age = buffer.getInt();
User user = new User();
user.setUserId(userId);
user.setUsername(username);
user.setAge(age);
return user;
}
}
使用时在消费者配置中指定:
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UserDeserializer.class.getName());
消费者拦截器
拦截器提供可插拔的消息拦截能力,常用于日志、监控和消息过滤:
public class ConsumerInterceptor01 implements ConsumerInterceptor<String, String> {
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
System.out.println("=== 拦截器 onConsume,消息数:" + records.count() + " ===");
// 可过滤或修改消息
return records;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
System.out.println("=== 偏移量提交成功 ===");
}
}
偏移量管理
偏移量(Offset)记录消费者在每个分区中的消费位置,是保证消息不丢失、不重复的关键:
| 提交方式 | 配置 | 特点 |
|---|---|---|
| 自动提交 | enable.auto.commit=true | 简单,但可能丢消息或重复消费 |
| 手动同步提交 | commitSync() | 可靠,阻塞直到提交成功 |
| 手动异步提交 | commitAsync() | 非阻塞,性能好,需处理失败回调 |
生产场景推荐手动提交,在业务处理完成后再提交偏移量,确保 at-least-once 语义。
重平衡(Rebalance)优化
触发条件
以下三种情况会触发重平衡,导致消费暂停:
- 消费组成员变化(新消费者加入或已有消费者离开/崩溃)
- Topic 分区数量发生变化
- 消费者订阅的 Topic 发生变化
重平衡期间所有消费者停止消费,大规模集群中影响尤为明显。
关键配置参数
| 参数 | 默认值 | 作用 |
|---|---|---|
session.timeout.ms | 10000ms | Broker 判定消费者下线的超时窗口 |
heartbeat.interval.ms | 3000ms | 消费者向 Broker 发送心跳的频率 |
max.poll.interval.ms | 300000ms | 两次 poll() 调用之间允许的最大间隔 |
最佳实践:
session.timeout.ms >= 3 × heartbeat.interval.ms
- 对于业务处理耗时较长的场景,应适当增大
max.poll.interval.ms - 避免在 poll 循环中执行耗时的同步 IO 操作
- 可通过
max.poll.records控制每次拉取的消息数量,减少单次处理时间
静态成员资格
Kafka 2.3+ 引入 group.instance.id,配置后消费者以静态成员身份加入消费组。静态成员重启时不会立即触发重平衡,而是在 session.timeout.ms 超时后才重新分配,适合有状态的消费者场景(如流处理)。