有状态转换
有状态转换主要有两种:
- 窗口操作
- 状态跟踪操作
窗口操作
Window Operations 可以设置窗口大小和滑动窗口间隔来动态获取当前Streaming的状态。基于窗口的操作会在一个比 StreamingContext 的 batchDuration(批次间隔)更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果。
基于窗口的操作需要两个参数:
- 窗口长度(Window Duration):控制每次计算最近的多少个批次的数据
- 滑动间隔(Slide Duration):用来控制对新的 DStream 进行计算的间隔
两者都必须是StreamingContext中批次间隔(batchDuration)的整数倍
案例1:SocketWithWindow(数据发送端)
package icu.wzk
import java.io.PrintWriter
import java.net.{ServerSocket, Socket}
object SocketWithWindow {
def main(args: Array[String]): Unit = {
val port = 9999
val ss = new ServerSocket(port)
val socket: Socket = ss.accept()
var i = 0
while (true) {
i += 1
val out = new PrintWriter(socket.getOutputStream)
out.println(i)
out.flush()
Thread.sleep(1000)
}
}
}
案例2:观察窗口数据
- 观察窗口的数据
- 观察 batchDuration、windowDuration、slideDuration 三者之间的关系
- 使用窗口相关的操作
package icu.wzk
object WindowDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("WindowDemo")
.setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(5))
ssc.sparkContext.setLogLevel("WARN")
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
lines.foreachRDD {
(rdd, time) => {
println(s"rdd = ${rdd.id}; time = $time")
}
rdd.foreach(value => println(value))
}
// 20秒窗口长度(DS包含窗口长度范围内的数据)
// 10秒滑动间隔(多次时间处理一次数据)
val res1: DStream[String] = lines
.reduceByWindow(_ + " " + _, Seconds(20), Seconds(10))
res1.print()
val res2: DStream[String] = lines
.reduceByWindow(_ + _, Seconds(20), Seconds(10))
res2.print()
// 求窗口元素的和
val res3: DStream[Int] = lines
.map(_.toInt)
.reduceByWindow(_ + _, Seconds(20), Seconds(10))
res3.print()
// 请窗口元素和
val res4 = res2.map(_.toInt).reduce(_ + _)
res4.print()
// 程序启动
ssc.start()
ssc.awaitTermination()
}
}
案例3:热点搜索词实时统计
package icu.wzk
object HotWordStats {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("HotWordStats")
.setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(2))
// 检查点设置 也可以设置到 HDFS
ssc.sparkContext.setLogLevel("ERROR")
ssc.checkpoint("checkpoint")
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
val words: DStream[String] = lines.flatMap(_.split("\\s+"))
val pairs: DStream[(String, Int)] = words.map(x => (x, 1))
// 通过 reduceByKeyAndWindow算子 每隔10秒统计最近20秒的词出现的的次数
val wordCounts1: DStream[(String, Int)] = pairs
.reduceByKeyAndWindow(
(a: Int, b: Int) => a + b, Seconds(20), Seconds(10), 2
)
wordCounts1.print()
// 需要CheckPoint的支持
val wordCounts2: DStream[(String, Int)] = pairs
.reduceByKeyAndWindow(
_ + _, _ - _, Seconds(20), Seconds(10), 2
)
wordCounts2.print()
// 运行程序
ssc.start()
ssc.awaitTermination()
}
}
状态跟踪操作:updateStateByKey
UpdateStateByKey的主要功能:
- 为Streaming中每一个Key维护一份State状态,state类型可以是任意类型的,可以是自定义对象,更新函数也可以是自定义的
- 通过更新函数对该Key的状态不断更新,对于每个新的batch而言,Spark Streaming会在使用updateStateByKey的时候已经存在的key进行state状态更新
- 使用updateStateByKey时要开启 CheckPoint 功能
案例1:updateStateByKey实现全局WordCount
package icu.wzk
object StateTracker1 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("StateTracker1")
.setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(5))
ssc.sparkContext.setLogLevel("ERROR")
ssc.checkpoint("checkpoint")
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
val words: DStream[String] = lines.flatMap(_.split("\\s+"))
val wordDStream: DStream[(String, Int)] = words.map(x => (x, 1))
// 定义状态更新函数
// 函数常量定义 返回类型是 Some(Int),表示的含义是最新状态
// 函数的功能是将当前时间间隔内产生的Key的Value集合,加到上一个状态中,得到最新状态
val updateFunc = (currValues: Seq[Int], prevValueState: Option[Int]) => {
// 通过Spark内部的reduceByKey按Key规约,然后这里传入某Key当前批次的Seq,再计算当前批次的总和
val currentCount = currValues.sum
// 已累加的值
val previousCount = prevValueState.getOrElse(0)
Some(currentCount + previousCount)
}
val stateDStream: DStream[(String, Int)] = wordDStream.updateStateByKey[Int](updateFunc)
stateDStream.print()
// 把DStream保存到文本文件中 会生成很多的小文件 一个批次生成一个目录
val outputDir = "output1"
stateDStream
.repartition(1)
.saveAsTextFiles(outputDir)
// 开始运行
ssc.start()
ssc.awaitTermination()
}
}
状态跟踪操作:mapWithState
mapWithState:也是用于全局统计Key的状态,如果没有数据输入,便不会返回之前的Key的状态,有一点增量的感觉。这样做的好处是,只关心那些已经发生的变化的Key,对于没有数据输入,则不会返回那些没有变化的Key的数据,即使数据量很大,checkpoint也不会像updateStateByKey那样,占用太多的存储。
案例2:mapWithState实现增量WordCount
package icu.wzk
object StateTracker2 {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
.setAppName("StateTracker2")
.setMaster("local[*]")
val ssc = new StreamingContext(conf, Seconds(2))
ssc.sparkContext.setLogLevel("ERROR")
ssc.checkpoint("checkpoint")
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
val words: DStream[String] = lines.flatMap(_.split("\\s+"))
val wordDStream: DStream[(String, Int)] = words.map(x => (x, 1))
def mappingFunction(key: String, one: Option[Int], state: State[Int]): (String, Int) = {
val sum: Int = one.getOrElse(0) + state.getOption.getOrElse(0)
state.update(sum)
(key, sum)
}
val spec = StateSpec.function(mappingFunction _)
val resultDStream: DStream[(String, Int)] = wordDStream.mapWithState(spec)
resultDStream.cache()
// 把DStream保存到文本文件中,会生成很多的小文件。一个批次生成一个目录
val outputDir = "output2"
resultDStream.repartition(1).saveAsTextFiles(outputDir)
ssc.start()
ssc.awaitTermination()
}
}