Offset Management Mechanism Details
Basic Concepts
When Spark Streaming integrates with Kafka, it manages the data consumption position through Kafka's offset mechanism. Each Kafka topic is divided into one or more partitions, and messages within each partition are stored in order. An offset is a monotonically increasing 64-bit integer that identifies the exact position of a message within a partition.
Importance of Offset Management
- Data Consistency Guarantee: proper offset management ensures each message is processed exactly once (exactly-once semantics), avoiding both data loss and duplicate processing.
- Fault Recovery Mechanism: when the application fails, the persisted offsets let it resume from the last successfully processed position, neither reprocessing data that was already handled nor skipping data that was never processed.
Offset Storage Methods
Spark Streaming provides multiple offset storage solutions:
1. Checkpoint Mechanism
ssc.checkpoint("hdfs://path/to/checkpoint")
Saves the offsets together with the application state to HDFS. Simple to use, but not very flexible: a checkpoint serializes the whole DStream graph, so it generally cannot be restored after the application code changes.
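As a minimal sketch of checkpoint-based recovery (the HDFS path is a placeholder, and the body of `createContext` is elided): `StreamingContext.getOrCreate` restores the context, including Kafka offsets, from the checkpoint directory if one exists, and otherwise builds a fresh context.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointRecoveryExample {
  // Builds the StreamingContext from scratch; only called when no checkpoint exists
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("CheckpointRecoveryExample").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("hdfs://path/to/checkpoint") // offsets and DStream state are saved here
    // ... define Kafka sources and transformations here ...
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Restore from the checkpoint if present, otherwise create a new context
    val ssc = StreamingContext.getOrCreate("hdfs://path/to/checkpoint", () => createContext())
    ssc.start()
    ssc.awaitTermination()
  }
}
```

Note that the creating function must define the entire job: on a successful restore, the code inside `createContext` is not executed again.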
2. External Storage System
Common external storage choices include ZooKeeper, HBase, Redis, and Kafka itself (version 0.10+).
3. Manual Management
The most flexible option, but it requires the developer to implement the storage logic themselves.
Best Practices
- Commit Timing: commit offsets only after the data has been successfully processed and persisted
- Atomicity Guarantee: offset commits and data processing should be atomic, e.g. written in the same transaction
- Monitoring Mechanism: monitor offset lag so that processing bottlenecks are discovered early
- Testing Plan: regularly test the fault recovery process
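When offsets are stored in Kafka itself (the 0.10+ integration), the commit-after-processing practice above looks roughly like this sketch, where `stream` is assumed to be the direct stream created by `KafkaUtils.createDirectStream`:

```scala
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

// Sketch: commit offsets back to Kafka only after the batch's output is persisted
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // 1. Process and persist this batch's data first
  // ... write results to the external sink ...
  // 2. Only then commit the offsets (asynchronous, at-least-once semantics)
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
```

Because `commitAsync` runs after the output action, a crash between persisting and committing can replay a batch, which is why the idempotency advice below still applies.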
Common Problem Handling
- Duplicate Consumption: ensure data is persisted before the offset is committed, and implement idempotent processing logic
- Data Loss: never commit offsets before processing completes; add a retry mechanism
- Offset Reset: configure the auto.offset.reset parameter appropriately, and explicitly specify the starting offsets on first startup
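Idempotent processing can be as simple as tracking which record identifiers have already been applied, so that a replayed batch (after an offset rollback) produces no duplicate side effects. A minimal pure-Scala sketch, where the in-memory `processedIds` set stands in for a dedup table in an external store such as a Redis set or a database unique key:

```scala
object IdempotentSink {
  // Stand-in for a dedup table in an external store (e.g. a Redis set or a DB unique key)
  private val processedIds = scala.collection.mutable.Set[String]()
  private var total = 0L

  // Applies a record's side effect at most once, even if the record is replayed.
  // Returns true if the record was applied, false if it was a duplicate.
  def applyOnce(id: String, value: Long): Boolean = {
    if (processedIds.contains(id)) {
      false // already applied: a replay after an offset rollback is a no-op
    } else {
      processedIds += id
      total += value
      true
    }
  }

  def currentTotal: Long = total
}
```

With this in place, reprocessing `applyOnce("msg-1", 10)` a second time leaves `currentTotal` unchanged, so duplicate consumption degrades to wasted work rather than corrupted results.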
Using Redis to Manage Offsets
As an efficient in-memory database, Redis is commonly used to store Kafka offsets for Spark Streaming. With manually managed offsets, you store the current batch's Kafka offsets in Redis after each batch of data has been processed.
Implementation Steps
- Get Offsets from Redis: When the application starts, read the previously processed offsets from Redis and start consuming Kafka data from these offsets
- Process Data: Process the data consumed from Kafka through Spark Streaming
- Save Offsets to Redis: After processing each batch of data, store the latest offsets to Redis
Custom Offsets Implementation
Add Jedis dependency:
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
Main code logic:
package icu.wzk

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaDStream3 {
  def main(args: Array[String]): Unit = {
    // Suppress noisy framework logging
    Logger.getLogger("org").setLevel(Level.ERROR)

    val conf = new SparkConf()
      .setAppName("KafkaDStream3")
      .setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))
    val groupId: String = "wzkicu"
    val topics: Array[String] = Array("spark_streaming_test01")
    val kafkaParams: Map[String, Object] = getKafkaConsumerParameters(groupId)

    // Get offsets from Redis
    val offsets: Map[TopicPartition, Long] = OffsetsRedisUtils.getOffsetsFromRedis(topics, groupId)

    // Create DStream, starting from the recovered offsets
    val dstream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets)
    )

    // DStream transformation & output
    dstream.foreachRDD { (rdd, time) =>
      if (!rdd.isEmpty()) {
        println(s"====== rdd.count = ${rdd.count()}, time = $time =======")
        // Save offsets to Redis only after the batch has been processed
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        OffsetsRedisUtils.saveOffsetsToRedis(offsetRanges, groupId)
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }

  private def getKafkaConsumerParameters(groupId: String): Map[String, Object] = {
    Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "h121.wzk.icu:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
    )
  }
}
Utility class encapsulation:
package icu.wzk

import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

import scala.collection.JavaConverters._
import scala.collection.mutable

object OffsetsRedisUtils {
  private val config = new JedisPoolConfig
  private val redisHost = "h121.wzk.icu"
  private val redisPort = 6379
  config.setMaxTotal(30)
  config.setMaxIdle(10)
  private val pool = new JedisPool(config, redisHost, redisPort, 10000)
  private val topicPrefix = "kafka:topic"

  // Key layout: kafka:topic:<topic>:<groupId> -> hash of (partition -> offset)
  private def getKey(topic: String, groupId: String, prefix: String = topicPrefix): String =
    s"$prefix:$topic:$groupId"

  private def getRedisConnection: Jedis = pool.getResource

  // Get offsets from Redis
  def getOffsetsFromRedis(topics: Array[String], groupId: String): Map[TopicPartition, Long] = {
    val jedis: Jedis = getRedisConnection
    try {
      val offsets: Array[mutable.Map[TopicPartition, Long]] = topics.map { topic =>
        jedis.hgetAll(getKey(topic, groupId))
          .asScala
          .map {
            case (partition, offset) => new TopicPartition(topic, partition.toInt) -> offset.toLong
          }
      }
      offsets.flatten.toMap
    } finally {
      jedis.close() // return the connection to the pool even on failure
    }
  }

  // Save offsets to Redis
  def saveOffsetsToRedis(ranges: Array[OffsetRange], groupId: String): Unit = {
    val jedis: Jedis = getRedisConnection
    try {
      ranges
        .map(range => (range.topic, range.partition -> range.untilOffset))
        .groupBy(_._1)
        .map {
          case (topic, buffer) => (topic, buffer.map(_._2))
        }
        .foreach {
          case (topic, partitionAndOffset) =>
            val offsets: Array[(String, String)] =
              partitionAndOffset.map(elem => (elem._1.toString, elem._2.toString))
            jedis.hmset(getKey(topic, groupId), offsets.toMap.asJava)
        }
    } finally {
      jedis.close() // return the connection to the pool even on failure
    }
  }
}
With a complete offset management mechanism in place, a Spark Streaming application can achieve high reliability and data consistency, meeting the requirements of critical business scenarios.