This is article 58 in the Big Data series. It covers serialization and partitioning in the Kafka Producer send pipeline and provides complete implementation examples of a custom Serializer and a custom Partitioner.

Message Sending Flow Review

A Kafka message passes through five stages between construction and being written to disk:

  1. Message creation: Construct a ProducerRecord with a topic, an optional key, a value, and optionally a timestamp and headers.
  2. Serialization: Convert the key and value to byte arrays for network transmission.
  3. Partition selection: Determine the target partition from the key or from the configured strategy.
  4. Buffer & batch transmission: Messages are appended to the RecordAccumulator, and the Sender thread sends them in batches.
  5. ACK confirmation: The broker persists the batch and returns an acknowledgement according to the acks config.

This article focuses on steps 2 and 3.

Serialization

Built-in Serializers

Kafka provides common built-in implementations, configured via key.serializer / value.serializer:

Class Name            Use Case
StringSerializer      String messages (most common)
ByteArraySerializer   Raw bytes, no conversion
IntegerSerializer     Integer key, commonly used for partition routing
LongSerializer        Long integer key
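
As a minimal sketch of that configuration, the snippet below builds producer properties using the literal key.serializer / value.serializer names. The broker address localhost:9092 and the class name SerializerConfigSketch are placeholders chosen for illustration. It uses only java.util.Properties, so it compiles without the Kafka client on the classpath.

```java
import java.util.Properties;

class SerializerConfigSketch {

    // Builds the properties that would be handed to new KafkaProducer<>(props)
    static Properties producerProps() {
        Properties props = new Properties();
        // Assumed broker address; replace with your cluster's bootstrap servers
        props.put("bootstrap.servers", "localhost:9092");
        // Integer keys and String values: pick the matching built-in serializers
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        producerProps().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

The constants ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG and ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG resolve to these same two property names, so either spelling works.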

Custom Serializer

When business objects need to be sent, implement the org.apache.kafka.common.serialization.Serializer<T> interface:

public interface Serializer<T> extends Closeable {
    // Optional: read parameters from Producer config
    default void configure(Map<String, ?> configs, boolean isKey) {}
    // Core method: convert object to byte array
    byte[] serialize(String topic, T data);
    // Optional: serialization with Headers (Kafka 2.1+)
    default byte[] serialize(String topic, Headers headers, T data) {
        return serialize(topic, data);
    }
    default void close() {}
}

Example: User Object Serialization

public class User {
    private Integer userId;
    private String username;
    private String password;
    private Integer age;
    // getters / setters omitted
}

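import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class UserSerializer implements Serializer<User> {

    @Override
    public byte[] serialize(String topic, User data) {
        // A null value is passed through unchanged
        if (data == null) return null;

        // Fail with a clear message instead of an unboxing NullPointerException
        if (data.getUserId() == null || data.getAge() == null) {
            throw new SerializationException("userId and age must not be null");
        }

        // A null string is written as a zero-length field
        byte[] usernameBytes = data.getUsername() != null
            ? data.getUsername().getBytes(StandardCharsets.UTF_8) : new byte[0];
        byte[] passwordBytes = data.getPassword() != null
            ? data.getPassword().getBytes(StandardCharsets.UTF_8) : new byte[0];

        // Layout: userId(4) + usernameLen(4) + username + passwordLen(4) + password + age(4)
        ByteBuffer buffer = ByteBuffer.allocate(
            4 + 4 + usernameBytes.length + 4 + passwordBytes.length + 4);

        buffer.putInt(data.getUserId());
        buffer.putInt(usernameBytes.length);
        buffer.put(usernameBytes);
        buffer.putInt(passwordBytes.length);
        buffer.put(passwordBytes);
        buffer.putInt(data.getAge());

        // The buffer is exactly full, so the backing array is the whole payload
        return buffer.array();
    }
}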

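Note: The ByteBuffer capacity must match the payload size exactly. If it is too small, put() throws BufferOverflowException; if it is too large, buffer.array() returns the whole backing array, so the message carries trailing zero bytes. The corresponding deserializer (Deserializer<User>) must read the fields in the same order.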
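
To illustrate the read side, here is a minimal round-trip sketch of the same byte layout using only the JDK. UserWireFormatSketch and UserData are hypothetical stand-ins introduced for this example (UserData replaces the article's User so the block is self-contained). In a real project the body of decode would sit inside Deserializer<User>.deserialize(String topic, byte[] data).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class UserWireFormatSketch {

    // Hypothetical, minimal stand-in for the article's User class
    static final class UserData {
        final int userId;
        final String username;
        final String password;
        final int age;

        UserData(int userId, String username, String password, int age) {
            this.userId = userId;
            this.username = username;
            this.password = password;
            this.age = age;
        }
    }

    // Layout: userId(4) + usernameLen(4) + username + passwordLen(4) + password + age(4)
    static byte[] encode(UserData u) {
        byte[] name = u.username.getBytes(StandardCharsets.UTF_8);
        byte[] pass = u.password.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + 4 + name.length + 4 + pass.length + 4);
        buf.putInt(u.userId)
           .putInt(name.length).put(name)
           .putInt(pass.length).put(pass)
           .putInt(u.age);
        return buf.array();
    }

    // Reads the fields back in exactly the order they were written
    static UserData decode(byte[] data) {
        if (data == null) return null;
        ByteBuffer buf = ByteBuffer.wrap(data);
        int userId = buf.getInt();
        byte[] name = new byte[buf.getInt()];
        buf.get(name);
        byte[] pass = new byte[buf.getInt()];
        buf.get(pass);
        int age = buf.getInt();
        return new UserData(userId,
                new String(name, StandardCharsets.UTF_8),
                new String(pass, StandardCharsets.UTF_8),
                age);
    }
}
```

The length prefixes count UTF-8 bytes, not characters, which is why the decoder sizes its arrays from the prefix instead of from any string length.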

Configuration:

configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class.getName());

Partitioning Strategy

Default Partitioning Rules

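Kafka's default partitioner chooses a partition in this order of priority:

  1. ProducerRecord explicitly specifies a partition → use it directly.
  2. Message has a key → hash the serialized key bytes with murmur2 and take the non-negative result modulo the partition count, i.e. Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions (the Java client does not use key.hashCode()). Messages with the same key land in the same partition, which preserves per-key ordering as long as the partition count does not change.
  3. Message has no key → round-robin before Kafka 2.4; from Kafka 2.4 onward the Sticky Partitioner, which stays on one partition until the current batch is complete and then switches to another, improving batching efficiency.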
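
The three rules can be sketched as follows. For rule 2 the sketch hashes the serialized key bytes with murmur2 and masks the sign bit, which is how the Java client's built-in partitioner computes it. DefaultRoutingSketch is a hypothetical class, the murmur2 method is transcribed here only so the example runs without the Kafka jar (real code should call org.apache.kafka.common.utils.Utils.murmur2), and the sticky logic is deliberately simplified: it ignores per-topic state and partition availability and is not thread-safe.

```java
import java.util.concurrent.ThreadLocalRandom;

class DefaultRoutingSketch {

    // Partition currently used for key-less records; -1 means "not chosen yet"
    private int stickyPartition = -1;

    // Clears the sign bit; unlike Math.abs this never returns a negative number
    static int toPositive(int n) {
        return n & 0x7fffffff;
    }

    // 32-bit murmur2, transcribed for illustration (assumption: matches Utils.murmur2)
    static int murmur2(byte[] data) {
        final int m = 0x5bd1e995;
        final int r = 24;
        int length = data.length;
        int h = 0x9747b28c ^ length;
        for (int i = 0; i < length / 4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                  + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        int tail = length & ~3;
        switch (length % 4) { // intentional fall-through over the trailing bytes
            case 3: h ^= (data[tail + 2] & 0xff) << 16;
            case 2: h ^= (data[tail + 1] & 0xff) << 8;
            case 1: h ^= data[tail] & 0xff;
                    h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Rule 1: explicit partition; rule 2: keyed hash; rule 3: sticky partition
    int choose(Integer explicitPartition, byte[] keyBytes, int numPartitions) {
        if (explicitPartition != null) return explicitPartition;
        if (keyBytes != null) return toPositive(murmur2(keyBytes)) % numPartitions;
        if (stickyPartition < 0) {
            stickyPartition = ThreadLocalRandom.current().nextInt(numPartitions);
        }
        return stickyPartition;
    }

    // Simplified trigger: pick a different sticky partition once a batch completes
    void onBatchComplete(int numPartitions) {
        if (numPartitions < 2) return;
        int next;
        do {
            next = ThreadLocalRandom.current().nextInt(numPartitions);
        } while (next == stickyPartition);
        stickyPartition = next;
    }
}
```

The sign-bit mask matters because Math.abs(Integer.MIN_VALUE) is still negative, which would produce a negative partition index after the modulo.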

Custom Partitioner

Implement the org.apache.kafka.clients.producer.Partitioner interface:

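import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

public class MyPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        // Example: modulo by the serialized value's byte length (customize by business logic)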
        if (valueBytes != null) {
            return valueBytes.length % numPartitions;
        }
        return 0;
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // Read custom parameters from Producer config
    }

    @Override
    public void close() {}
}

Configuration:

configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());

Custom Serialization + Partitioning Coordination

When a custom serializer and a custom partitioner are both configured on the same Producer, each send executes in this order:

onSend (interceptor) → serialize key/value → partition routing → write to buffer

The partitioner runs after serialization, so partition() receives both the original objects (key/value) and their serialized byte arrays (keyBytes/valueBytes) and can route on either.
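
As a hedged illustration of using both forms, the sketch below routes on a field of the original value object first and falls back to hashing the key bytes. RegionRoutingSketch, Order, and the "eu" rule are hypothetical names invented for this example; route mirrors the key/value parameters of partition(), and numPartitions stands for cluster.partitionsForTopic(topic).size(). The Arrays.hashCode call is a simple stand-in hash, not what Kafka uses.

```java
import java.util.Arrays;

class RegionRoutingSketch {

    // Hypothetical business object carrying a routing dimension
    static final class Order {
        final String region;
        final long amount;

        Order(String region, long amount) {
            this.region = region;
            this.amount = amount;
        }
    }

    static int route(Object value, byte[] keyBytes, int numPartitions) {
        // Original object: pin one region to a dedicated partition
        if (value instanceof Order && "eu".equals(((Order) value).region)) {
            return 0;
        }
        // With a single partition there is nothing left to spread over
        if (numPartitions == 1) {
            return 0;
        }
        // Byte form: spread everything else over partitions 1..numPartitions-1
        int hash = keyBytes == null ? 0 : Arrays.hashCode(keyBytes);
        return 1 + (hash & 0x7fffffff) % (numPartitions - 1);
    }
}
```

Reading the field from the original object avoids re-parsing valueBytes inside the partitioner, while the byte form keeps the fallback independent of the key's Java type.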

Summary

  • A custom serializer suits complex business objects that need a compact binary layout, and it avoids the reflection overhead of typical JSON serialization.
  • A custom partitioner suits precise message routing by a business dimension (e.g., region, user type), for example to meet data locality requirements.
  • Pay attention to thread safety: a KafkaProducer instance is thread-safe and is usually shared, so partition() and serialize() may be called concurrently by multiple threads. Keep implementations stateless or guard any shared state.
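
As a minimal sketch of the thread-safety point, the counter below is the kind of shared state a round-robin partitioner might hold. RoundRobinCounterSketch and its hammer helper are hypothetical names for this example. An AtomicInteger is used because a plain int field incremented from several threads could lose updates.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;

class RoundRobinCounterSketch {

    private final AtomicInteger counter = new AtomicInteger();

    // Safe to call from many threads: getAndIncrement is atomic, and the
    // mask keeps the index non-negative after the counter wraps around
    int nextPartition(int numPartitions) {
        return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
    }

    // Calls nextPartition concurrently and returns how often each partition was picked
    static int[] hammer(int threads, int callsPerThread, int numPartitions) {
        RoundRobinCounterSketch rr = new RoundRobinCounterSketch();
        AtomicIntegerArray hits = new AtomicIntegerArray(numPartitions);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < callsPerThread; i++) {
                    hits.incrementAndGet(rr.nextPartition(numPartitions));
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        int[] result = new int[numPartitions];
        for (int p = 0; p < numPartitions; p++) {
            result[p] = hits.get(p);
        }
        return result;
    }
}
```

Because every counter value is handed out exactly once, the picks stay evenly distributed no matter how the threads interleave.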