This is article 88 in the Big Data series, taking a deep look at two kinds of stateful computation in Spark Streaming: window aggregation and cross-batch state tracking. Both require a Checkpoint directory to be configured.
Window Operation Principle
Window operations aggregate data across multiple batches rather than just the current one, making them suitable for sliding statistics, hot-word rankings, and similar use cases.
Two Core Parameters:
- Window Length (windowDuration): the time span of data covered by each window
- Slide Interval (slideDuration): how often the window computation is triggered
Constraint: Both parameters must be integer multiples of batchDuration.
- Batch interval: 2s
- Window length: 20s → each window covers the most recent 10 batches
- Slide interval: 10s → results are emitted every 10 seconds
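The relationship between these parameters can be checked with a few lines of plain Scala (an illustrative sketch, not Spark API; all durations in seconds):

```scala
val batchDuration  = 2   // seconds per batch
val windowDuration = 20  // window length
val slideDuration  = 10  // slide interval

// Both window and slide must be integer multiples of the batch interval
require(windowDuration % batchDuration == 0, "window must be a multiple of batch")
require(slideDuration % batchDuration == 0, "slide must be a multiple of batch")

val batchesPerWindow = windowDuration / batchDuration // batches covered per window
val batchesPerSlide  = slideDuration / batchDuration  // batches between outputs
```

With these values, each window covers 10 batches and a new result is emitted every 5 batches.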
reduceByWindow: String Concatenation Example
val lines = ssc.socketTextStream("localhost", 9999)
// 20-second window sliding every 10 seconds, concatenating all lines in the window
val res1 = lines.reduceByWindow(_ + " " + _, Seconds(20), Seconds(10))
res1.print()
// Sum over the same window for a numeric stream (assumes each line parses as an Int)
val res3 = lines.map(_.toInt)
  .reduceByWindow(_ + _, Seconds(20), Seconds(10))
res3.print()
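To make the window semantics concrete, here is a collections-based sketch (plain Scala, no Spark) of what the numeric reduceByWindow computes: each batch contributes one value, and every slide the most recent `batchesPerWindow` batches are reduced together.

```scala
val batchValues      = (1 to 10).toList // one Int per batch, in arrival order
val batchesPerWindow = 3                // e.g. a 6s window over 2s batches
val batchesPerSlide  = 2                // e.g. a 4s slide

// Each window starts batchesPerSlide after the previous one and
// sums batchesPerWindow consecutive batch values (the _ + _ reduce)
val windowSums =
  (0 to batchValues.length - batchesPerWindow by batchesPerSlide)
    .map(i => batchValues.slice(i, i + batchesPerWindow).sum)
    .toList
// windowSums == List(6, 12, 18, 24): sums of (1,2,3), (3,4,5), (5,6,7), (7,8,9)
```

Note how adjacent windows overlap whenever the slide interval is shorter than the window length.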
reduceByKeyAndWindow: Real-Time Hot Word Statistics
val conf = new SparkConf().setAppName("HotWordStats").setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(2))
ssc.checkpoint("checkpoint")
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split("\\s+"))
val pairs = words.map(x => (x, 1))
// Count word frequency in recent 20 seconds, every 10 seconds
val wordCounts = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b,
  Seconds(20),
  Seconds(10),
  2 // number of reduce tasks (numPartitions)
)
wordCounts.print()
Optimized version (with an inverse reduce function; requires Checkpoint):
// With an add/inverse pair, only the entering and exiting batches are touched;
// data that stays inside the window is not recomputed
val optimized = pairs.reduceByKeyAndWindow(
  _ + _, // add counts from the batch entering the window
  _ - _, // subtract counts from the batch leaving the window
  Seconds(20),
  Seconds(10),
  2 // number of reduce tasks (numPartitions)
)
The version with subtract function significantly improves performance through incremental calculation (new window = old window + entering batch - exiting batch).
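The incremental formula can be sketched for per-key counts with plain maps (an illustrative simulation, not Spark API; `slideWindow` is a hypothetical helper):

```scala
// One slide step of the incremental update reduceByKeyAndWindow performs:
// adjust the previous window's counts instead of recomputing from scratch.
def slideWindow(
    prevWindow: Map[String, Int],
    entering: Map[String, Int], // counts from the batch sliding in
    exiting: Map[String, Int]   // counts from the batch sliding out
): Map[String, Int] = {
  val added = entering.foldLeft(prevWindow) { case (acc, (k, v)) =>
    acc.updated(k, acc.getOrElse(k, 0) + v) // the _ + _ reduce
  }
  exiting.foldLeft(added) { case (acc, (k, v)) =>
    acc.updated(k, acc.getOrElse(k, 0) - v) // the _ - _ inverse
  }.filter(_._2 != 0) // like the optional filterFunc: drop keys that hit zero
}

val prev = Map("spark" -> 3, "flink" -> 1)
val next = slideWindow(prev, entering = Map("spark" -> 2), exiting = Map("flink" -> 1))
// next == Map("spark" -> 5): "flink" dropped to zero and was filtered out
```

The cost per slide is proportional to the entering and exiting batches only, which is why this form scales much better for long windows with short slides.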
State Tracking: updateStateByKey
updateStateByKey maintains persistent state across batches for each key, commonly used for cumulative statistics.
val updateFunc = (currValues: Seq[Int], prevState: Option[Int]) => {
  val currentCount = currValues.sum          // new count from this batch
  val previousCount = prevState.getOrElse(0) // historical cumulative value
  Some(currentCount + previousCount)         // updated state
}
// Need to set checkpoint first
ssc.checkpoint("checkpoint")
val stateDStream = wordDStream.updateStateByKey[Int](updateFunc)
stateDStream.print()
// Persist results (use an HDFS path in production)
stateDStream.repartition(1).saveAsTextFiles("output/wordcount")
Limitation: on every batch, updateStateByKey runs the update function for every key ever seen—even keys with no new data in the current batch—and checkpoints the full state, so Checkpoint overhead grows steadily as the key space grows.
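This cumulative behavior can be exercised in plain Scala by applying the update function batch by batch (a pure simulation with no Spark; `runBatch` is a hypothetical helper):

```scala
// Simulation of updateStateByKey semantics: the update function runs for
// every key that has state or new data, and absent keys keep their state.
val updateFunc = (currValues: Seq[Int], prevState: Option[Int]) =>
  Some(currValues.sum + prevState.getOrElse(0))

def runBatch(state: Map[String, Int], batch: Seq[(String, Int)]): Map[String, Int] = {
  val grouped = batch.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2) }
  val keys = state.keySet ++ grouped.keySet // all historical keys participate
  keys.flatMap { k =>
    updateFunc(grouped.getOrElse(k, Seq.empty), state.get(k)).map(k -> _)
  }.toMap
}

val afterBatch1 = runBatch(Map.empty, Seq("a" -> 1, "a" -> 1, "b" -> 1))
val afterBatch2 = runBatch(afterBatch1, Seq("a" -> 1)) // "b" absent but retained
// afterBatch1 == Map("a" -> 2, "b" -> 1); afterBatch2 == Map("a" -> 3, "b" -> 1)
```

Note that "b" still appears in the second result even though it received no data—exactly the behavior that makes the full-state checkpoint grow over time.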
Optimization: mapWithState
mapWithState is the improved successor to updateStateByKey: in each batch it processes only the keys that actually have new data, which reduces both computation and Checkpoint storage.
def mappingFunction(
    key: String,
    one: Option[Int],
    state: State[Int]): (String, Int) = {
  val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
  state.update(sum)
  (key, sum)
}
val spec = StateSpec.function(mappingFunction _)
val resultDStream = wordDStream.mapWithState(spec)
resultDStream.cache()
resultDStream.repartition(1).saveAsTextFiles("output/mapwithstate")
ssc.start()
ssc.awaitTermination()
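The contrast with updateStateByKey can be sketched in plain Scala (a simulation, not Spark API; `mapWithStateBatch` is a hypothetical helper mirroring the mappingFunction above):

```scala
import scala.collection.mutable

// Simulation of mapWithState semantics: only keys appearing in the
// current batch touch their state entry or produce output.
def mapWithStateBatch(
    state: mutable.Map[String, Int],
    batch: Seq[(String, Int)]
): Seq[(String, Int)] =
  batch.map { case (key, one) =>
    val sum = one + state.getOrElse(key, 0) // mirrors one.getOrElse(0) + state
    state.update(key, sum)                  // mirrors state.update(sum)
    (key, sum)
  }

val state = mutable.Map.empty[String, Int]
val out1 = mapWithStateBatch(state, Seq("a" -> 1, "b" -> 1))
val out2 = mapWithStateBatch(state, Seq("a" -> 1)) // output is only ("a", 2)
// state still holds "b" -> 1, but "b" produced no output in the second batch
```

Untouched keys keep their state but generate neither computation nor output, which is the source of the performance gap summarized in the comparison table below.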
Two State Operations Comparison
| Characteristic | updateStateByKey | mapWithState |
|---|---|---|
| Output Scope | All historical keys | Only keys changed in current batch |
| Checkpoint Overhead | Large (full state) | Small (incremental) |
| Performance | Lower | Higher |
| Use Case | Need complete state snapshot | Incremental update, high throughput |
Test Data Source
import java.io.PrintWriter
import java.net.ServerSocket

object SocketWithWindow {
  def main(args: Array[String]): Unit = {
    val ss = new ServerSocket(9999)
    val socket = ss.accept()
    // Create the writer once rather than on every iteration
    val out = new PrintWriter(socket.getOutputStream)
    var i = 0
    while (true) {
      i += 1
      out.println(i) // emit one integer per second
      out.flush()
      Thread.sleep(1000)
    }
  }
}
Window operations and state tracking both require a Checkpoint directory to be configured; in production, point it at an HDFS path so the checkpointed state is stored durably.