This is article 89 in the Big Data series. It compares the two core modes of Spark Streaming's Kafka integration in depth, focusing on production practices with Direct mode.

Two Integration Mode Overview

Spark Streaming integration with Kafka has gone through two generations of APIs:

  • Receiver Mode (Kafka 0.8): Based on Kafka’s high-level consumer API, Receiver pulls data into Executor memory
  • Direct Mode (Kafka 0.10+): No Receiver, Spark directly reads Kafka partitions—industry production standard

Maven Dependency

<!-- Direct mode uses kafka-0-10 package -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
  <version>${spark.version}</version>
</dependency>
<!-- Note: don't add kafka-clients manually; it is pulled in as a transitive dependency -->

Receiver Mode Deep Dive

Architecture Principle

Each Receiver runs as a long-running Task on an Executor. Every 200 ms (controlled by spark.streaming.blockInterval, default 200 ms), the accumulated data is packaged into Blocks and written to Executor memory managed by the BlockManager; each batch is then exposed underneath as a BlockRDD.

Characteristics:

  • Offsets automatically managed by ZooKeeper
  • Received data is replicated to two Executors by default (StorageLevel.MEMORY_AND_DISK_SER_2) for fault tolerance
  • Each Receiver occupies one CPU core exclusively
  • Requires enabling WAL (Write-Ahead Log) to guarantee no data loss
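For contrast, the legacy Receiver-based API came from the spark-streaming-kafka-0-8 package (removed in Spark 3.x) and looked roughly like this; the ZooKeeper address is illustrative, and the topic map value is the number of receiver threads per topic. A sketch, not tested against a live cluster:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils // legacy 0.8 package

object ReceiverModeSketch {
  def main(args: Array[String]): Unit = {
    // local[2]: at least 2 cores, since the Receiver occupies one exclusively
    val conf = new SparkConf().setAppName("ReceiverModeSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))

    val zkQuorum = "h121.wzk.icu:2181"                 // illustrative ZK address
    val topicMap = Map("spark_streaming_test01" -> 1)  // 1 receiver thread for the topic

    // Returns DStream[(key, value)]; offsets are tracked in ZooKeeper automatically
    val lines = KafkaUtils.createStream(ssc, zkQuorum, "wzkicu", topicMap).map(_._2)
    lines.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```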

Enable WAL by setting spark.streaming.receiver.writeAheadLog.enable = true in SparkConf (a checkpoint directory must also be configured, since the log lives under it). Received data and Driver metadata are then written to HDFS; the cost is roughly a 10-20% increase in disk I/O.
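A minimal sketch of the WAL-related setup; the HDFS checkpoint path is illustrative:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("ReceiverWithWAL")
  // Persist received blocks to a write-ahead log before acknowledging them
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(2))
// The WAL is written under the checkpoint directory; path is illustrative
ssc.checkpoint("hdfs://h121.wzk.icu:9000/spark/checkpoint")

// Tip: with the WAL enabled, a single-replica storage level such as
// StorageLevel.MEMORY_AND_DISK_SER avoids storing the data twice,
// since the log itself already provides fault tolerance.
```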

Direct Mode Deep Dive

Architecture Principle

Direct mode doesn't rely on a Receiver. At the start of each batch, the Driver queries Kafka for the latest offsets of each partition, and the partitions of the generated KafkaRDD correspond one-to-one with the partitions of the Kafka topic.

Advantages:

  • No Receiver, doesn’t occupy extra CPU cores
  • Partitions one-to-one with Kafka, parallelism automatically matches
  • Manual offset management, making Exactly-Once semantics achievable (together with idempotent or transactional downstream writes)
  • No WAL needed, lower I/O overhead
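The per-batch planning step can be modeled in plain Scala: one (fromOffset, untilOffset) pair per Kafka partition, mirroring Spark's OffsetRange, with the batch size being the sum of the per-partition deltas. A simplified illustration (the names are ours, not Spark's):

```scala
// Simplified stand-in for Spark's OffsetRange: one entry per Kafka partition
case class PartitionRange(partition: Int, fromOffset: Long, untilOffset: Long) {
  def count: Long = untilOffset - fromOffset
}

object DirectBatchPlan {
  // One KafkaRDD partition per Kafka partition; partitions with no committed
  // offset start from 0 here (Spark would apply auto.offset.reset instead)
  def plan(committed: Map[Int, Long], latest: Map[Int, Long]): Seq[PartitionRange] =
    latest.toSeq.sortBy(_._1).map { case (p, until) =>
      PartitionRange(p, committed.getOrElse(p, 0L), until)
    }

  def main(args: Array[String]): Unit = {
    val ranges = plan(
      committed = Map(0 -> 100L, 1 -> 200L), // last committed offsets
      latest    = Map(0 -> 150L, 1 -> 230L)  // latest offsets seen in Kafka
    )
    println(ranges.map(_.count).sum) // prints 80: 50 + 30 records in this batch
  }
}
```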

Kafka Producer Implementation

package icu.wzk

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import java.util.Properties

object KafkaProducerTest {
  def main(args: Array[String]): Unit = {
    val brokers = "h121.wzk.icu:9092"
    val topic = "spark_streaming_test01"

    val prop = new Properties()
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      classOf[StringSerializer].getName)
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      classOf[StringSerializer].getName)

    val producer = new KafkaProducer[String, String](prop)
    for (i <- 1 to 1000) {
      val msg = new ProducerRecord[String, String](topic, i.toString, i.toString)
      producer.send(msg)
      println(s"Sent: $i")
      Thread.sleep(100)
    }
    producer.close()
  }
}
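Note that send() is asynchronous and returns a Future; for production use you would typically attach a callback to surface broker errors instead of fire-and-forget. A hedged variant of the send step above (the helper name is ours):

```scala
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

// `producer` and `topic` as in the example above
def sendWithCallback(producer: KafkaProducer[String, String],
                     topic: String, key: String, value: String): Unit = {
  producer.send(new ProducerRecord[String, String](topic, key, value), new Callback {
    // Invoked by the producer's I/O thread once the broker acknowledges (or fails)
    override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
      if (exception != null)
        Console.err.println(s"Send failed for key=$key: ${exception.getMessage}")
      else
        println(s"Sent key=$key to partition ${metadata.partition()} @ offset ${metadata.offset()}")
  })
}
```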

Direct Mode Consumer Implementation

package icu.wzk

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaDStream1 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf()
      .setAppName("KafkaDStream1")
      .setMaster("local[*]")

    val ssc = new StreamingContext(conf, Seconds(2))
    val kafkaParams = getKafkaConsumerParameters("wzkicu")
    val topics = Array("spark_streaming_test01")

    val dstream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )

    dstream.foreachRDD { (rdd, time) =>
      if (!rdd.isEmpty()) {
        println(s"rdd.count = ${rdd.count()}, time = $time")
        // Manually commit offsets after processing
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        dstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }

  private def getKafkaConsumerParameters(groupId: String): Map[String, Object] = {
    Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG      -> "h121.wzk.icu:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG               -> groupId,
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG      -> "earliest",
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG     -> (false: java.lang.Boolean)
    )
  }
}

Key configuration notes:

  • ENABLE_AUTO_COMMIT_CONFIG = false: Disable Kafka auto-commit, let application control manually
  • AUTO_OFFSET_RESET_CONFIG = earliest: When no committed offset exists, consumption starts from the earliest available message
  • LocationStrategies.PreferConsistent: Evenly distribute Kafka partitions to Executors
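If offsets are kept in an external store (a database, ZooKeeper, etc.) rather than in Kafka's __consumer_offsets, the stream can resume from them by passing a starting-offset map to ConsumerStrategies.Subscribe. A sketch, with the recovered offsets hard-coded for illustration:

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

// Offsets recovered from your own store at startup; the values are illustrative
val fromOffsets: Map[TopicPartition, Long] = Map(
  new TopicPartition("spark_streaming_test01", 0) -> 1234L
)

// `ssc`, `topics`, and `kafkaParams` as in the consumer example above
val dstream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  // The third argument overrides the committed/auto-reset starting position
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, fromOffsets)
)
```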

Two Modes Comparison

| Characteristic | Receiver Mode | Direct Mode |
| --- | --- | --- |
| Parallelism | Limited by Receiver count | Matches Kafka partition count |
| Offset management | ZooKeeper, automatic | Application, manual |
| Semantic guarantee | At-Least-Once (needs WAL) | Exactly-Once (achievable) |
| CPU overhead | Each Receiver occupies a core | No extra overhead |
| Kafka version | 0.8.x | 0.10+ |
| Production recommendation | Not recommended | Recommended |

Test Results

After the Producer sends 1,000 messages at 100 ms intervals, the Consumer output looks like this:

rdd.count = 1000, time = ...  # Initial batch drains the backlog
rdd.count = 15, time = ...    # Subsequent batches (~20 expected per 2-second batch; actual counts vary)
rdd.count = 9, time = ...

Direct mode is the industry-standard way for Spark Streaming to consume Kafka. Combined with manual offset commits and idempotent writes to downstream systems, it can achieve end-to-end Exactly-Once semantics.
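commitAsync alone only gives at-least-once: a crash between processing and commit replays the batch. The usual fix on the sink side is an idempotent write keyed by (partition, offset), so a replay overwrites rather than duplicates. A minimal in-memory illustration (a real sink would be a database upsert on the same key):

```scala
object IdempotentSink {
  // Keyed by (partition, offset): a replayed record overwrites its earlier write
  private val store = scala.collection.mutable.Map.empty[(Int, Long), String]

  def upsert(partition: Int, offset: Long, value: String): Unit =
    store((partition, offset)) = value

  def size: Int = store.size
}

object ReplayDemo {
  def main(args: Array[String]): Unit = {
    // First delivery of a two-record batch
    IdempotentSink.upsert(0, 100L, "a")
    IdempotentSink.upsert(0, 101L, "b")
    // Crash before the offset commit: the same batch is replayed
    IdempotentSink.upsert(0, 100L, "a")
    IdempotentSink.upsert(0, 101L, "b")
    println(IdempotentSink.size) // prints 2, not 4: replays are absorbed
  }
}
```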