Stateful Transformations
There are two main types of stateful transformations:
- Window operations
- State tracking operations
Window Operations
Window operations let you set a window length and a slide interval to inspect the stream's recent state. A window-based operation computes a result for the whole window by combining the results of several batches, covering a time range longer than the StreamingContext's batchDuration.
Window-based operations take two parameters:
- Window length (windowDuration): the time span of data each computation covers, i.e. how many recent batches are included
- Slide interval (slideDuration): how often the windowed computation on the new DStream is triggered
Both must be integer multiples of the StreamingContext's batchDuration
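To build intuition for these two parameters, here is a plain-Scala sketch (no Spark required; the batch values are made up for illustration) that mimics how a 20-second window over 5-second batches covers the last 4 batches and advances 2 batches per slide:

```scala
object WindowMath {
  def main(args: Array[String]): Unit = {
    val batchDuration = 5  // seconds, as in StreamingContext(conf, Seconds(5))
    val windowLength  = 20 // must be an integer multiple of batchDuration
    val slideInterval = 10 // must be an integer multiple of batchDuration
    val batchesPerWindow = windowLength / batchDuration  // 4 batches per window
    val batchesPerSlide  = slideInterval / batchDuration // advance 2 batches per slide
    // Simulated batch results: batch i produced the value i
    val batches = (1 to 8).toList
    // sliding(size, step) plays the role of window(windowLength, slideInterval)
    batches.sliding(batchesPerWindow, batchesPerSlide).foreach { w =>
      println(s"window = $w, sum = ${w.sum}")
    }
  }
}
```

Note how consecutive windows overlap by `windowLength - slideInterval` seconds of data, which is why a batch can contribute to more than one windowed result.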
Case 1: SocketWithWindow (Data Sender)
package icu.wzk

import java.io.PrintWriter
import java.net.{ServerSocket, Socket}

object SocketWithWindow {
  def main(args: Array[String]): Unit = {
    val port = 9999
    val ss = new ServerSocket(port)
    val socket: Socket = ss.accept()
    // Create the writer once, outside the loop
    val out = new PrintWriter(socket.getOutputStream)
    var i = 0
    while (true) {
      i += 1
      out.println(i)
      out.flush()
      Thread.sleep(1000)
    }
  }
}
Case 2: Observe Window Data
- Observe window data
- Observe relationship between batchDuration, windowDuration, slideDuration
- Use window-related operations
package icu.wzk

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

object WindowDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("WindowDemo")
      .setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("WARN")
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
    lines.foreachRDD { (rdd, time) =>
      println(s"rdd = ${rdd.id}; time = $time")
      rdd.foreach(value => println(value))
    }
    // Window length of 20 seconds (the DStream covers the data inside the window)
    // Slide interval of 10 seconds (the window computation fires every 10 seconds)
    val res1: DStream[String] = lines
      .reduceByWindow(_ + " " + _, Seconds(20), Seconds(10))
    res1.print()
    val res2: DStream[String] = lines
      .reduceByWindow(_ + _, Seconds(20), Seconds(10))
    res2.print()
    // Sum the window's elements
    val res3: DStream[Int] = lines
      .map(_.toInt)
      .reduceByWindow(_ + _, Seconds(20), Seconds(10))
    res3.print()
    // Parse the concatenated window string, then reduce
    val res4 = res2.map(_.toInt).reduce(_ + _)
    res4.print()
    // Start the program
    ssc.start()
    ssc.awaitTermination()
  }
}
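A subtle point in WindowDemo: res2 reduces the window's strings with plain concatenation, so res4 parses the concatenated string rather than summing the individual values, and generally disagrees with res3. A plain-Scala sketch of the difference (the input lines are made up for illustration):

```scala
object ConcatVsSum {
  def main(args: Array[String]): Unit = {
    val windowLines = List("1", "2", "3") // lines received within one window
    // res3-style: parse each line first, then sum
    val sumFirst = windowLines.map(_.toInt).reduce(_ + _)
    // res2/res4-style: concatenate the strings, then parse the result
    val concatThenParse = windowLines.reduce(_ + _).toInt
    println(sumFirst)        // 6
    println(concatThenParse) // 123
  }
}
```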
Case 3: Real-Time Hot Word Statistics
package icu.wzk

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

object HotWordStats {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("HotWordStats")
      .setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.sparkContext.setLogLevel("ERROR")
    // The checkpoint directory can also point to HDFS
    ssc.checkpoint("checkpoint")
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
    val words: DStream[String] = lines.flatMap(_.split("\\s+"))
    val pairs: DStream[(String, Int)] = words.map(x => (x, 1))
    // reduceByKeyAndWindow: every 10 seconds, count word frequencies over the last 20 seconds
    val wordCounts1: DStream[(String, Int)] = pairs
      .reduceByKeyAndWindow(
        (a: Int, b: Int) => a + b, Seconds(20), Seconds(10), 2
      )
    wordCounts1.print()
    // Variant with an inverse reduce function; requires checkpointing
    val wordCounts2: DStream[(String, Int)] = pairs
      .reduceByKeyAndWindow(
        (a: Int, b: Int) => a + b,
        (a: Int, b: Int) => a - b,
        Seconds(20), Seconds(10), 2
      )
    wordCounts2.print()
    // Run the program
    ssc.start()
    ssc.awaitTermination()
  }
}
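The second reduceByKeyAndWindow variant avoids recomputing the whole window from scratch: when the window slides, it adds the batches that enter and subtracts (via the inverse function) the batches that leave. A plain-Scala sketch of the arithmetic for a single key (the batch sums are made up for illustration):

```scala
object InverseReduceSim {
  def main(args: Array[String]): Unit = {
    val add      = (a: Int, b: Int) => a + b // the reduce function
    val subtract = (a: Int, b: Int) => a - b // its inverse
    // Per-batch counts for one key; window = 2 batches, slide = 1 batch
    val batchSums = List(3, 5, 2, 7)
    // Naive approach: recompute each window from scratch
    val naive = batchSums.sliding(2).map(_.reduce(add)).toList
    // Incremental: newWindow = oldWindow + entering batch - leaving batch
    var window = add(batchSums(0), batchSums(1))
    val incremental = scala.collection.mutable.ListBuffer(window)
    for (i <- 2 until batchSums.length) {
      window = subtract(add(window, batchSums(i)), batchSums(i - 2))
      incremental += window
    }
    println(naive)              // List(8, 7, 9)
    println(incremental.toList) // List(8, 7, 9) -- same results, less work
  }
}
```

The incremental path needs checkpointing because the old window's value must survive failures for the subtraction to stay correct.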
State Tracking Operation: updateStateByKey
updateStateByKey's main characteristics:
- It maintains a state for each key in the stream; the state can be of any type, including a custom object, and the update function is user-defined
- The key's state is updated continuously through the update function; for each new batch, Spark Streaming applies the update function to every key that already has state
- Checkpointing must be enabled to use updateStateByKey
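The mechanics can be sketched without Spark: the update function below has the same shape as the one used in Case 1, and the loop plays the role Spark Streaming performs per batch (the batch contents are made up for illustration):

```scala
object UpdateStateSim {
  def main(args: Array[String]): Unit = {
    // Same shape as the update function passed to updateStateByKey
    val updateFunc = (currValues: Seq[Int], prevState: Option[Int]) =>
      Some(currValues.sum + prevState.getOrElse(0))
    // Three simulated batches of (word, 1) pairs
    val batches = Seq(
      Seq("a" -> 1, "b" -> 1, "a" -> 1),
      Seq("b" -> 1),
      Seq("a" -> 1, "c" -> 1)
    )
    var state = Map.empty[String, Int]
    for (batch <- batches) {
      // Group the batch by key, as Spark's shuffle would
      val grouped = batch.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2) }
      // Every key seen so far is updated, even if absent from this batch
      state = (state.keySet ++ grouped.keySet).map { k =>
        k -> updateFunc(grouped.getOrElse(k, Seq.empty), state.get(k)).get
      }.toMap
      println(state.toSeq.sorted) // the full accumulated state, every batch
    }
  }
}
```

Notice that the full state is produced on every batch, whether or not a key received new data; this is what makes updateStateByKey's checkpoints grow with the total number of keys.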
Case 1: Global WordCount with updateStateByKey
package icu.wzk

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

object StateTracker1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("StateTracker1")
      .setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("ERROR")
    ssc.checkpoint("checkpoint")
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
    val words: DStream[String] = lines.flatMap(_.split("\\s+"))
    val wordDStream: DStream[(String, Int)] = words.map(x => (x, 1))
    // State update function: returns Some(Int), the key's latest state
    // It adds the key's values from the current batch to the previous state
    val updateFunc = (currValues: Seq[Int], prevValueState: Option[Int]) => {
      // Spark groups the current batch by key and passes each key's values as a Seq
      val currentCount = currValues.sum
      // Previously accumulated value
      val previousCount = prevValueState.getOrElse(0)
      Some(currentCount + previousCount)
    }
    val stateDStream: DStream[(String, Int)] = wordDStream.updateStateByKey[Int](updateFunc)
    stateDStream.print()
    // Saving the DStream as text files produces many small files, one directory per batch
    val outputDir = "output1"
    stateDStream
      .repartition(1)
      .saveAsTextFiles(outputDir)
    // Start running
    ssc.start()
    ssc.awaitTermination()
  }
}
State Tracking Operation: mapWithState
mapWithState is also used for global per-key state, but it behaves incrementally: if a key receives no new data in a batch, its previous state is not emitted. The benefit is that only keys with changes are returned, so even with a large volume of data the checkpoint does not take up as much storage as with updateStateByKey.
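The difference in what gets emitted can be sketched without Spark: in the simulation below, only keys that appear in the current batch are mapped and emitted, while the state map keeps accumulating in the background (the batch contents are made up for illustration):

```scala
object MapWithStateSim {
  def main(args: Array[String]): Unit = {
    import scala.collection.mutable
    val state = mutable.Map.empty[String, Int] // per-key state, kept across batches
    // Same shape as the mapping function passed to StateSpec.function in Case 2
    def mappingFunction(key: String, one: Option[Int]): (String, Int) = {
      val sum = one.getOrElse(0) + state.getOrElse(key, 0)
      state(key) = sum
      (key, sum)
    }
    val batches = Seq(Seq("a", "b", "a"), Seq("b"))
    for (batch <- batches) {
      // Only the keys present in this batch are mapped and emitted
      val emitted = batch.map(k => mappingFunction(k, Some(1)))
      println(s"emitted = $emitted, state = ${state.toSeq.sorted}")
    }
  }
}
```

In the second batch only ("b", 2) is emitted; "a" is absent from the output even though its accumulated state (2) is still held, which is the incremental behavior described above.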
Case 2: Incremental WordCount with mapWithState
package icu.wzk

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

object StateTracker2 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setAppName("StateTracker2")
      .setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.sparkContext.setLogLevel("ERROR")
    ssc.checkpoint("checkpoint")
    val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
    val words: DStream[String] = lines.flatMap(_.split("\\s+"))
    val wordDStream: DStream[(String, Int)] = words.map(x => (x, 1))
    // Mapping function: add the batch's value to the stored state and emit (key, sum)
    def mappingFunction(key: String, one: Option[Int], state: State[Int]): (String, Int) = {
      val sum: Int = one.getOrElse(0) + state.getOption.getOrElse(0)
      state.update(sum)
      (key, sum)
    }
    val spec = StateSpec.function(mappingFunction _)
    val resultDStream: DStream[(String, Int)] = wordDStream.mapWithState(spec)
    resultDStream.cache()
    // Saving the DStream as text files produces many small files, one directory per batch
    val outputDir = "output2"
    resultDStream.repartition(1).saveAsTextFiles(outputDir)
    ssc.start()
    ssc.awaitTermination()
  }
}