本文是大数据系列第 61 篇,深入解析 Kafka 主题、分区与消费者机制,并重点介绍重平衡优化策略。

完整图文版(含截图):CSDN 原文 | 掘金

Topic(主题)

Topic 是 Kafka 中消息的逻辑分类单元,生产者向特定 Topic 发布消息,消费者从 Topic 订阅消息。创建 Topic 时需关注以下核心参数:

  • replication-factor:副本数,决定数据冗余备份量,生产环境建议设为 3
  • retention.ms:消息保留时长,默认 7 天(604800000ms)
  • cleanup.policy:清理策略,delete(按时间/大小删除)或 compact(日志压缩,保留最新值)

Partition(分区)

分区是 Kafka 物理存储的基本单位,也是实现水平扩展和高吞吐的关键:

  • 每个 Topic 的数据分布在多个 Partition 上,分区之间相互独立
  • 分区分散在不同的 Broker 上,实现负载均衡
  • 分区数量建议设置为 Broker 数量的整数倍,充分利用集群资源
  • 消费者并发度受分区数限制,同一 Consumer Group 中每个分区只能由一个消费者处理

Consumer Group(消费组)

消费组是 Kafka 实现不同消费模式的核心设计:

  • 单播模式:同一消息只被某个消费组中的一个消费者处理,适合任务分发场景
  • 广播模式:多个消费组订阅同一 Topic,每个组都能收到完整消息,适合事件通知场景
  • 消费者数量超过分区数时,多余的消费者处于空闲状态

Consumer(消费者)

Kafka 消费者采用 PULL 模式 主动拉取消息,而非 Broker 推送。这种设计的优势在于消费者可以按自身处理能力控制拉取速率,避免被压垮。

自定义反序列化

Broker 存储的消息是字节数组,消费者必须执行反序列化才能处理。实现 Deserializer<T> 接口即可自定义:

public class UserDeserializer implements Deserializer<User> {
    @Override
    public User deserialize(String topic, byte[] data) {
        ByteBuffer buffer = ByteBuffer.wrap(data);
        int userId = buffer.getInt();
        int usernameLen = buffer.getInt();
        byte[] usernameBytes = new byte[usernameLen];
        buffer.get(usernameBytes);
        String username = new String(usernameBytes, StandardCharsets.UTF_8);
        int age = buffer.getInt();

        User user = new User();
        user.setUserId(userId);
        user.setUsername(username);
        user.setAge(age);
        return user;
    }
}

使用时在消费者配置中指定:

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UserDeserializer.class.getName());

消费者拦截器

拦截器提供可插拔的消息拦截能力,常用于日志、监控和消息过滤:

public class ConsumerInterceptor01 implements ConsumerInterceptor<String, String> {
    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        System.out.println("=== 拦截器 onConsume,消息数:" + records.count() + " ===");
        // 可过滤或修改消息
        return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        System.out.println("=== 偏移量提交成功 ===");
    }
}

偏移量管理

偏移量(Offset)记录消费者在每个分区中的消费位置,是保证消息不丢失、不重复的关键:

提交方式配置特点
自动提交enable.auto.commit=true简单,但可能丢消息或重复消费
手动同步提交commitSync()可靠,阻塞直到提交成功
手动异步提交commitAsync()非阻塞,性能好,需处理失败回调

生产场景推荐手动提交,在业务处理完成后再提交偏移量,确保 at-least-once 语义。

重平衡(Rebalance)优化

触发条件

以下三种情况会触发重平衡,导致消费暂停:

  1. 消费组成员变化(新消费者加入或已有消费者离开/崩溃)
  2. Topic 分区数量发生变化
  3. 消费者订阅的 Topic 发生变化

重平衡期间所有消费者停止消费,大规模集群中影响尤为明显。

关键配置参数

参数默认值作用
session.timeout.ms10000msBroker 判定消费者下线的超时窗口
heartbeat.interval.ms3000ms消费者向 Broker 发送心跳的频率
max.poll.interval.ms300000ms两次 poll() 调用之间允许的最大间隔

最佳实践:

session.timeout.ms >= 3 × heartbeat.interval.ms
  • 对于业务处理耗时较长的场景,应适当增大 max.poll.interval.ms
  • 避免在 poll 循环中执行耗时的同步 IO 操作
  • 可通过 max.poll.records 控制每次拉取的消息数量,减少单次处理时间

静态成员资格

Kafka 2.3+ 引入 group.instance.id,配置后消费者以静态成员身份加入消费组。静态成员重启时不会立即触发重平衡,而是在 session.timeout.ms 超时后才重新分配,适合有状态的消费者场景(如流处理)。