本文是大数据系列第 85 篇,介绍 Spark 实时计算两代框架的架构思想与演进背景。
实时计算背景
在批处理领域,Spark Core 的 RDD 和 SparkSQL 已经非常成熟。但业务场景中大量存在实时需求:
- 实时监控与告警
- 日志流式分析
- 金融交易实时风控
- 社交媒体情感分析
Spark 先后推出两代流处理框架来解决这些问题:Spark Streaming(DStream) 和 Structured Streaming。
第一代:DStream 微批处理
核心概念
DStream(Discretized Stream,离散化流) 是 Spark Streaming 的高级抽象,将连续数据流表示为一系列连续的 RDD,每个 RDD 包含特定时间间隔内到达的数据。
支持的数据源
- 外部数据源:Kafka、Flume、HDFS/S3、Socket、Twitter API
- 内部数据源:对已有 DStream 进行 Transformation 生成新的 DStream
支持的操作
| 操作类型 | 示例 |
|---|---|
| Transformation | map、filter、reduceByKey、join |
| 输出操作 | 写入文件系统、数据库、Dashboard |
| 窗口计算 | window、reduceByKeyAndWindow |
微批架构示意
数据流(Kafka/Socket/...)
↓
Spark Streaming Receiver
↓
按 Batch Interval 切分
↓
RDD(时间片1) → 处理 → 输出
RDD(时间片2) → 处理 → 输出
RDD(时间片3) → 处理 → 输出
...
Batch Interval 配置
Batch Interval 是 DStream 的核心参数,决定每次批处理的时间窗口大小:
| 场景 | 推荐间隔 |
|---|---|
| 实时监控告警 | 500ms ~ 1s |
| 日志分析 | 5s ~ 10s |
| 金融交易处理 | 1s ~ 2s |
权衡:间隔越小,延迟越低,但系统开销越大;间隔越大,吞吐量更高,但延迟增加。
DStream 代码示例
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingDemo")
val ssc = new StreamingContext(conf, Seconds(1))
// 从 Kafka 读取
val kafkaStream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
// 实时 WordCount
val wordCounts = kafkaStream
.flatMap(record => record.value().split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
DStream 的优势
- Exactly-Once 语义:处理且仅处理一次,保证数据精确性
- 与批处理无缝集成:底层同为 RDD,切换成本低
- 高吞吐:每秒可处理数百万条记录
- 自动容错:失败时自动恢复
DStream 的局限性
随着业务复杂度上升,DStream 暴露出三个核心问题:
1. EventTime 处理困难
DStream 只能按 BatchTime(数据到达 Spark 的时间)处理,无法基于 EventTime(事件实际发生时间)进行窗口计算。
实际场景中,由于网络延迟,数据可能乱序到达。DStream 无法优雅处理这种情况。
2. 批流 API 不一致
批处理使用 SparkSQL/DataFrame API,流处理使用 DStream API,开发者需要学习和维护两套代码体系。
3. 端到端一致性由用户保障
Exactly-Once 语义需要用户手动管理 Kafka offset、幂等写入等,实现复杂。
第二代:Structured Streaming
核心设计思想
Structured Streaming 将流数据源抽象为无界表(Unbounded Table):
Kafka 消息流 → 视为不断追加行的无界表
SQL/DataFrame 查询 → 操作该无界表
增量计算 → 将新增数据的结果写入结果表
这样批处理和流处理可以使用完全相同的 SparkSQL/DataFrame API。
相比 DStream 的改进
| 维度 | DStream | Structured Streaming |
|---|---|---|
| API | 独立的 DStream API | 统一的 DataFrame/SQL API |
| EventTime 支持 | 不支持 | 原生支持,含水位线(Watermark) |
| 端到端 Exactly-Once | 需手动实现 | 框架内置保障 |
| 查询优化 | 无 | Catalyst 优化器 |
| 延迟模式 | 微批 | 微批 + 连续处理(Continuous) |
Structured Streaming 代码示例
val spark = SparkSession.builder()
.appName("StructuredStreamingDemo")
.master("local[*]")
.getOrCreate()
// 从 Kafka 读取流数据(与批处理 API 几乎一致)
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "events")
.load()
// 解析并处理(与批处理完全一样的 DataFrame 操作)
val wordCounts = df
.selectExpr("CAST(value AS STRING) AS word")
.groupBy("word")
.count()
// 输出到控制台(流式写出)
wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
.awaitTermination()
EventTime 与水位线
// 基于 EventTime 的窗口聚合,容忍 10 分钟的数据延迟
df.withWatermark("event_time", "10 minutes")
.groupBy(window($"event_time", "5 minutes"), $"user_id")
.count()
性能调优建议
- 并行度:合理设置 Kafka 分区数与 Spark 分区数,保持匹配
- 内存管理:增加 Executor 内存防止 OOM,使用
--executor-memory配置 - 背压机制:开启
spark.streaming.backpressure.enabled = true,防止数据积压 - Checkpoint:配置 Checkpoint 目录,保障故障恢复
// 开启背压
spark.conf.set("spark.streaming.backpressure.enabled", "true")
// 配置 Checkpoint(Structured Streaming 必须)
query.writeStream
.option("checkpointLocation", "/path/to/checkpoint")
.start()
选型建议
- 新项目:优先选择 Structured Streaming,API 统一,功能更强
- 已有 DStream 项目:评估迁移成本,逐步向 Structured Streaming 迁移
- 超低延迟(< 100ms):考虑 Structured Streaming 的 Continuous Processing 模式
Structured Streaming 代表了 Spark 流处理的未来方向,随着版本迭代其功能已日趋完善,是构建生产级实时数据管道的推荐选择。