Offset Management
When Spark Streaming integrates with Kafka, it can read data from one or more topics. A Kafka topic contains one or more partitions; messages within each partition are stored in order, and an offset marks each message's position. In a Spark Streaming application, developers control where reading starts by managing offsets.
Offset management is critical for data consistency throughout the streaming application's lifecycle. If offsets are not persisted before the application stops or exits with an error, Spark Streaming cannot resume consuming from the position where it stopped, and messages may be lost or reprocessed.
Obtaining Offsets
When Spark Streaming integrates with Kafka, the offsets being consumed can be obtained as follows:
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}
Note: the cast to HasOffsetRanges only succeeds when it is the first method called on the DStream returned by createDirectStream, not later in a chain of transformations. The one-to-one correspondence between RDD partitions and Kafka partitions is lost after any shuffle or repartition, such as reduceByKey or window operations.
Storing Offsets
If a Spark Streaming program fails, the delivery semantics you get from Kafka depend on how and when offsets are stored. Spark's output operations are at-least-once.
To achieve EOS (Exactly Once Semantics), you must either store offsets after an idempotent output, or store offsets and output results together in a single atomic transaction.
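The "output and offsets in one transaction" idea can be sketched without a real database. The example below is a minimal in-memory model, not a production implementation: `TransactionalSink`, its fields, and the batch-id scheme are all illustrative assumptions; a real system would use a database transaction in place of the synchronized block.

```scala
import scala.collection.mutable

// In-memory stand-in for a transactional sink: batch output and offsets are
// committed together, and replaying an already-committed batch is a no-op.
object TransactionalSink {
  private val results = mutable.Map.empty[String, Long] // batchId -> record count
  private val offsets = mutable.Map.empty[Int, Long]    // partition -> next offset to read

  // Apply the batch output and its offsets as one unit. If the batch was
  // already committed (e.g. the job was replayed after a failure), skip both
  // updates, so a replay never double-counts output or advances offsets twice.
  def commit(batchId: String, count: Long, newOffsets: Map[Int, Long]): Unit = synchronized {
    if (!results.contains(batchId)) {
      results(batchId) = count
      offsets ++= newOffsets
    }
  }

  def committedOffset(partition: Int): Option[Long] = synchronized(offsets.get(partition))
  def totalCount: Long = synchronized(results.values.sum)
}
```

The idempotency check on the batch id is what makes the replay safe: storing the result and the offset under one lock (one transaction, in the real version) means a crash can never leave one updated without the other.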
CheckPoint
CheckPoint is an important mechanism in Spark Streaming, mainly used to save two types of key information:
- Metadata:
  - Application configuration
  - Unfinished batch operations
  - The DStream operation graph
  - Generated RDDs and their dependencies
- Data state:
  - Each RDD's data state
  - Kafka consumption offsets
  - Window operation state
This information is periodically persisted (default interval 10 seconds) to reliable storage systems, common choices:
- HDFS (most commonly used)
- AWS S3
- Other Hadoop-compatible file systems
When the application terminates unexpectedly or the cluster fails, Spark Streaming can use the CheckPoint information to recover quickly to the most recent consistent state and resume consumption from there; exactly-once output semantics additionally require the idempotent or transactional techniques described above.
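In Spark this recovery is driven by `StreamingContext.getOrCreate(checkpointDir, createFn)`, which rebuilds the context from the checkpoint if one exists and otherwise calls the factory function. The control flow behind it can be sketched without Spark; everything below (`AppState`, the Map standing in for the checkpoint directory) is an illustrative model, not Spark's actual implementation.

```scala
// Sketch of the recover-or-create control flow behind checkpoint recovery,
// with a plain Map standing in for the checkpoint directory on HDFS.
object CheckpointRecovery {
  case class AppState(lastOffset: Long)

  def getOrCreate(checkpointStore: Map[String, AppState],
                  path: String,
                  create: () => AppState): AppState =
    checkpointStore.get(path) match {
      case Some(saved) => saved     // checkpoint found: recover the saved state
      case None        => create()  // first start: build fresh state
    }
}
```

The key property mirrored here is that the factory is only invoked when no checkpoint exists, so a restart after failure always prefers the persisted state.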
About CheckPoint version compatibility issues:
When the Spark Streaming program's code changes and is redeployed, deserialization exceptions (such as SerializationException) are common, because:
- On the first run, the CheckPoint mechanism serializes and saves the application's DStream graph, including the closures from your code
- On restart, the system tries to deserialize that saved graph against the currently deployed classes
- If the new code is incompatible with what was saved in the CheckPoint (e.g. class definition changes, method signature modifications, serialization ID changes, business-logic adjustments), deserialization fails
Solutions and notes:
- Standard solution:
  - Delete the CheckPoint directory on HDFS (e.g. hdfs dfs -rm -r /checkpoint/path)
  - This also clears the saved offsets, so the program re-consumes from the position determined by Kafka's auto.offset.reset configuration
- Alternative (when offsets must be preserved):
  - Manually record offsets to external storage (e.g. ZooKeeper/HBase)
  - Specify the starting offsets when calling createDirectStream
Kafka
By default, the consumer periodically auto-commits offsets, storing them in a special Kafka topic (__consumer_offsets). However, in certain scenarios this can cause issues, because messages may have been pulled from Kafka by the consumer but not yet processed.
You can set enable.auto.commit to false and manually commit offsets after the Spark Streaming program outputs results.
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // Commit offsets manually after the output operation completes;
  // they are written to Kafka's __consumer_offsets topic
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
Similar to HasOffsetRanges, the conversion to CanCommitOffsets only succeeds when called on the result of createDirectStream, not after transformations. The commitAsync call is thread-safe but must be executed after output.
Custom Storage
Offsets can be managed in various ways, but the general steps are:
- During DStream initialization, specify the offset for each partition so that reading starts from the desired position
- Read and process the messages
- Store the processed result data
- Store and commit offsets as one series of operations when stricter semantics are required; this combines idempotent output with atomic offset storage
- Store offsets in an external persistent store such as HBase, Kafka, HDFS, ZooKeeper, Redis, or MySQL
- You can store offsets in HDFS, but it is not a good choice: HDFS has high latency, and writing offsets for every batch creates an excessive-small-files problem
- You can store offsets in ZK, but that is also unwise, as ZK is not suited to frequent read/write operations
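The steps above can be sketched end-to-end with an in-memory offset store. `Message`, the uppercase "processing", and the store itself are illustrative stand-ins for Kafka records, your business logic, and an external database.

```scala
import scala.collection.mutable

// End-to-end sketch of the custom-offset loop: start from saved offsets,
// process only unseen messages, store results, then advance the offsets.
object CustomOffsetLoop {
  case class Message(partition: Int, offset: Long, value: String)

  private val offsetStore = mutable.Map.empty[Int, Long] // partition -> next offset
  private val output = mutable.ListBuffer.empty[String]  // processed results

  def savedOffset(partition: Int): Long = offsetStore.getOrElse(partition, 0L)

  // 1. read from the saved offsets, 2. process, 3. store results,
  // 4. save the new offsets only after the output has been stored
  def runBatch(messages: Seq[Message]): Unit = {
    val toProcess = messages.filter(m => m.offset >= savedOffset(m.partition))
    output ++= toProcess.map(_.value.toUpperCase) // stand-in for real processing
    toProcess.groupBy(_.partition).foreach { case (p, ms) =>
      offsetStore(p) = ms.map(_.offset).max + 1
    }
  }

  def results: List[String] = output.toList
}
```

Because offsets are advanced only after results are stored, a crash between the two steps causes reprocessing rather than loss, which is why the processing step should be idempotent.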
Redis Offset Management
Implementation plan for managing Spark Streaming offsets in external storage
Core Function Implementation
- Load offsets on program startup
  - Create a custom OffsetManager class responsible for interacting with the external store
  - Call OffsetManager.getSavedOffsets() when the Spark Streaming application starts
  - This method reads saved offsets from external storage (such as Kafka, ZooKeeper, Redis, MySQL, or HBase)
- Update offsets after each batch
  - After processing the data inside the foreachRDD output operation
  - Get the current batch's offset information
  - Call OffsetManager.saveOffsets() to save the offsets to external storage
External Storage Selection and Implementation
- Kafka's own storage (simplest solution)
  - Use Kafka's built-in __consumer_offsets topic
  - Auto-commit via enable.auto.commit=true
  - Disadvantage: commit timing cannot be controlled precisely, so messages may be consumed but their offsets not yet committed
- ZooKeeper
  - Create /consumers/[group_id]/offsets/[topic]/[partition] nodes
  - Use the Curator framework to operate on ZK
- Redis
  - Use a Hash structure: key = "offset:[topic]:[group_id]", field = "[partition]", value = offset
  - Supports atomic operations and TTL settings
- MySQL/HBase
  - Table structure: (topic, group_id, partition, offset, timestamp)
  - Use transactions to ensure data consistency
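For the MySQL option, the table structure above might look like the DDL below. The table name `kafka_offsets`, column names, and the upsert statement are illustrative assumptions, not a fixed schema; the composite primary key keeps exactly one row per (topic, group, partition).

```scala
// Illustrative MySQL DDL and upsert for offset storage, held as constants.
object OffsetSql {
  val createTable: String =
    """CREATE TABLE kafka_offsets (
      |  topic       VARCHAR(255) NOT NULL,
      |  group_id    VARCHAR(255) NOT NULL,
      |  `partition` INT          NOT NULL,
      |  `offset`    BIGINT       NOT NULL,
      |  ts          TIMESTAMP    DEFAULT CURRENT_TIMESTAMP,
      |  PRIMARY KEY (topic, group_id, `partition`)
      |)""".stripMargin

  // Insert-or-update so each partition's row always holds the latest offset
  val upsert: String =
    """INSERT INTO kafka_offsets (topic, group_id, `partition`, `offset`)
      |VALUES (?, ?, ?, ?)
      |ON DUPLICATE KEY UPDATE `offset` = VALUES(`offset`)""".stripMargin
}
```

Note the backticks: PARTITION is a reserved word in MySQL, so the column name must be quoted. Executing the upsert inside the same transaction as the result write gives the atomicity mentioned above.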
Notes
- Atomicity Guarantee: Ensure atomicity between offset save and data processing
- Idempotency Design: Offset save operations should support repeated execution
- Error Handling: Implement retry mechanisms for temporary unavailability of storage systems
- Performance Optimization: Batch write offsets to reduce I/O operations
- Monitoring and Alerting: Establish offset lag monitoring mechanism
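The lag-monitoring note above boils down to a simple computation: lag is the gap between each partition's latest (log-end) offset and the consumer's committed offset. A minimal sketch, with `LagMonitor` and its threshold parameter as illustrative names:

```scala
// Compute per-partition consumer lag and a simple alert condition.
object LagMonitor {
  // latest: log-end offset per partition; committed: the consumer's saved offset
  def lag(latest: Map[Int, Long], committed: Map[Int, Long]): Map[Int, Long] =
    latest.map { case (p, end) =>
      p -> math.max(0L, end - committed.getOrElse(p, 0L))
    }

  // Alert when the total lag across all partitions exceeds the threshold
  def shouldAlert(latest: Map[Int, Long], committed: Map[Int, Long], threshold: Long): Boolean =
    lag(latest, committed).values.sum > threshold
}
```

In practice the log-end offsets would come from the Kafka admin/consumer API and the committed offsets from whichever store the OffsetManager writes to.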
Complete Implementation Example
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange

object OffsetManager {
  // Consumer group id; matches the group used elsewhere in this example
  private val groupId = "wzkicu"

  // Get saved offsets from Redis; returns None if the lookup fails.
  // RedisPool is a Jedis connection pool defined elsewhere.
  def getSavedOffsets(topic: String, partitionCount: Int): Option[Map[TopicPartition, Long]] = {
    val jedis = RedisPool.getResource
    try {
      val offsets = (0 until partitionCount).flatMap { partition =>
        // hget returns null when no offset has been saved for this partition
        Option(jedis.hget(s"offset:$topic:$groupId", partition.toString))
          .map(offset => new TopicPartition(topic, partition) -> offset.toLong)
      }.toMap
      Some(offsets)
    } catch {
      case _: Exception => None
    } finally {
      jedis.close()
    }
  }

  // Save offsets to Redis in a single pipelined round trip
  def saveOffsets(offsetRanges: Array[OffsetRange]): Unit = {
    val jedis = RedisPool.getResource
    try {
      val pipeline = jedis.pipelined()
      offsetRanges.foreach { offsetRange =>
        pipeline.hset(
          s"offset:${offsetRange.topic}:$groupId",
          offsetRange.partition.toString,
          offsetRange.untilOffset.toString
        )
      }
      pipeline.sync()
    } finally {
      jedis.close()
    }
  }
}
Managing offsets with Redis:
- Data structure: a Hash, with a Key, Fields, and Values
  - Key: kafka:topic:topicName:groupId
  - Field: partition, Value: offset
- Get the saved offsets from Redis on startup
- After consuming data, save the offsets back to Redis
Custom Offsets: Read from Kafka, Print Offsets After Processing
package icu.wzk

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._

object kafkaDStream2 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf()
      .setAppName("KafkaDStream2")
      .setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(2))
    val kafkaParams: Map[String, Object] = getKafkaConsumerParameters("wzkicu")
    val topics: Array[String] = Array("spark_streaming_test01")
    // Start consuming from a specified position
    val offsets: collection.Map[TopicPartition, Long] = Map(
      new TopicPartition("spark_streaming_test01", 0) -> 100L
    )
    // Read data from Kafka
    val dstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets)
    )
    // DStream output: print the batch size and each partition's offset range
    dstream.foreachRDD { (rdd, time) =>
      println(s"=========== rdd.count = ${rdd.count()}, time = $time ==============")
      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition { iter =>
        val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
        println(s"${o.topic}, ${o.partition}, ${o.fromOffset}, ${o.untilOffset}")
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }

  private def getKafkaConsumerParameters(groupId: String): Map[String, Object] = {
    Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "h121.wzk.icu:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
    )
  }
}