This is article 88 in the Big Data series, taking a close look at two kinds of stateful computation in Spark Streaming: window aggregation operations and cross-batch state tracking. State tracking, and the incremental form of windowed aggregation, both require Checkpoint configuration.

Window Operation Principle

Window operations aggregate data across multiple batches rather than just the current one, which makes them suitable for sliding statistics, real-time hot-word rankings, and similar use cases.

Two Core Parameters:

  • Window Length (windowDuration): the time span of data each window covers
  • Slide Interval (slideDuration): how often the window computation is triggered

Constraint: Both parameters must be integer multiples of batchDuration.

Batch interval: 2s
Window length: 20s → covers the 10 most recent batches
Slide interval: 10s → Output results every 10 seconds
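
These parameters map directly onto the generic window() transformation. A minimal sketch of the configuration above (the app name and port are placeholders):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Batch interval of 2s; both window parameters below are integer multiples of it.
val conf = new SparkConf().setAppName("WindowDemo").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(2))

val lines = ssc.socketTextStream("localhost", 9999)

// Every 10 seconds, produce a batch covering the last 20 seconds of data,
// i.e. the 10 most recent 2-second batches.
val windowed = lines.window(Seconds(20), Seconds(10))
```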

reduceByWindow: String Concatenation Example

val lines = ssc.socketTextStream("localhost", 9999)

// 20-second window, 10-second slide, concatenate strings
val res1 = lines.reduceByWindow(_ + " " + _, Seconds(20), Seconds(10))
res1.print()

// Sum for numeric stream
val res3 = lines.map(_.toInt)
  .reduceByWindow(_ + _, Seconds(20), Seconds(10))
res3.print()
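
A closely related helper is countByWindow, which counts the elements in each window. Spark implements it with an incremental (inverse-reduce) computation internally, so it also requires a checkpoint directory:

```scala
// countByWindow counts elements per window; its internal inverse reduce
// means a checkpoint directory must be set first.
ssc.checkpoint("checkpoint")
val counts = lines.countByWindow(Seconds(20), Seconds(10))
counts.print()
```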

reduceByKeyAndWindow: Real-Time Hot Word Statistics

val conf = new SparkConf().setAppName("HotWordStats").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(2))
ssc.checkpoint("checkpoint")

val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split("\\s+"))
val pairs = words.map(x => (x, 1))

// Count word frequency in recent 20 seconds, every 10 seconds
val wordCounts = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b,
  Seconds(20),
  Seconds(10),
  2  // Parallelism
)
wordCounts.print()
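
To turn the per-window counts into an actual hot-word ranking, one option is to sort each window's RDD with transform(); a small sketch (the top-5 cutoff is arbitrary):

```scala
// Sort each window's (word, count) pairs by count, descending
val topWords = wordCounts.transform { rdd =>
  rdd.sortBy(_._2, ascending = false)
}
topWords.print(5)  // show the top 5 hot words per window
```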

Optimized version (with an inverse "subtract" function; requires Checkpoint):

// Use add/subtract functions to avoid recalculating data sliding out of window
val optimized = pairs.reduceByKeyAndWindow(
  _ + _,   // Add new batch
  _ - _,   // Remove old batch
  Seconds(20),
  Seconds(10),
  2
)

The version with the inverse function improves performance significantly through incremental computation (new window = old window + entering batches - exiting batches): data that stays inside the window is never re-aggregated.
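
The incremental rule can be illustrated with plain numbers, independent of Spark (the sums below are made up for illustration):

```scala
// Window = 20s, slide = 10s: each slide drops 10s of old data and adds 10s of new data.
val oldWindowSum = 30  // sum over t = [0s, 20s)
val exitingSum   = 12  // batches sliding out: t = [0s, 10s)
val enteringSum  = 7   // batches sliding in: t = [20s, 30s)

// new window = old window + entering batches - exiting batches
val newWindowSum = oldWindowSum + enteringSum - exitingSum
// 30 + 7 - 12 = 25, without re-reading the surviving [10s, 20s) data
```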

State Tracking: updateStateByKey

updateStateByKey maintains persistent state across batches for each key, commonly used for cumulative statistics.

val updateFunc = (currValues: Seq[Int], prevState: Option[Int]) => {
  val currentCount = currValues.sum         // New count from this batch
  val previousCount = prevState.getOrElse(0) // Historical cumulative value
  Some(currentCount + previousCount)         // Updated state
}

// Need to set checkpoint first
ssc.checkpoint("checkpoint")

val stateDStream = wordDStream.updateStateByKey[Int](updateFunc)
stateDStream.print()

// Persist to HDFS
stateDStream.repartition(1).saveAsTextFiles("output/wordcount")

Limitation: In every batch, updateStateByKey recomputes and outputs the state of every key ever seen, even keys with no new data in the current batch; because the full state is checkpointed each time, Checkpoint overhead grows significantly as the key space grows.

Optimization: mapWithState

mapWithState is an improved version of updateStateByKey: it processes only the keys that received data in the current batch, which reduces both computation and Checkpoint storage.

def mappingFunction(
    key: String,
    one: Option[Int],
    state: State[Int]): (String, Int) = {
  val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
  state.update(sum)
  (key, sum)
}

val spec = StateSpec.function(mappingFunction _)
val resultDStream = wordDStream.mapWithState(spec)

resultDStream.cache()
resultDStream.repartition(1).saveAsTextFiles("output/mapwithstate")

ssc.start()
ssc.awaitTermination()

Two State Operations Comparison

Characteristic      | updateStateByKey                | mapWithState
Output Scope        | All historical keys             | Only keys changed in current batch
Checkpoint Overhead | Large (full state)              | Small (incremental)
Performance         | Lower                           | Higher
Use Case            | Needs a complete state snapshot | Incremental updates, high throughput
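
If a complete snapshot is still occasionally needed, mapWithState does not give it up entirely: the stream it returns exposes stateSnapshots(), which emits the full (key, state) table each batch. Continuing the mapWithState example above:

```scala
// resultDStream is the stream returned by wordDStream.mapWithState(spec).
// stateSnapshots() yields the full key/state table per batch, similar to
// updateStateByKey output, while keeping incremental checkpointing.
val snapshots = resultDStream.stateSnapshots()  // DStream[(String, Int)]
snapshots.print()
```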

Test Data Source

import java.io.PrintWriter
import java.net.ServerSocket

// Simple test source: accepts a single connection on port 9999 and
// writes an incrementing integer once per second.
object SocketWithWindow {
  def main(args: Array[String]): Unit = {
    val ss = new ServerSocket(9999)
    val socket = ss.accept()
    val out = new PrintWriter(socket.getOutputStream)
    var i = 0
    while (true) {
      i += 1
      out.println(i)
      out.flush()
      Thread.sleep(1000)
    }
  }
}

Incremental window operations and state tracking both require a Checkpoint directory; in production it is recommended to point it at an HDFS path so the state survives driver restarts.
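
In production, the HDFS checkpoint is usually paired with StreamingContext.getOrCreate so the driver can rebuild its state after a restart. A sketch, assuming an HDFS deployment (the path and namenode address are placeholders):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs://namenode:8020/spark/checkpoints/hotwords"  // placeholder path

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("HotWordStats")
  val ssc = new StreamingContext(conf, Seconds(2))
  ssc.checkpoint(checkpointDir)  // state and metadata are persisted here
  // build the DStream graph here: sources, window ops, state ops
  ssc
}

// On restart, recover the context from checkpoint data if it exists;
// otherwise create a fresh one with createContext().
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()
```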