本文是大数据系列第 60 篇,深入 Kafka Consumer 侧,讲解消费流程、Consumer Group 分区分配、心跳机制以及 Rebalance 触发条件和参数调优。
Consumer Group 与分区分配
Kafka 消费者通过 group.id 归属于同一 Consumer Group,同一 Group 内每个 Partition 只分配给一个 Consumer,天然实现负载均衡。
以 4 个分区(4P)为例,不同 Consumer 数量下的分配情况:
| 场景 | 分配方式 |
|---|---|
| 4P, 1C | 单 Consumer 消费全部 4 个分区 |
| 4P, 2C | 每个 Consumer 分配 2 个分区 |
| 4P, 4C | 每个 Consumer 分配 1 个分区(最优) |
| 4P, 5C | 4 个 Consumer 各消费 1 个分区,第 5 个空闲 |
建议 Consumer 数量不超过分区数,否则出现空闲 Consumer,浪费资源。
消息消费流程
Kafka 采用 Pull(拉取)模型,Consumer 主动向 Broker 拉取消息,而非 Broker 推送。这带来的好处是消费速度由 Consumer 自主控制,不会因为 Broker 推送过快而压垮消费端。
消费过程分三个阶段:
- 分区分配:Group Coordinator 根据 Consumer 数量和分配策略(RangeAssignor / RoundRobinAssignor / StickyAssignor)为每个 Consumer 分配分区。
- 拉取消息:Consumer 调用
poll()拉取批量消息,每次最多拉取max.poll.records条(默认 500)。 - Offset 提交:处理完消息后提交 offset,记录到内部 topic
__consumer_offsets。支持自动提交(enable.auto.commit=true)和手动提交(commitSync()/commitAsync())。
心跳机制
Consumer 通过心跳告知 Group Coordinator 自己仍然存活:
Consumer ──── heartbeat ────► Group Coordinator
◄──── 响应 ───────────
心跳线程与业务线程(poll() 线程)是分离的,即使 poll() 处理消息耗时较长,心跳依然正常发送。
核心参数
| 参数 | 默认值 | 说明 |
|---|---|---|
session.timeout.ms | 45000 ms | Coordinator 在此时间内未收到心跳则认为 Consumer 下线,触发 Rebalance |
heartbeat.interval.ms | 3000 ms | Consumer 发送心跳的频率,必须 < session.timeout.ms |
max.poll.interval.ms | 300000 ms (5分钟) | 两次 poll() 调用的最大间隔,超时则认为消费者”卡死”,触发 Rebalance |
request.timeout.ms | 30000 ms | 等待 Broker 响应的超时时间 |
推荐配置原则:
session.timeout.ms设置为heartbeat.interval.ms的 3-5 倍,留出足够的宽容窗口。- 如果消费逻辑耗时较长(如批量写数据库),适当增大
max.poll.interval.ms或减小max.poll.records,避免因处理超时触发不必要的 Rebalance。
Rebalance 流程
以下情况会触发 Rebalance(重新分配分区):
- Consumer 加入或退出 Group
- Consumer 心跳超时(
session.timeout.ms超期) poll()调用间隔超过max.poll.interval.ms- Topic 分区数变化
Rebalance 执行过程:
1. 所有 Consumer 暂停消费
2. 释放当前持有的分区
3. Group Coordinator 根据策略重新分配分区
4. Consumer 获取新分区,从上次提交的 offset 继续消费
Rebalance 期间消费暂停,会引起消费延迟。生产环境应尽量减少不必要的 Rebalance,保持 Consumer 数量稳定。
Offset 管理
Offset 存储在 Kafka 内部 topic __consumer_offsets 中,每次提交记录格式为:
GroupId + Topic + Partition → Offset
自动提交(默认开启):
spring:
kafka:
consumer:
enable-auto-commit: true
auto-commit-interval: 5000 # 每 5 秒自动提交
自动提交有丢消息风险:拉取后尚未处理完,offset 已提交,若进程崩溃则消息丢失。
手动提交(推荐生产环境):
@KafkaListener(topics = "wzk_topic_test")
public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) {
try {
// 处理消息
process(record);
// 处理成功后手动提交
ack.acknowledge();
} catch (Exception e) {
// 处理失败,不提交 offset,下次重新消费
log.error("消费失败", e);
}
}
在 Spring-Kafka 中启用手动提交需配置 ack-mode:
spring:
kafka:
listener:
ack-mode: manual
小结
| 关注点 | 推荐做法 |
|---|---|
| Consumer 数量 | 不超过分区数 |
| Rebalance 频率 | 稳定 Consumer 数量,合理设置 timeout 参数 |
| 消息丢失防护 | 使用手动提交 offset |
| 消费卡死检测 | 适当调小 max.poll.interval.ms,或提高消费吞吐 |
| 心跳参数 | session.timeout.ms = 3-5 × heartbeat.interval.ms |
Kafka Consumer 调优的核心目标是:稳定消费、不丢消息、尽量减少 Rebalance。