本文是大数据系列第 89 篇,深入对比 Spark Streaming 整合 Kafka 的两种核心模式,重点讲解 Direct 模式的生产实践。

完整图文版(含截图):CSDN 原文 | 掘金

两种整合模式概述

Spark Streaming 与 Kafka 的整合经历了两代 API:

  • Receiver 模式(Kafka 0.8):基于 Kafka 高级消费者 API,由 Receiver 拉取数据到 Executor 内存
  • Direct 模式(Kafka 0.10+):无 Receiver,Spark 直接读取 Kafka 分区,业界生产标准

Maven 依赖

<!-- Direct 模式使用 kafka-0-10 包 -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
  <version>${spark.version}</version>
</dependency>
<!-- 注意:不要手动添加 kafka-clients,已传递依赖包含 -->

Receiver 模式详解

架构原理

每个 Receiver 作为一个长期运行的 Task 在 Executor 上执行,每 200ms(由 spark.streaming.blockInterval 控制)将积累的数据封装为 Block,写入 Executor 内存由 BlockManager 管理,底层生成 BlockRDD。

特点:

  • 偏移量由 ZooKeeper 自动管理
  • 默认副本因子为 2,保证容错
  • 每个 Receiver 独占一个 CPU 核心
  • 需要开启 WAL(Write-Ahead Log)才能保证数据不丢

开启 WAL: 在 SparkConf 中设置 spark.streaming.receiver.writeAheadLog.enable = true,数据和 Driver 元数据写入 HDFS,代价是 10-20% 的磁盘 I/O 增加。

Direct 模式详解

架构原理

Direct 模式不依赖 Receiver,Spark 在每个批次开始时直接查询 Kafka 的最新偏移量,生成的 KafkaRDD 分区数与 Kafka Topic 分区数一一对应。

优势:

  • 无 Receiver,不占用额外 CPU 核心
  • 分区与 Kafka 一一对应,并行度自动匹配
  • 手动管理偏移量,实现 Exactly-Once 语义
  • 无需 WAL,降低 I/O 开销

Kafka Producer 实现

package icu.wzk

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import java.util.Properties

object KafkaProducerTest {
  def main(args: Array[String]): Unit = {
    val brokers = "h121.wzk.icu:9092"
    val topic = "spark_streaming_test01"

    val prop = new Properties()
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      classOf[StringSerializer].getName)
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      classOf[StringSerializer].getName)

    val producer = new KafkaProducer[String, String](prop)
    for (i <- 1 to 1000) {
      val msg = new ProducerRecord[String, String](topic, i.toString, i.toString)
      producer.send(msg)
      println(s"Sent: $i")
      Thread.sleep(100)
    }
    producer.close()
  }
}

Direct 模式消费者实现

package icu.wzk

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaDStream1 {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val conf = new SparkConf()
      .setAppName("KafkaDStream1")
      .setMaster("local[*]")

    val ssc = new StreamingContext(conf, Seconds(2))
    val kafkaParams = getKafkaConsumerParameters("wzkicu")
    val topics = Array("spark_streaming_test01")

    val dstream = KafkaUtils.createDirectStream(
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )

    dstream.foreachRDD { (rdd, time) =>
      if (!rdd.isEmpty()) {
        println(s"rdd.count = ${rdd.count()}, time = $time")
        // 处理完成后手动提交偏移量
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        dstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }

  private def getKafkaConsumerParameters(groupId: String): Map[String, Object] = {
    Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG      -> "h121.wzk.icu:9092",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG               -> groupId,
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG      -> "earliest",
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG     -> (false: java.lang.Boolean)
    )
  }
}

关键配置说明:

  • ENABLE_AUTO_COMMIT_CONFIG = false:禁止 Kafka 自动提交偏移量,由应用手动控制
  • AUTO_OFFSET_RESET_CONFIG = earliest:首次消费从最早可用消息开始
  • LocationStrategies.PreferConsistent:均匀分配 Kafka 分区到 Executor

两种模式对比

特性Receiver 模式Direct 模式
并行度受 Receiver 数量限制与 Kafka 分区数一致
偏移量管理ZooKeeper 自动管理应用手动控制
语义保证At-Least-Once(需 WAL)Exactly-Once(可实现)
CPU 开销Receiver 独占核心无额外开销
Kafka 版本0.8.x0.10+
生产推荐不推荐推荐

测试结果

Producer 以 100ms 间隔发送 1000 条消息后,Consumer 接收情况:

rdd.count = 1000, time = ...  # 初始积压批次
rdd.count = 15, time = ...    # 后续正常批次(每 2 秒约 20 条)
rdd.count = 9, time = ...

Direct 模式是 Spark Streaming 消费 Kafka 的业界标准实践,配合手动偏移量提交和幂等写入下游,可以实现完整的端到端 Exactly-Once 语义。