本文是大数据系列第 88 篇,深入介绍 Spark Streaming 两类有状态计算:窗口聚合操作和跨批次状态追踪,两者均需配置 Checkpoint。
窗口操作原理
窗口操作在多个批次的数据上进行聚合,而不仅仅是当前批次,适合实现滑动统计、热词排行等场景。
两个核心参数:
- 窗口长度(windowDuration):包含多少时间范围内的批次数据
- 滑动间隔(slideDuration):多久重新计算一次
约束:两个参数都必须是 batchDuration 的整数倍。
批次间隔:2s
窗口长度:20s → 包含最近 10 个批次的数据
滑动间隔:10s → 每 10 秒输出一次结果
reduceByWindow:字符串拼接示例
val lines = ssc.socketTextStream("localhost", 9999)
// 20 秒窗口,10 秒滑动,拼接字符串
val res1 = lines.reduceByWindow(_ + " " + _, Seconds(20), Seconds(10))
res1.print()
// 对数字流求和
val res3 = lines.map(_.toInt)
.reduceByWindow(_ + _, Seconds(20), Seconds(10))
res3.print()
reduceByKeyAndWindow:实时热词统计
val conf = new SparkConf().setAppName("HotWordStats").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(2))
ssc.checkpoint("checkpoint")
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split("\\s+"))
val pairs = words.map(x => (x, 1))
// 每 10 秒统计最近 20 秒内的词频
val wordCounts = pairs.reduceByKeyAndWindow(
(a: Int, b: Int) => a + b,
Seconds(20),
Seconds(10),
2 // 并行度
)
wordCounts.print()
优化版本(带减函数,需要 Checkpoint):
// 使用加减函数,避免重复计算滑出窗口的数据
val optimized = pairs.reduceByKeyAndWindow(
_ + _, // 加入新批次
_ - _, // 移除旧批次
Seconds(20),
Seconds(10),
2
)
带减函数的版本通过增量计算(新窗口 = 旧窗口 + 新进入批次 - 滑出批次)大幅提升性能。
状态追踪:updateStateByKey
updateStateByKey 为每个 key 维护一个跨批次的持久状态,常用于累计统计。
val updateFunc = (currValues: Seq[Int], prevState: Option[Int]) => {
val currentCount = currValues.sum // 本批次新增计数
val previousCount = prevState.getOrElse(0) // 历史累计值
Some(currentCount + previousCount) // 更新后的状态
}
// 需要先设置 checkpoint
ssc.checkpoint("checkpoint")
val stateDStream = wordDStream.updateStateByKey[Int](updateFunc)
stateDStream.print()
// 持久化到 HDFS
stateDStream.repartition(1).saveAsTextFiles("output/wordcount")
局限性: 每次批次处理时,即便某个 key 本批次没有数据,也会输出其状态,随时间增长 Checkpoint 开销显著增大。
优化方案:mapWithState
mapWithState 是 updateStateByKey 的改进版,只处理本批次有更新的 key,减少 Checkpoint 存储量。
def mappingFunction(
key: String,
one: Option[Int],
state: State[Int]): (String, Int) = {
val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
state.update(sum)
(key, sum)
}
val spec = StateSpec.function(mappingFunction _)
val resultDStream = wordDStream.mapWithState(spec)
resultDStream.cache()
resultDStream.repartition(1).saveAsTextFiles("output/mapwithstate")
ssc.start()
ssc.awaitTermination()
两种状态操作对比
| 特性 | updateStateByKey | mapWithState |
|---|---|---|
| 输出范围 | 所有历史 key | 仅本批次有变化的 key |
| Checkpoint 开销 | 大(全量状态) | 小(增量) |
| 性能 | 较低 | 较高 |
| 适用场景 | 需要完整状态快照 | 增量更新、高吞吐 |
测试数据源
object SocketWithWindow {
def main(args: Array[String]): Unit = {
val ss = new ServerSocket(9999)
val socket = ss.accept()
var i = 0
while (true) {
i += 1
val out = new PrintWriter(socket.getOutputStream)
out.println(i)
out.flush()
Thread.sleep(1000)
}
}
}
窗口操作和状态追踪都需要配置 Checkpoint 目录,生产环境建议指向 HDFS 路径以保证持久化。