本文是大数据系列第 89 篇,深入对比 Spark Streaming 整合 Kafka 的两种核心模式,重点讲解 Direct 模式的生产实践。
两种整合模式概述
Spark Streaming 与 Kafka 的整合经历了两代 API:
- Receiver 模式(Kafka 0.8):基于 Kafka 高级消费者 API,由 Receiver 拉取数据到 Executor 内存
- Direct 模式(Kafka 0.10+):无 Receiver,Spark 直接读取 Kafka 分区,业界生产标准
Maven 依赖
<!-- Direct 模式使用 kafka-0-10 包 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- 注意:不要手动添加 kafka-clients,已传递依赖包含 -->
Receiver 模式详解
架构原理
每个 Receiver 作为一个长期运行的 Task 在 Executor 上执行,每 200ms(由 spark.streaming.blockInterval 控制)将积累的数据封装为 Block,写入 Executor 内存由 BlockManager 管理,底层生成 BlockRDD。
特点:
- 偏移量由 ZooKeeper 自动管理
- 默认副本因子为 2,保证容错
- 每个 Receiver 独占一个 CPU 核心
- 需要开启 WAL(Write-Ahead Log)才能保证数据不丢
开启 WAL: 在 SparkConf 中设置 spark.streaming.receiver.writeAheadLog.enable = true,数据和 Driver 元数据写入 HDFS,代价是 10-20% 的磁盘 I/O 增加。
Direct 模式详解
架构原理
Direct 模式不依赖 Receiver,Spark 在每个批次开始时直接查询 Kafka 的最新偏移量,生成的 KafkaRDD 分区数与 Kafka Topic 分区数一一对应。
优势:
- 无 Receiver,不占用额外 CPU 核心
- 分区与 Kafka 一一对应,并行度自动匹配
- 手动管理偏移量,实现 Exactly-Once 语义
- 无需 WAL,降低 I/O 开销
Kafka Producer 实现
package icu.wzk
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import java.util.Properties
object KafkaProducerTest {
def main(args: Array[String]): Unit = {
val brokers = "h121.wzk.icu:9092"
val topic = "spark_streaming_test01"
val prop = new Properties()
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
classOf[StringSerializer].getName)
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
classOf[StringSerializer].getName)
val producer = new KafkaProducer[String, String](prop)
for (i <- 1 to 1000) {
val msg = new ProducerRecord[String, String](topic, i.toString, i.toString)
producer.send(msg)
println(s"Sent: $i")
Thread.sleep(100)
}
producer.close()
}
}
Direct 模式消费者实现
package icu.wzk
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
object KafkaDStream1 {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
val conf = new SparkConf()
.setAppName("KafkaDStream1")
.setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(2))
val kafkaParams = getKafkaConsumerParameters("wzkicu")
val topics = Array("spark_streaming_test01")
val dstream = KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
dstream.foreachRDD { (rdd, time) =>
if (!rdd.isEmpty()) {
println(s"rdd.count = ${rdd.count()}, time = $time")
// 处理完成后手动提交偏移量
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
dstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
}
ssc.start()
ssc.awaitTermination()
}
private def getKafkaConsumerParameters(groupId: String): Map[String, Object] = {
Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "h121.wzk.icu:9092",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.GROUP_ID_CONFIG -> groupId,
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
)
}
}
关键配置说明:
ENABLE_AUTO_COMMIT_CONFIG = false:禁止 Kafka 自动提交偏移量,由应用手动控制AUTO_OFFSET_RESET_CONFIG = earliest:首次消费从最早可用消息开始LocationStrategies.PreferConsistent:均匀分配 Kafka 分区到 Executor
两种模式对比
| 特性 | Receiver 模式 | Direct 模式 |
|---|---|---|
| 并行度 | 受 Receiver 数量限制 | 与 Kafka 分区数一致 |
| 偏移量管理 | ZooKeeper 自动管理 | 应用手动控制 |
| 语义保证 | At-Least-Once(需 WAL) | Exactly-Once(可实现) |
| CPU 开销 | Receiver 独占核心 | 无额外开销 |
| Kafka 版本 | 0.8.x | 0.10+ |
| 生产推荐 | 不推荐 | 推荐 |
测试结果
Producer 以 100ms 间隔发送 1000 条消息后,Consumer 接收情况:
rdd.count = 1000, time = ... # 初始积压批次
rdd.count = 15, time = ... # 后续正常批次(每 2 秒约 20 条)
rdd.count = 9, time = ...
Direct 模式是 Spark Streaming 消费 Kafka 的业界标准实践,配合手动偏移量提交和幂等写入下游,可以实现完整的端到端 Exactly-Once 语义。