本文是大数据系列第 88 篇,深入介绍 Spark Streaming 两类有状态计算:窗口聚合操作和跨批次状态追踪,两者均需配置 Checkpoint。

完整图文版(含截图):CSDN 原文 | 掘金

窗口操作原理

窗口操作在多个批次的数据上进行聚合,而不仅仅是当前批次,适合实现滑动统计、热词排行等场景。

两个核心参数:

  • 窗口长度(windowDuration):包含多少时间范围内的批次数据
  • 滑动间隔(slideDuration):多久重新计算一次

约束:两个参数都必须是 batchDuration 的整数倍。

批次间隔:2s
窗口长度:20s → 包含最近 10 个批次的数据
滑动间隔:10s → 每 10 秒输出一次结果

reduceByWindow:字符串拼接示例

val lines = ssc.socketTextStream("localhost", 9999)

// 20 秒窗口,10 秒滑动,拼接字符串
val res1 = lines.reduceByWindow(_ + " " + _, Seconds(20), Seconds(10))
res1.print()

// 对数字流求和
val res3 = lines.map(_.toInt)
  .reduceByWindow(_ + _, Seconds(20), Seconds(10))
res3.print()

reduceByKeyAndWindow:实时热词统计

val conf = new SparkConf().setAppName("HotWordStats").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(2))
ssc.checkpoint("checkpoint")

val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split("\\s+"))
val pairs = words.map(x => (x, 1))

// 每 10 秒统计最近 20 秒内的词频
val wordCounts = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b,
  Seconds(20),
  Seconds(10),
  2  // 并行度
)
wordCounts.print()

优化版本(带减函数,需要 Checkpoint):

// 使用加减函数,避免重复计算滑出窗口的数据
val optimized = pairs.reduceByKeyAndWindow(
  _ + _,   // 加入新批次
  _ - _,   // 移除旧批次
  Seconds(20),
  Seconds(10),
  2
)

带减函数的版本通过增量计算(新窗口 = 旧窗口 + 新进入批次 - 滑出批次)大幅提升性能。

状态追踪:updateStateByKey

updateStateByKey 为每个 key 维护一个跨批次的持久状态,常用于累计统计。

val updateFunc = (currValues: Seq[Int], prevState: Option[Int]) => {
  val currentCount = currValues.sum         // 本批次新增计数
  val previousCount = prevState.getOrElse(0) // 历史累计值
  Some(currentCount + previousCount)         // 更新后的状态
}

// 需要先设置 checkpoint
ssc.checkpoint("checkpoint")

val stateDStream = wordDStream.updateStateByKey[Int](updateFunc)
stateDStream.print()

// 持久化到 HDFS
stateDStream.repartition(1).saveAsTextFiles("output/wordcount")

局限性: 每次批次处理时,即便某个 key 本批次没有数据,也会输出其状态,随时间增长 Checkpoint 开销显著增大。

优化方案:mapWithState

mapWithStateupdateStateByKey 的改进版,只处理本批次有更新的 key,减少 Checkpoint 存储量。

def mappingFunction(
    key: String,
    one: Option[Int],
    state: State[Int]): (String, Int) = {
  val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
  state.update(sum)
  (key, sum)
}

val spec = StateSpec.function(mappingFunction _)
val resultDStream = wordDStream.mapWithState(spec)

resultDStream.cache()
resultDStream.repartition(1).saveAsTextFiles("output/mapwithstate")

ssc.start()
ssc.awaitTermination()

两种状态操作对比

特性updateStateByKeymapWithState
输出范围所有历史 key仅本批次有变化的 key
Checkpoint 开销大(全量状态)小(增量)
性能较低较高
适用场景需要完整状态快照增量更新、高吞吐

测试数据源

object SocketWithWindow {
  def main(args: Array[String]): Unit = {
    val ss = new ServerSocket(9999)
    val socket = ss.accept()
    var i = 0
    while (true) {
      i += 1
      val out = new PrintWriter(socket.getOutputStream)
      out.println(i)
      out.flush()
      Thread.sleep(1000)
    }
  }
}

窗口操作和状态追踪都需要配置 Checkpoint 目录,生产环境建议指向 HDFS 路径以保证持久化。