本文是大数据系列第 85 篇,介绍 Spark 实时计算两代框架的架构思想与演进背景。

完整图文版(含截图):CSDN 原文 | 掘金

实时计算背景

在批处理领域,Spark Core 的 RDD 和 SparkSQL 已经非常成熟。但业务场景中大量存在实时需求:

  • 实时监控与告警
  • 日志流式分析
  • 金融交易实时风控
  • 社交媒体情感分析

Spark 先后推出两代流处理框架来解决这些问题:Spark Streaming(DStream)Structured Streaming


第一代:DStream 微批处理

核心概念

DStream(Discretized Stream,离散化流) 是 Spark Streaming 的高级抽象,将连续数据流表示为一系列连续的 RDD,每个 RDD 包含特定时间间隔内到达的数据。

支持的数据源

  • 外部数据源:Kafka、Flume、HDFS/S3、Socket、Twitter API
  • 内部数据源:对已有 DStream 进行 Transformation 生成新的 DStream

支持的操作

操作类型示例
TransformationmapfilterreduceByKeyjoin
输出操作写入文件系统、数据库、Dashboard
窗口计算windowreduceByKeyAndWindow

微批架构示意

数据流(Kafka/Socket/...)

  Spark Streaming Receiver

   按 Batch Interval 切分

   RDD(时间片1) → 处理 → 输出
   RDD(时间片2) → 处理 → 输出
   RDD(时间片3) → 处理 → 输出
         ...

Batch Interval 配置

Batch Interval 是 DStream 的核心参数,决定每次批处理的时间窗口大小:

场景推荐间隔
实时监控告警500ms ~ 1s
日志分析5s ~ 10s
金融交易处理1s ~ 2s

权衡:间隔越小,延迟越低,但系统开销越大;间隔越大,吞吐量更高,但延迟增加。

DStream 代码示例

import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._

val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingDemo")
val ssc = new StreamingContext(conf, Seconds(1))

// 从 Kafka 读取
val kafkaStream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

// 实时 WordCount
val wordCounts = kafkaStream
  .flatMap(record => record.value().split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

wordCounts.print()

ssc.start()
ssc.awaitTermination()

DStream 的优势

  • Exactly-Once 语义:处理且仅处理一次,保证数据精确性
  • 与批处理无缝集成:底层同为 RDD,切换成本低
  • 高吞吐:每秒可处理数百万条记录
  • 自动容错:失败时自动恢复

DStream 的局限性

随着业务复杂度上升,DStream 暴露出三个核心问题:

1. EventTime 处理困难

DStream 只能按 BatchTime(数据到达 Spark 的时间)处理,无法基于 EventTime(事件实际发生时间)进行窗口计算。

实际场景中,由于网络延迟,数据可能乱序到达。DStream 无法优雅处理这种情况。

2. 批流 API 不一致

批处理使用 SparkSQL/DataFrame API,流处理使用 DStream API,开发者需要学习和维护两套代码体系。

3. 端到端一致性由用户保障

Exactly-Once 语义需要用户手动管理 Kafka offset、幂等写入等,实现复杂。


第二代:Structured Streaming

核心设计思想

Structured Streaming 将流数据源抽象为无界表(Unbounded Table)

Kafka 消息流 → 视为不断追加行的无界表
SQL/DataFrame 查询 → 操作该无界表
增量计算 → 将新增数据的结果写入结果表

这样批处理和流处理可以使用完全相同的 SparkSQL/DataFrame API

相比 DStream 的改进

维度DStreamStructured Streaming
API独立的 DStream API统一的 DataFrame/SQL API
EventTime 支持不支持原生支持,含水位线(Watermark)
端到端 Exactly-Once需手动实现框架内置保障
查询优化Catalyst 优化器
延迟模式微批微批 + 连续处理(Continuous)

Structured Streaming 代码示例

val spark = SparkSession.builder()
  .appName("StructuredStreamingDemo")
  .master("local[*]")
  .getOrCreate()

// 从 Kafka 读取流数据(与批处理 API 几乎一致)
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()

// 解析并处理(与批处理完全一样的 DataFrame 操作)
val wordCounts = df
  .selectExpr("CAST(value AS STRING) AS word")
  .groupBy("word")
  .count()

// 输出到控制台(流式写出)
wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()
  .awaitTermination()

EventTime 与水位线

// 基于 EventTime 的窗口聚合,容忍 10 分钟的数据延迟
df.withWatermark("event_time", "10 minutes")
  .groupBy(window($"event_time", "5 minutes"), $"user_id")
  .count()

性能调优建议

  1. 并行度:合理设置 Kafka 分区数与 Spark 分区数,保持匹配
  2. 内存管理:增加 Executor 内存防止 OOM,使用 --executor-memory 配置
  3. 背压机制:开启 spark.streaming.backpressure.enabled = true,防止数据积压
  4. Checkpoint:配置 Checkpoint 目录,保障故障恢复
// 开启背压
spark.conf.set("spark.streaming.backpressure.enabled", "true")

// 配置 Checkpoint(Structured Streaming 必须)
query.writeStream
  .option("checkpointLocation", "/path/to/checkpoint")
  .start()

选型建议

  • 新项目:优先选择 Structured Streaming,API 统一,功能更强
  • 已有 DStream 项目:评估迁移成本,逐步向 Structured Streaming 迁移
  • 超低延迟(< 100ms):考虑 Structured Streaming 的 Continuous Processing 模式

Structured Streaming 代表了 Spark 流处理的未来方向,随着版本迭代其功能已日趋完善,是构建生产级实时数据管道的推荐选择。