本文是大数据系列第 87 篇,系统介绍 DStream 转换算子分类,重点解析 transform 操作的三种实战用法,以黑名单过滤为案例贯穿全程。
DStream 转换算子分类
DStream 转换操作分为两大类:
无状态转换(Stateless Transformations): 每个批次独立处理,不保留历史状态,如 map、flatMap、filter、reduceByKey。
有状态转换(Stateful Transformations): 跨批次维护状态,包括 updateStateByKey 和窗口操作。
常用无状态算子
map / flatMap / filter
// 计算每行长度
val lengths = lines.map(line => line.length)
// 分词
val words = lines.flatMap(line => line.split(" "))
// 过滤短词
val filteredWords = words.filter(word => word.length > 5)
reduceByKey / groupByKey
// 词频统计
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
// 按 key 分组
val grouped = pairs.groupByKey()
reduceByKey 在分区内先做本地聚合(类似 MapReduce 的 Combiner),性能优于 groupByKey。
transform:直接操作底层 RDD
transform 允许对 DStream 每个批次的 RDD 应用任意 RDD 算子,弥补了 DStream API 表达能力的不足。
val filtered = words.transform { rdd =>
rdd.filter(_.length > 3)
.map(_.toUpperCase())
}
核心特性:
- 每个批次间隔调用一次,生成新的 DStream
- 可以在流处理中调用只有 RDD 才有的算子
- 支持流与静态数据集(广播变量、外部 RDD)的 join
实战案例:流数据黑名单过滤
场景描述
输入流为用户点击日志,格式为 <id> <keyword>,需实时过滤掉黑名单关键词:
1 hadoop
2 spark ← 过滤
3 scala ← 过滤
4 java
黑名单:Array(("spark", true), ("scala", true))
方案一:leftOuterJoin
val blackList = Array(("spark", true), ("scala", true))
val blackListRDD = ssc.sparkContext.makeRDD(blackList)
clickStreamFormatted.transform(clickRDD => {
val joinedRDD = clickRDD.leftOuterJoin(blackListRDD)
joinedRDD.filter {
case (word, (line, flag)) => !flag.getOrElse(false)
}.map { case (word, (line, _)) => line }
})
左外连接后,黑名单命中的记录 flag 为 Some(true),未命中为 None,通过 getOrElse(false) 取反过滤。
方案二:SparkSQL 过滤
clickStreamFormatted.transform { clickRDD =>
val spark = SparkSession.builder()
.config(clickRDD.sparkContext.getConf)
.getOrCreate()
val clickDF = clickRDD.toDF("word", "line")
val blackDF = blackListRDD.toDF("word", "flag")
clickDF.join(blackDF, Seq("word"), "left")
.filter("flag is null or flag == false")
.select("line").rdd
}
用 DataFrame API 表达过滤逻辑,可读性更高,适合熟悉 SQL 的团队。
方案三:广播变量(最优)
val blackListBC = ssc.sparkContext
.broadcast(blackList.filter(_._2).map(_._1))
clickStream
.map(value => (value.split(" ")(1), value))
.filter { case (word, _) =>
!blackListBC.value.contains(word)
}
.map(_._2)
广播变量将黑名单分发到每个 Executor,避免了 shuffle,性能最优,推荐生产使用。
三种方案对比
| 方案 | 实现方式 | Shuffle | 推荐度 |
|---|---|---|---|
| leftOuterJoin | RDD join | 有 | 一般 |
| SparkSQL | DataFrame join | 有 | 中等 |
| 广播变量 | filter + broadcast | 无 | 推荐 |
transform 的核心价值在于打通 DStream 和 RDD API 的边界,在流处理场景中直接复用批处理的所有算子能力。