本文是大数据系列第 58 篇,聚焦 Kafka Producer 发送链路中的序列化与分区两个关键环节,提供自定义 SerializerPartitioner 的完整实现示例。

完整图文版(含截图):CSDN 原文 | 掘金

消息发送流程回顾

Kafka 消息从构造到落盘经历五个阶段:

  1. 消息创建:构造包含 topic、可选 key、消息体、时间戳和 headers 的 ProducerRecord
  2. 序列化:将 key 和 value 转换为字节数组,以便网络传输。
  3. 分区选择:根据 key 或策略确定目标分区。
  4. 缓冲与批量传输:消息写入 RecordAccumulator,Sender 线程批量发出。
  5. ACK 确认:Broker 持久化后按 acks 配置返回确认。

本文重点讲解第 2、3 步。

序列化

内置序列化器

Kafka 提供常用的内置实现,配置于 key.serializer / value.serializer

类名适用场景
StringSerializer字符串消息(最常用)
ByteArraySerializer原始字节,不做转换
IntegerSerializer整数 key,常用于分区路由
LongSerializer长整数 key

自定义序列化器

当业务对象需要发送时,可实现 org.apache.kafka.common.serialization.Serializer<T> 接口:

public interface Serializer<T> extends Closeable {
    // 可选:从 Producer 配置读取参数
    default void configure(Map<String, ?> configs, boolean isKey) {}
    // 核心方法:将对象转为字节数组
    byte[] serialize(String topic, T data);
    // 可选:携带 Headers 的序列化(Kafka 2.1+)
    default byte[] serialize(String topic, Headers headers, T data) {
        return serialize(topic, data);
    }
    default void close() {}
}

示例:User 对象序列化

public class User {
    private Integer userId;
    private String username;
    private String password;
    private Integer age;
    // getters / setters 省略
}

public class UserSerializer implements Serializer<User> {

    @Override
    public byte[] serialize(String topic, User data) {
        if (data == null) return null;

        byte[] usernameBytes = data.getUsername() != null
            ? data.getUsername().getBytes(StandardCharsets.UTF_8) : new byte[0];
        byte[] passwordBytes = data.getPassword() != null
            ? data.getPassword().getBytes(StandardCharsets.UTF_8) : new byte[0];

        // 布局:userId(4) + usernameLen(4) + username + passwordLen(4) + password + age(4)
        ByteBuffer buffer = ByteBuffer.allocate(
            4 + 4 + usernameBytes.length + 4 + passwordBytes.length + 4);

        buffer.putInt(data.getUserId());
        buffer.putInt(usernameBytes.length);
        buffer.put(usernameBytes);
        buffer.putInt(passwordBytes.length);
        buffer.put(passwordBytes);
        buffer.putInt(data.getAge());

        return buffer.array();
    }
}

注意:ByteBuffer 容量必须精确计算,否则序列化结果截断或浪费空间。对应的反序列化器(Deserializer<User>)要按相同字段顺序解析。

配置方式:

configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class.getName());

分区策略

默认分区规则

Kafka 默认 Partitioner 按以下优先级路由:

  1. ProducerRecord 中显式指定了 partition → 直接使用。
  2. 消息带有 keyMath.abs(key.hashCode()) % partitionCount,保证同 key 消息落入同一分区,维持局部有序。
  3. 消息无 key → 轮询(Sticky Partitioner,Kafka 2.4+ 默认),在批次填满前粘性发往同一分区,提升批量效率。

自定义分区器

实现 org.apache.kafka.clients.producer.Partitioner 接口:

public class MyPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        // 示例:以 value 字符串长度取模决定分区(实际按业务逻辑定制)
        if (valueBytes != null) {
            return valueBytes.length % numPartitions;
        }
        return 0;
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // 可从 Producer 配置读取自定义参数
    }

    @Override
    public void close() {}
}

配置方式:

configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());

自定义序列化 + 分区的协同

在同一 Producer 配置中同时指定自定义序列化器和分区器时,执行顺序为:

onSend (拦截器) → 序列化 key/value → 分区路由 → 写入缓冲

分区器在序列化 之后 运行,因此 partition() 方法中可以同时访问原始对象(key/value)和字节数组(keyBytes/valueBytes),灵活性更高。

小结

  • 自定义序列化器适合传输复杂业务对象,避免 JSON 序列化的反射开销。
  • 自定义分区器适合按业务维度(如地区、用户类型)将消息精准路由到指定分区,满足数据局部性需求。
  • 实现时注意线程安全,partition()serialize() 均可能被多线程并发调用。