本文是大数据系列第 58 篇,聚焦 Kafka Producer 发送链路中的序列化与分区两个关键环节,提供自定义 Serializer 和 Partitioner 的完整实现示例。
消息发送流程回顾
Kafka 消息从构造到落盘经历五个阶段:
- 消息创建:构造包含 topic、可选 key、消息体、时间戳和 headers 的
ProducerRecord。 - 序列化:将 key 和 value 转换为字节数组,以便网络传输。
- 分区选择:根据 key 或策略确定目标分区。
- 缓冲与批量传输:消息写入
RecordAccumulator,Sender 线程批量发出。 - ACK 确认:Broker 持久化后按
acks配置返回确认。
本文重点讲解第 2、3 步。
序列化
内置序列化器
Kafka 提供常用的内置实现,配置于 key.serializer / value.serializer:
| 类名 | 适用场景 |
|---|---|
StringSerializer | 字符串消息(最常用) |
ByteArraySerializer | 原始字节,不做转换 |
IntegerSerializer | 整数 key,常用于分区路由 |
LongSerializer | 长整数 key |
自定义序列化器
当业务对象需要发送时,可实现 org.apache.kafka.common.serialization.Serializer<T> 接口:
public interface Serializer<T> extends Closeable {
// 可选:从 Producer 配置读取参数
default void configure(Map<String, ?> configs, boolean isKey) {}
// 核心方法:将对象转为字节数组
byte[] serialize(String topic, T data);
// 可选:携带 Headers 的序列化(Kafka 2.1+)
default byte[] serialize(String topic, Headers headers, T data) {
return serialize(topic, data);
}
default void close() {}
}
示例:User 对象序列化
public class User {
private Integer userId;
private String username;
private String password;
private Integer age;
// getters / setters 省略
}
public class UserSerializer implements Serializer<User> {
@Override
public byte[] serialize(String topic, User data) {
if (data == null) return null;
byte[] usernameBytes = data.getUsername() != null
? data.getUsername().getBytes(StandardCharsets.UTF_8) : new byte[0];
byte[] passwordBytes = data.getPassword() != null
? data.getPassword().getBytes(StandardCharsets.UTF_8) : new byte[0];
// 布局:userId(4) + usernameLen(4) + username + passwordLen(4) + password + age(4)
ByteBuffer buffer = ByteBuffer.allocate(
4 + 4 + usernameBytes.length + 4 + passwordBytes.length + 4);
buffer.putInt(data.getUserId());
buffer.putInt(usernameBytes.length);
buffer.put(usernameBytes);
buffer.putInt(passwordBytes.length);
buffer.put(passwordBytes);
buffer.putInt(data.getAge());
return buffer.array();
}
}
注意:
ByteBuffer容量必须精确计算,否则序列化结果截断或浪费空间。对应的反序列化器(Deserializer<User>)要按相同字段顺序解析。
配置方式:
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class.getName());
分区策略
默认分区规则
Kafka 默认 Partitioner 按以下优先级路由:
ProducerRecord中显式指定了partition→ 直接使用。- 消息带有
key→Math.abs(key.hashCode()) % partitionCount,保证同 key 消息落入同一分区,维持局部有序。 - 消息无
key→ 轮询(Sticky Partitioner,Kafka 2.4+ 默认),在批次填满前粘性发往同一分区,提升批量效率。
自定义分区器
实现 org.apache.kafka.clients.producer.Partitioner 接口:
public class MyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
// 示例:以 value 字符串长度取模决定分区(实际按业务逻辑定制)
if (valueBytes != null) {
return valueBytes.length % numPartitions;
}
return 0;
}
@Override
public void configure(Map<String, ?> configs) {
// 可从 Producer 配置读取自定义参数
}
@Override
public void close() {}
}
配置方式:
configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());
自定义序列化 + 分区的协同
在同一 Producer 配置中同时指定自定义序列化器和分区器时,执行顺序为:
onSend (拦截器) → 序列化 key/value → 分区路由 → 写入缓冲
分区器在序列化 之后 运行,因此 partition() 方法中可以同时访问原始对象(key/value)和字节数组(keyBytes/valueBytes),灵活性更高。
小结
- 自定义序列化器适合传输复杂业务对象,避免 JSON 序列化的反射开销。
- 自定义分区器适合按业务维度(如地区、用户类型)将消息精准路由到指定分区,满足数据局部性需求。
- 实现时注意线程安全,
partition()和serialize()均可能被多线程并发调用。