本文是大数据系列第 60 篇,深入 Kafka Consumer 侧,讲解消费流程、Consumer Group 分区分配、心跳机制以及 Rebalance 触发条件和参数调优。

完整图文版(含截图):CSDN 原文 | 掘金

Consumer Group 与分区分配

Kafka 消费者通过 group.id 归属于同一 Consumer Group,同一 Group 内每个 Partition 只分配给一个 Consumer,天然实现负载均衡

以 4 个分区(4P)为例,不同 Consumer 数量下的分配情况:

场景分配方式
4P, 1C单 Consumer 消费全部 4 个分区
4P, 2C每个 Consumer 分配 2 个分区
4P, 4C每个 Consumer 分配 1 个分区(最优)
4P, 5C4 个 Consumer 各消费 1 个分区,第 5 个空闲

建议 Consumer 数量不超过分区数,否则出现空闲 Consumer,浪费资源。

消息消费流程

Kafka 采用 Pull(拉取)模型,Consumer 主动向 Broker 拉取消息,而非 Broker 推送。这带来的好处是消费速度由 Consumer 自主控制,不会因为 Broker 推送过快而压垮消费端。

消费过程分三个阶段:

  1. 分区分配:Group Coordinator 根据 Consumer 数量和分配策略(RangeAssignor / RoundRobinAssignor / StickyAssignor)为每个 Consumer 分配分区。
  2. 拉取消息:Consumer 调用 poll() 拉取批量消息,每次最多拉取 max.poll.records 条(默认 500)。
  3. Offset 提交:处理完消息后提交 offset,记录到内部 topic __consumer_offsets。支持自动提交(enable.auto.commit=true)和手动提交(commitSync() / commitAsync())。

心跳机制

Consumer 通过心跳告知 Group Coordinator 自己仍然存活:

Consumer ──── heartbeat ────► Group Coordinator
        ◄──── 响应 ───────────

心跳线程与业务线程(poll() 线程)是分离的,即使 poll() 处理消息耗时较长,心跳依然正常发送。

核心参数

参数默认值说明
session.timeout.ms45000 msCoordinator 在此时间内未收到心跳则认为 Consumer 下线,触发 Rebalance
heartbeat.interval.ms3000 msConsumer 发送心跳的频率,必须 < session.timeout.ms
max.poll.interval.ms300000 ms (5分钟)两次 poll() 调用的最大间隔,超时则认为消费者”卡死”,触发 Rebalance
request.timeout.ms30000 ms等待 Broker 响应的超时时间

推荐配置原则

  • session.timeout.ms 设置为 heartbeat.interval.ms3-5 倍,留出足够的宽容窗口。
  • 如果消费逻辑耗时较长(如批量写数据库),适当增大 max.poll.interval.ms 或减小 max.poll.records,避免因处理超时触发不必要的 Rebalance。

Rebalance 流程

以下情况会触发 Rebalance(重新分配分区):

  • Consumer 加入或退出 Group
  • Consumer 心跳超时(session.timeout.ms 超期)
  • poll() 调用间隔超过 max.poll.interval.ms
  • Topic 分区数变化

Rebalance 执行过程:

1. 所有 Consumer 暂停消费
2. 释放当前持有的分区
3. Group Coordinator 根据策略重新分配分区
4. Consumer 获取新分区,从上次提交的 offset 继续消费

Rebalance 期间消费暂停,会引起消费延迟。生产环境应尽量减少不必要的 Rebalance,保持 Consumer 数量稳定。

Offset 管理

Offset 存储在 Kafka 内部 topic __consumer_offsets 中,每次提交记录格式为:

GroupId + Topic + Partition → Offset

自动提交(默认开启):

spring:
  kafka:
    consumer:
      enable-auto-commit: true
      auto-commit-interval: 5000  # 每 5 秒自动提交

自动提交有丢消息风险:拉取后尚未处理完,offset 已提交,若进程崩溃则消息丢失。

手动提交(推荐生产环境):

@KafkaListener(topics = "wzk_topic_test")
public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) {
    try {
        // 处理消息
        process(record);
        // 处理成功后手动提交
        ack.acknowledge();
    } catch (Exception e) {
        // 处理失败,不提交 offset,下次重新消费
        log.error("消费失败", e);
    }
}

在 Spring-Kafka 中启用手动提交需配置 ack-mode

spring:
  kafka:
    listener:
      ack-mode: manual

小结

关注点推荐做法
Consumer 数量不超过分区数
Rebalance 频率稳定 Consumer 数量,合理设置 timeout 参数
消息丢失防护使用手动提交 offset
消费卡死检测适当调小 max.poll.interval.ms,或提高消费吞吐
心跳参数session.timeout.ms = 3-5 × heartbeat.interval.ms

Kafka Consumer 调优的核心目标是:稳定消费、不丢消息、尽量减少 Rebalance