This is article 61 in the Big Data series. It takes a deep look at Kafka's Topic, Partition, and Consumer mechanisms, with a focus on rebalance optimization strategies.
Topic
Topic is the logical classification unit for messages in Kafka: Producers publish messages to specific Topics, and Consumers subscribe to them. Key parameters when creating a Topic:
- replication-factor: Replica count, determines data redundancy, recommended 3 in production
- retention.ms: Message retention duration, default 7 days (604800000ms)
- cleanup.policy: Cleanup strategy, delete (time/size based) or compact (log compaction, keeps only the latest value per key)
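These parameters map directly onto the stock CLI tooling. A sketch of creating a topic with the settings above, assuming a local broker; the topic name and partition count are placeholders:

```shell
# Placeholder topic name and partition count; requires a running broker
kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic user-events \
  --partitions 6 \
  --replication-factor 3 \
  --config retention.ms=604800000 \
  --config cleanup.policy=delete
```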
Partition
Partition is the basic unit of Kafka physical storage, and the key to horizontal scaling and high throughput:
- Each Topic’s data is distributed across multiple Partitions, which are independent of each other
- Partitions are spread across different Brokers, achieving load balancing
- A partition count that is an integer multiple of the Broker count is recommended, so cluster resources are fully utilized
- Consumer parallelism is limited by the partition count: within a Consumer Group, each partition is processed by only one Consumer
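Which partition a keyed message lands on is determined by hashing the key modulo the partition count; Kafka's DefaultPartitioner uses murmur2 on the serialized key bytes. Below is a simplified stand-in using `Arrays.hashCode`, purely to illustrate the modulo mapping and why per-key ordering holds:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PartitionSketch {
    // Simplified stand-in for Kafka's default partitioner: the real one
    // applies murmur2 to the serialized key bytes; Arrays.hashCode is used
    // here only to demonstrate the hash-then-modulo mapping.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes);
        return (hash & 0x7fffffff) % numPartitions; // mask sign bit, then modulo
    }

    public static void main(String[] args) {
        // The same key always maps to the same partition,
        // which is what preserves per-key ordering.
        System.out.println(partitionFor("user-42", 6) == partitionFor("user-42", 6)); // true
    }
}
```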
Consumer Group
Consumer Group is the core design for different consumption patterns in Kafka:
- Unicast mode: Same message processed by only one Consumer in a group, suitable for task distribution
- Broadcast mode: Multiple groups subscribe to same Topic, each group receives complete messages, suitable for event notification
- When Consumer count exceeds partition count, excess Consumers are idle
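The "excess Consumers are idle" rule can be seen in a small simulation. This is a round-robin-style sketch, not Kafka's actual assignor implementations (range/round-robin/sticky), but the idle-consumer effect is the same:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AssignmentSketch {
    // Simplified round-robin-style assignment: partitions are dealt out
    // across consumers; consumers beyond the partition count get nothing.
    static Map<String, List<Integer>> assign(List<String> consumers, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String c : consumers) assignment.put(c, new ArrayList<>());
        for (int p = 0; p < numPartitions; p++) {
            assignment.get(consumers.get(p % consumers.size())).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 4 consumers, 3 partitions: the 4th consumer stays idle.
        System.out.println(assign(List.of("c1", "c2", "c3", "c4"), 3));
        // {c1=[0], c2=[1], c3=[2], c4=[]}
    }
}
```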
Consumer
Kafka Consumers use a pull model, actively fetching messages from the Broker rather than having the Broker push them. The advantage is that each Consumer can control its fetch rate based on its own processing capacity, avoiding being overwhelmed.
Custom Deserialization
Messages are stored on the Broker as byte arrays, so Consumers must deserialize them before processing. Implement the Deserializer&lt;T&gt; interface for custom formats:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Deserializer;

public class UserDeserializer implements Deserializer<User> {
    @Override
    public User deserialize(String topic, byte[] data) {
        // Read fields in the exact order the producer wrote them
        ByteBuffer buffer = ByteBuffer.wrap(data);
        int userId = buffer.getInt();
        int usernameLen = buffer.getInt();
        byte[] usernameBytes = new byte[usernameLen];
        buffer.get(usernameBytes);
        String username = new String(usernameBytes, StandardCharsets.UTF_8);
        int age = buffer.getInt();
        User user = new User();
        user.setUserId(userId);
        user.setUsername(username);
        user.setAge(age);
        return user;
    }
}
```
Use by specifying in Consumer config:
```java
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, UserDeserializer.class.getName());
```
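The deserializer only works if the producer wrote the fields in the same order (userId, username length, username bytes, age). A self-contained round-trip sketch of that byte layout, with User flattened into plain fields for brevity:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class UserCodecSketch {
    // Producer side: pack fields in the exact order the deserializer reads them.
    static byte[] serialize(int userId, String username, int age) {
        byte[] nameBytes = username.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + nameBytes.length + 4);
        buffer.putInt(userId);
        buffer.putInt(nameBytes.length); // length prefix for the variable-size field
        buffer.put(nameBytes);
        buffer.putInt(age);
        return buffer.array();
    }

    // Consumer side: same reading logic as UserDeserializer.deserialize above.
    static String deserialize(byte[] data) {
        ByteBuffer buffer = ByteBuffer.wrap(data);
        int userId = buffer.getInt();
        byte[] nameBytes = new byte[buffer.getInt()];
        buffer.get(nameBytes);
        String username = new String(nameBytes, StandardCharsets.UTF_8);
        int age = buffer.getInt();
        return userId + ":" + username + ":" + age;
    }

    public static void main(String[] args) {
        System.out.println(deserialize(serialize(1, "alice", 30))); // 1:alice:30
    }
}
```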
Consumer Interceptor
Interceptors provide a pluggable hook around consumption, commonly used for logging, monitoring, and message filtering:

```java
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ConsumerInterceptor01 implements ConsumerInterceptor<String, String> {
    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        System.out.println("=== Interceptor onConsume, message count: " + records.count() + " ===");
        return records; // messages can be filtered or modified here
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        System.out.println("=== Offset committed successfully ===");
    }

    @Override // close() and configure() are required by the interface
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}
```
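Interceptors are registered via the interceptor.classes config (constant: ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG); multiple classes can be listed comma-separated and are invoked in declaration order:

```java
props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptor01.class.getName());
```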
Offset Management
Offset records Consumer’s consumption position in each partition, key to ensuring no message loss or duplication:
| Commit Method | Configuration | Characteristics |
|---|---|---|
| Auto commit | enable.auto.commit=true | Simple, but may lose messages or duplicate |
| Manual sync commit | commitSync() | Reliable, blocks until commit succeeds |
| Manual async commit | commitAsync() | Non-blocking, good performance, but failures must be handled in the callback |
In production, manual commit is recommended: commit the offset only after business processing completes, ensuring at-least-once semantics.
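A sketch of that process-then-commit pattern with the standard Java client; the topic, group id, and broker address are placeholders, and a running broker is required:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("group.id", "demo-group");              // placeholder group id
        props.put("enable.auto.commit", "false");         // take over offset management
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("user-events")); // placeholder topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    process(record); // if this throws, the offset is never committed
                }
                consumer.commitSync(); // commit only after the whole batch is processed
            }
        }
    }

    static void process(ConsumerRecord<String, String> record) {
        System.out.printf("partition=%d offset=%d value=%s%n",
                record.partition(), record.offset(), record.value());
    }
}
```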
Rebalance Optimization
Triggers
Three situations trigger rebalance, causing consumption pause:
- Consumer group membership changes (new Consumer joins or existing leaves/crashes)
- Topic partition count changes
- Consumer subscribed Topics change
All Consumers stop consumption during rebalance, particularly noticeable in large-scale clusters.
Key Configuration Parameters
| Parameter | Default | Purpose |
|---|---|---|
| session.timeout.ms | 10000ms | Window after which the Broker marks a Consumer as offline |
| heartbeat.interval.ms | 3000ms | Frequency at which the Consumer sends heartbeats to the Broker |
| max.poll.interval.ms | 300000ms | Maximum allowed interval between poll() calls |
Best practices:
- Keep session.timeout.ms >= 3 × heartbeat.interval.ms
- For scenarios with long business processing time, appropriately increase max.poll.interval.ms
- Avoid time-consuming synchronous IO operations in the poll loop
- Control the number of messages pulled per call via max.poll.records, reducing single-batch processing time
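Putting those recommendations into consumer config might look like the following; the values are illustrative, not universal defaults, and should be tuned to the workload:

```java
import java.util.Properties;

public class RebalanceTuningSketch {
    // Example values only; the keys are the real Kafka consumer config names.
    static Properties tunedProps() {
        Properties props = new Properties();
        props.put("heartbeat.interval.ms", "3000");
        props.put("session.timeout.ms", "10000");    // >= 3 x heartbeat.interval.ms
        props.put("max.poll.interval.ms", "600000"); // raised for slow batch processing
        props.put("max.poll.records", "100");        // smaller batches, shorter poll cycles
        return props;
    }

    public static void main(String[] args) {
        Properties p = tunedProps();
        int session = Integer.parseInt(p.getProperty("session.timeout.ms"));
        int heartbeat = Integer.parseInt(p.getProperty("heartbeat.interval.ms"));
        System.out.println(session >= 3 * heartbeat); // true
    }
}
```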
Static Membership
Kafka 2.3+ introduces group.instance.id. Once configured, Consumers join the Consumer Group as static members: a static member's restart does not immediately trigger a rebalance, and its partitions are only reassigned if it stays away longer than session.timeout.ms. This suits stateful Consumer scenarios (e.g., stream processing).
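Enabling static membership is a single consumer config; the value must be unique and stable per instance (the id below is a placeholder):

```java
// Placeholder id; use something stable per instance, e.g. the pod or host name
props.put("group.instance.id", "order-consumer-1");
```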