This is article 58 in the Big Data series, focusing on serialization and partitioning in the Kafka Producer sending pipeline, with complete implementation examples for a custom Serializer and Partitioner.
Message Sending Flow Review
A Kafka message goes through five stages from construction to disk:
- Message creation: construct a ProducerRecord with topic, optional key, value, timestamp, and headers.
- Serialization: convert the key and value to byte arrays for network transmission.
- Partition selection: determine the target partition based on the key or a configured strategy.
- Buffering & batch transmission: messages are written to the RecordAccumulator; the Sender thread sends them in batches.
- ACK confirmation: the broker persists the message and returns a confirmation according to the acks config.
This article focuses on steps 2 and 3.
Serialization
Built-in Serializers
Kafka provides common built-in implementations, configured via key.serializer / value.serializer:
| Class Name | Use Case |
|---|---|
| StringSerializer | String messages (most common) |
| ByteArraySerializer | Raw bytes, no conversion |
| IntegerSerializer | Integer keys, commonly used for partition routing |
| LongSerializer | Long integer keys |
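As a sketch, wiring the built-in string serializers looks like this in producer properties (the bootstrap address is a placeholder):

```properties
bootstrap.servers=localhost:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
```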
Custom Serializer
When business objects need to be sent, implement org.apache.kafka.common.serialization.Serializer<T> interface:
```java
public interface Serializer<T> extends Closeable {
    // Optional: read parameters from the Producer config
    default void configure(Map<String, ?> configs, boolean isKey) {}

    // Core method: convert an object to a byte array
    byte[] serialize(String topic, T data);

    // Optional: serialization with Headers (Kafka 2.1+)
    default byte[] serialize(String topic, Headers headers, T data) {
        return serialize(topic, data);
    }

    default void close() {}
}
```
Example: User Object Serialization
```java
public class User {
    private Integer userId;
    private String username;
    private String password;
    private Integer age;
    // getters / setters omitted
}
```

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Serializer;

public class UserSerializer implements Serializer<User> {
    @Override
    public byte[] serialize(String topic, User data) {
        if (data == null) return null;
        byte[] usernameBytes = data.getUsername() != null
                ? data.getUsername().getBytes(StandardCharsets.UTF_8) : new byte[0];
        byte[] passwordBytes = data.getPassword() != null
                ? data.getPassword().getBytes(StandardCharsets.UTF_8) : new byte[0];
        // Layout: userId(4) + usernameLen(4) + username + passwordLen(4) + password + age(4)
        ByteBuffer buffer = ByteBuffer.allocate(
                4 + 4 + usernameBytes.length + 4 + passwordBytes.length + 4);
        buffer.putInt(data.getUserId());
        buffer.putInt(usernameBytes.length);
        buffer.put(usernameBytes);
        buffer.putInt(passwordBytes.length);
        buffer.put(passwordBytes);
        buffer.putInt(data.getAge());
        return buffer.array();
    }
}
```
Note: the ByteBuffer capacity must be calculated precisely, otherwise the serialized result is truncated or wastes space. The corresponding deserializer (Deserializer<User>) must parse the fields in the same order.
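Because the deserializer has to mirror the layout byte for byte, here is a minimal standalone sketch of the encode/decode pair, written without the Kafka Serializer/Deserializer interfaces so it runs on its own (the UserCodec class and its method names are illustrative, not from the Kafka API):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class UserCodec {
    // Layout: userId(4) + usernameLen(4) + username + passwordLen(4) + password + age(4)
    public static byte[] encode(int userId, String username, String password, int age) {
        byte[] u = username.getBytes(StandardCharsets.UTF_8);
        byte[] p = password.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + 4 + u.length + 4 + p.length + 4);
        buf.putInt(userId);
        buf.putInt(u.length);
        buf.put(u);
        buf.putInt(p.length);
        buf.put(p);
        buf.putInt(age);
        return buf.array();
    }

    // Reads the fields back in exactly the order the encoder wrote them
    public static String decodeUsername(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        buf.getInt();                      // skip userId
        byte[] u = new byte[buf.getInt()]; // usernameLen, then username bytes
        buf.get(u);
        return new String(u, StandardCharsets.UTF_8);
    }
}
```

If the read order ever diverges from the write order, every subsequent length prefix is misinterpreted as data, so keeping the layout comment next to both methods is worthwhile.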
Configuration:
```java
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class.getName());
```
Partitioning Strategy
Default Partitioning Rules
Kafka's default partitioner routes by priority:
- ProducerRecord explicitly specifies a partition → use it directly.
- The message has a key → hash the serialized key bytes (murmur2) and take the result modulo the partition count, so messages with the same key always land in the same partition, preserving per-key order.
- The message has no key → sticky partitioning (the default since Kafka 2.4, replacing pure round-robin): the producer sticks to one partition until the current batch fills, which improves batching efficiency.
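To make the keyed rule concrete, the sketch below is illustrative only (Kafka's real DefaultPartitioner applies murmur2 to the serialized key bytes; the class and method names here are invented), but it shows why a non-negative hash modulo the partition count keeps each key on one partition:

```java
public class KeyPartitionDemo {
    // Illustration only: Kafka's DefaultPartitioner hashes the serialized key
    // bytes with murmur2, but the "non-negative hash mod partition count"
    // shape is the same.
    public static int partitionFor(String key, int numPartitions) {
        // Mask the sign bit instead of calling Math.abs, which avoids the
        // Integer.MIN_VALUE edge case (Math.abs(Integer.MIN_VALUE) is negative).
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```

The same key always hashes to the same value, so for a fixed partition count the mapping is stable; note that changing the partition count changes the mapping for existing keys.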
Custom Partitioner
Implement org.apache.kafka.clients.producer.Partitioner interface:
```java
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        // Example: modulo by the serialized value's length (customize by business logic)
        if (valueBytes != null) {
            return valueBytes.length % numPartitions;
        }
        return 0;
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // Read custom parameters from the Producer config
    }

    @Override
    public void close() {}
}
```
Configuration:
```java
configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());
```
Custom Serialization + Partitioning Coordination
When both a custom serializer and a custom partitioner are specified in the same Producer config, the execution order is:
onSend (interceptor) → serialize key/value → partition routing → write to buffer
Partitioner runs after serialization, so partition() method can access both original objects (key/value) and byte arrays (keyBytes/valueBytes), providing more flexibility.
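Putting the pieces together, a configuration sketch wiring both hooks into one producer (class names follow the examples above; the broker address is a placeholder):

```java
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class.getName());
configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());
KafkaProducer<String, User> producer = new KafkaProducer<>(configs);
```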
Summary
- A custom serializer suits transmitting complex business objects and avoids the reflection overhead of JSON serialization.
- A custom partitioner suits precise message routing by business dimension (e.g., region, user type) and meets data-locality requirements.
- Mind thread safety in your implementations: partition() and serialize() may be called concurrently by multiple threads.