This is article 89 in the Big Data series. It compares the two core modes of Spark Streaming's Kafka integration in depth, with a focus on production practices for Direct mode.
Two Integration Mode Overview
Spark Streaming integration with Kafka has gone through two generations of APIs:
- Receiver Mode (Kafka 0.8): Based on Kafka’s high-level consumer API, Receiver pulls data into Executor memory
- Direct Mode (Kafka 0.10+): No Receiver, Spark directly reads Kafka partitions—industry production standard
Maven Dependency
<!-- Direct mode uses the kafka-0-10 package -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>${spark.version}</version>
</dependency>
<!-- Note: don't add kafka-clients manually—it is pulled in as a transitive dependency, and pinning a different version can cause conflicts -->
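For sbt-based projects, the equivalent declaration would look roughly like this (a sketch—`sparkVersion` is an assumed placeholder you should align with your cluster):

```scala
// build.sbt — hypothetical sbt equivalent of the Maven dependency above
val sparkVersion = "3.3.0" // assumption: match your cluster's Spark version

// %% appends the Scala binary version suffix (e.g. _2.12) automatically
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion
```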
Receiver Mode Deep Dive
Architecture Principle
Each Receiver runs as a long-running Task on an Executor. Every 200 ms (controlled by spark.streaming.blockInterval), the accumulated data is packaged into Blocks and written to Executor memory managed by the BlockManager, producing a BlockRDD underneath.
Characteristics:
- Offsets automatically managed by ZooKeeper
- Received data is stored with two replicas by default (StorageLevel.MEMORY_AND_DISK_SER_2) for fault tolerance
- Each Receiver occupies one CPU core exclusively
- Requires enabling WAL (Write-Ahead Log) to guarantee no data loss
Enable WAL: set spark.streaming.receiver.writeAheadLog.enable = true in SparkConf. Received data and Driver metadata are then persisted to HDFS, at the cost of roughly a 10-20% increase in disk I/O.
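As a rough sketch (not a complete job), enabling the WAL involves setting the flag plus a checkpoint directory, since the log files live under it; the application name and HDFS path here are placeholders:

```scala
// Sketch: receiver mode with WAL enabled (names and paths are placeholders)
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("ReceiverWithWAL")
  // Write received blocks to the write-ahead log before acknowledging them
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(2))
// WAL files are stored under the checkpoint directory, so one must be set (HDFS in production)
ssc.checkpoint("hdfs:///tmp/spark-wal-checkpoint")

// With the WAL providing durability, in-memory replication becomes redundant:
// pass StorageLevel.MEMORY_AND_DISK_SER (single replica) to the receiver-based stream
```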
Direct Mode Deep Dive
Architecture Principle
Direct mode doesn't rely on a Receiver. At the start of each batch, Spark queries Kafka directly for the latest offsets, and the resulting KafkaRDD's partitions correspond one-to-one with the Kafka topic's partitions.
Advantages:
- No Receiver, doesn’t occupy extra CPU cores
- Partitions one-to-one with Kafka, parallelism automatically matches
- Offsets are managed manually by the application, making Exactly-Once semantics achievable
- No WAL needed, lower I/O overhead
Kafka Producer Implementation
package icu.wzk

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

import java.util.Properties

object KafkaProducerTest {
  def main(args: Array[String]): Unit = {
    val brokers = "h121.wzk.icu:9092"
    val topic = "spark_streaming_test01"

    // Producer configuration: broker address plus String serializers for key and value
    val prop = new Properties()
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)

    val producer = new KafkaProducer[String, String](prop)
    // Send 1000 messages, one every 100 ms
    for (i <- 1 to 1000) {
      val msg = new ProducerRecord[String, String](topic, i.toString, i.toString)
      producer.send(msg)
      println(s"Sent: $i")
      Thread.sleep(100)
    }
    producer.close()
  }
}
Direct Mode Consumer Implementation
package icu.wzk

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaDStream1 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)

    val conf = new SparkConf()
      .setAppName("KafkaDStream1")
      .setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(2))

    val kafkaParams = getKafkaConsumerParameters("wzkicu")
    val topics = Array("spark_streaming_test01")

    // Direct stream: each partition of the resulting KafkaRDD maps 1:1 to a Kafka partition
    val dstream = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )

    dstream.foreachRDD { (rdd, time) =>
      if (!rdd.isEmpty()) {
        println(s"rdd.count = ${rdd.count()}, time = $time")
        // Manually commit offsets only after the batch has been processed
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        dstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }

  private def getKafkaConsumerParameters(groupId: String): Map[String, Object] = {
    Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "h121.wzk.icu:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG -> groupId,
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
      // Disable Kafka auto-commit; the application commits offsets itself
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
    )
  }
}
Key configuration notes:
- ENABLE_AUTO_COMMIT_CONFIG = false: disable Kafka auto-commit so the application controls commits manually
- AUTO_OFFSET_RESET_CONFIG = earliest: first consumption starts from the earliest available message
- LocationStrategies.PreferConsistent: distribute Kafka partitions evenly across Executors
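With auto-commit disabled, a restarted job resumes from the group's last committed offsets in Kafka. If offsets are instead kept in an external store (a common Exactly-Once pattern), the Subscribe strategy accepts explicit starting positions per partition. A sketch, reusing `ssc`, `topics`, and `kafkaParams` from the consumer above, where `loadOffsets()` is a hypothetical helper backed by your own database:

```scala
// Sketch: resume a direct stream from externally stored offsets
// (loadOffsets() is a hypothetical lookup; values here are placeholders)
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010._

def loadOffsets(): Map[TopicPartition, Long] =
  Map(new TopicPartition("spark_streaming_test01", 0) -> 0L) // placeholder offsets

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  // Subscribe overload that takes a Map[TopicPartition, Long] of starting offsets
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, loadOffsets())
)
```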
Two Modes Comparison
| Characteristic | Receiver Mode | Direct Mode |
|---|---|---|
| Parallelism | Limited by Receiver count | Matches Kafka partition count |
| Offset Management | ZooKeeper auto-management | Application manual control |
| Semantic Guarantee | At-Least-Once (needs WAL) | Exactly-Once (achievable) |
| CPU Overhead | Receiver occupies core | No extra overhead |
| Kafka Version | 0.8.x | 0.10+ |
| Production Recommendation | Not recommended | Recommended |
Test Results
After the Producer sends 1000 messages at 100 ms intervals, the Consumer's output looks like:
rdd.count = 1000, time = ... # Initial backlog batch
rdd.count = 15, time = ... # Subsequent normal batches (~20 per 2 seconds)
rdd.count = 9, time = ...
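The steady-state counts line up with simple arithmetic: at one message every 100 ms, a 2-second batch can hold at most about 20 messages, so observed counts like 15 and 9 reflect send jitter and how messages fall across batch boundaries:

```scala
// Sanity check: expected upper bound on messages per batch (pure arithmetic, no Spark)
val batchIntervalMs = 2000 // Seconds(2) batch interval
val sendIntervalMs  = 100  // producer's Thread.sleep(100)
val maxPerBatch = batchIntervalMs / sendIntervalMs
println(maxPerBatch) // 20
```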
Direct mode is the industry-standard way for Spark Streaming to consume Kafka. Combined with manual offset commits and idempotent writes to downstream systems, it can achieve complete end-to-end Exactly-Once semantics.
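One common shape of that pattern stores each batch's results and its offset ranges in the same transaction, so a replayed batch becomes a harmless no-op. A sketch, where `saveBatchWithOffsets` is a hypothetical helper that performs an idempotent upsert of the records plus an update of the stored offsets inside one database transaction:

```scala
// Sketch: end-to-end Exactly-Once by committing results and offsets atomically
// (saveBatchWithOffsets is hypothetical—imagine a single DB transaction)
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}

dstream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // Capture the offset ranges before any transformation
    val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    val records = rdd.map(r => (r.key, r.value)).collect() // small batches only
    // Either both writes succeed or both roll back; a replay just re-upserts the same keys
    saveBatchWithOffsets(records, offsetRanges)
  }
}
```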