本文是大数据系列第 87 篇,系统介绍 DStream 转换算子分类,重点解析 transform 操作的三种实战用法,以黑名单过滤为案例贯穿全程。

完整图文版(含截图):CSDN 原文 | 掘金

DStream 转换算子分类

DStream 转换操作分为两大类:

无状态转换(Stateless Transformations): 每个批次独立处理,不保留历史状态,如 map、flatMap、filter、reduceByKey。

有状态转换(Stateful Transformations): 跨批次维护状态,包括 updateStateByKey 和窗口操作。

常用无状态算子

map / flatMap / filter

// 计算每行长度
val lengths = lines.map(line => line.length)

// 分词
val words = lines.flatMap(line => line.split(" "))

// 过滤短词
val filteredWords = words.filter(word => word.length > 5)

reduceByKey / groupByKey

// 词频统计
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)

// 按 key 分组
val grouped = pairs.groupByKey()

reduceByKey 在分区内先做本地聚合(类似 MapReduce 的 Combiner),性能优于 groupByKey

transform:直接操作底层 RDD

transform 允许对 DStream 每个批次的 RDD 应用任意 RDD 算子,弥补了 DStream API 表达能力的不足。

val filtered = words.transform { rdd =>
  rdd.filter(_.length > 3)
     .map(_.toUpperCase())
}

核心特性:

  • 每个批次间隔调用一次,生成新的 DStream
  • 可以在流处理中调用只有 RDD 才有的算子
  • 支持流与静态数据集(广播变量、外部 RDD)的 join

实战案例:流数据黑名单过滤

场景描述

输入流为用户点击日志,格式为 <id> <keyword>,需实时过滤掉黑名单关键词:

1 hadoop
2 spark      ← 过滤
3 scala      ← 过滤
4 java

黑名单:Array(("spark", true), ("scala", true))

方案一:leftOuterJoin

val blackList = Array(("spark", true), ("scala", true))
val blackListRDD = ssc.sparkContext.makeRDD(blackList)

clickStreamFormatted.transform(clickRDD => {
  val joinedRDD = clickRDD.leftOuterJoin(blackListRDD)
  joinedRDD.filter {
    case (word, (line, flag)) => !flag.getOrElse(false)
  }.map { case (word, (line, _)) => line }
})

左外连接后,黑名单命中的记录 flagSome(true),未命中为 None,通过 getOrElse(false) 取反过滤。

方案二:SparkSQL 过滤

clickStreamFormatted.transform { clickRDD =>
  val spark = SparkSession.builder()
    .config(clickRDD.sparkContext.getConf)
    .getOrCreate()

  val clickDF = clickRDD.toDF("word", "line")
  val blackDF = blackListRDD.toDF("word", "flag")
  clickDF.join(blackDF, Seq("word"), "left")
    .filter("flag is null or flag == false")
    .select("line").rdd
}

用 DataFrame API 表达过滤逻辑,可读性更高,适合熟悉 SQL 的团队。

方案三:广播变量(最优)

val blackListBC = ssc.sparkContext
  .broadcast(blackList.filter(_._2).map(_._1))

clickStream
  .map(value => (value.split(" ")(1), value))
  .filter { case (word, _) =>
    !blackListBC.value.contains(word)
  }
  .map(_._2)

广播变量将黑名单分发到每个 Executor,避免了 shuffle,性能最优,推荐生产使用。

三种方案对比

方案实现方式Shuffle推荐度
leftOuterJoinRDD join一般
SparkSQLDataFrame join中等
广播变量filter + broadcast推荐

transform 的核心价值在于打通 DStream 和 RDD API 的边界,在流处理场景中直接复用批处理的所有算子能力。