This is article 87 in the Big Data series. It introduces the classification of DStream transformation operators and focuses on three practical uses of the transform operation, with blacklist filtering as the running example.
DStream Transformation Operator Classification
DStream transformation operations fall into two categories:
Stateless Transformations: Each batch is processed independently without retaining historical state, such as map, flatMap, filter, reduceByKey.
Stateful Transformations: Maintain state across batches, including updateStateByKey and window operations.
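The difference can be illustrated without a cluster. The sketch below simulates micro-batches with plain Scala collections (the sample data is made up): a per-batch count that discards history stands in for a stateless operator, while a fold that carries counts across batches mirrors the spirit of updateStateByKey.

```scala
// Three simulated micro-batches of words (hypothetical sample data)
val batches = List(
  List("spark", "hadoop"),
  List("spark"),
  List("spark", "scala")
)

// Stateless: each batch is counted on its own, history is discarded
val statelessCounts =
  batches.map(_.groupBy(identity).map { case (w, ws) => (w, ws.size) })

// Stateful: counts carried forward across batches,
// analogous to what updateStateByKey does per key
val statefulCounts =
  batches.foldLeft(Map.empty[String, Int]) { (state, batch) =>
    batch.foldLeft(state)((s, w) => s.updated(w, s.getOrElse(w, 0) + 1))
  }
```

The stateless view sees "spark" once per batch; the stateful view accumulates it to 3 across all three batches.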
Common Stateless Operators
map / flatMap / filter
// Calculate line length
val lengths = lines.map(line => line.length)
// Tokenize
val words = lines.flatMap(line => line.split(" "))
// Filter short words
val filteredWords = words.filter(word => word.length > 5)
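These three operators behave exactly like their counterparts on ordinary Scala collections, which makes them easy to try out locally. A minimal sketch with a made-up two-line batch:

```scala
// Hypothetical batch of input lines
val lines = List("spark streaming basics", "hello")

val lengths = lines.map(_.length)         // length of each line
val words   = lines.flatMap(_.split(" ")) // one element per token
val longer  = words.filter(_.length > 5)  // keep only longer words
```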
reduceByKey / groupByKey
// Word frequency count
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
// Group by key
val grouped = pairs.groupByKey()
reduceByKey performs local aggregation within each partition before the shuffle (similar to MapReduce's Combiner), so it performs better than groupByKey, which ships every record across the network.
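The saving can be simulated with plain Scala collections (partition contents are made up): pre-aggregating within each "partition" before merging produces the same result as grouping everything first, but fewer records cross the simulated shuffle boundary.

```scala
// Two hypothetical partitions of (word, 1) pairs
val partitions = List(
  List(("spark", 1), ("spark", 1), ("hadoop", 1)),
  List(("spark", 1), ("hadoop", 1))
)

// groupByKey-style: all 5 records cross the shuffle, summed only afterwards
val viaGroup = partitions.flatten
  .groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).sum) }

// reduceByKey-style: combine locally first (like a Combiner), then merge;
// only 4 pre-aggregated records would cross the shuffle here
val localAgg = partitions.map(
  _.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).sum) })
val viaReduce = localAgg.flatten
  .groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).sum) }
```

Both yield the same counts; the difference is purely in how much data moves between stages.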
transform: Direct RDD Operations
transform applies arbitrary RDD operators to each batch's RDD in a DStream, compensating for the limited expressiveness of the DStream API.
val filtered = words.transform { rdd =>
rdd.filter(_.length > 3)
.map(_.toUpperCase())
}
Core Characteristics:
- Called once per batch interval, generates new DStream
- Can call RDD-only operators in stream processing
- Supports joins between streams and static datasets (broadcast variables, external RDDs)
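The last point, joining each batch against a static dataset, can be sketched without a cluster. Below, a plain Scala list stands in for the per-batch RDD that a transform body receives, and a fixed Map plays the static side; all names and sample records are illustrative.

```scala
// Static side: a hypothetical keyword -> category lookup
val staticCategories = Map("spark" -> "compute", "kafka" -> "messaging")

// One simulated batch of (keyword, line) pairs, as transform sees per interval
val batch = List(("spark", "1 spark"), ("hive", "2 hive"))

// Inner-join-style enrichment, as a transform body would apply to the batch
val enriched = batch.flatMap { case (word, line) =>
  staticCategories.get(word).map(cat => (word, line, cat)).toList
}
```

Records with no match on the static side simply drop out, which is the same behavior an inner RDD join would give inside transform.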
Practical Case: Stream Data Blacklist Filtering
Scenario Description
The input stream consists of user click logs in the format <id> <keyword>; keywords on the blacklist must be filtered out in real time:
1 hadoop
2 spark ← Filter
3 scala ← Filter
4 java
Blacklist: Array(("spark", true), ("scala", true))
Approach 1: leftOuterJoin
// Static blacklist as an RDD of (keyword, flag)
val blackList = Array(("spark", true), ("scala", true))
val blackListRDD = ssc.sparkContext.makeRDD(blackList)
// clickStreamFormatted: DStream[(word, line)] pairs
clickStreamFormatted.transform(clickRDD => {
  val joinedRDD = clickRDD.leftOuterJoin(blackListRDD)
  joinedRDD.filter {
    // flag is Some(true) for blacklisted words, None otherwise
    case (word, (line, flag)) => !flag.getOrElse(false)
  }.map { case (word, (line, _)) => line }
})
After the left outer join, records whose keyword matches the blacklist carry a flag of Some(true), while unmatched records carry None; negating flag.getOrElse(false) therefore keeps exactly the non-blacklisted records.
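The Option handling is the subtle part. A plain-Scala sketch of the same semantics, with a Map emulating the join against the blacklist (sample records are made up):

```scala
val blackList = Map("spark" -> true, "scala" -> true)
val clicks = List(("hadoop", "1 hadoop"), ("spark", "2 spark"))

// Emulate leftOuterJoin: matched keys yield Some(true), unmatched yield None
val joined = clicks.map { case (word, line) =>
  (word, (line, blackList.get(word)))
}

// getOrElse(false) treats None (no blacklist entry) as "keep"
val kept = joined.collect {
  case (_, (line, flag)) if !flag.getOrElse(false) => line
}
```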
Approach 2: SparkSQL Filtering
clickStreamFormatted.transform { clickRDD =>
  val spark = SparkSession.builder()
    .config(clickRDD.sparkContext.getConf)
    .getOrCreate()
  import spark.implicits._ // required for toDF

  val clickDF = clickRDD.toDF("word", "line")
  val blackDF = blackListRDD.toDF("word", "flag")
  clickDF.join(blackDF, Seq("word"), "left")
    .filter("flag is null or flag = false")
    .select("line")
    .rdd.map(_.getString(0)) // Row => String, back to an RDD of lines
}
The DataFrame API expresses the filtering logic declaratively; it reads better and suits teams familiar with SQL.
Approach 3: Broadcast Variables (Optimal)
// Broadcast only the blacklisted words themselves, not the flags
val blackListBC = ssc.sparkContext
  .broadcast(blackList.filter(_._2).map(_._1))
clickStream
.map(value => (value.split(" ")(1), value))
.filter { case (word, _) =>
!blackListBC.value.contains(word)
}
.map(_._2)
Broadcast variables distribute the blacklist to every Executor once, so the filter runs locally with no shuffle at all; this is the best-performing approach and the one recommended for production.
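The pipeline itself is ordinary map/filter logic, so it can be exercised locally. In the sketch below a plain Set stands in for blackListBC.value as seen on an Executor, applied to the sample click lines from the scenario above:

```scala
// A Set stands in for blackListBC.value on each Executor
val blackSet = Set("spark", "scala")

// Raw click lines in "<id> <keyword>" format (sample data from the scenario)
val clickLines = List("1 hadoop", "2 spark", "3 scala", "4 java")

val passed = clickLines
  .map(line => (line.split(" ")(1), line)) // (keyword, original line)
  .filter { case (word, _) => !blackSet.contains(word) }
  .map(_._2) // keep only the surviving original lines
```

Each record is decided by a local Set lookup, which is exactly why this approach needs no shuffle.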
Three Approaches Comparison
| Approach | Implementation | Shuffle | Recommendation |
|---|---|---|---|
| leftOuterJoin | RDD join | Yes | Average |
| SparkSQL | DataFrame join | Yes | Medium |
| Broadcast Variables | filter + broadcast | No | Recommended |
The core value of transform is that it bridges the DStream and RDD APIs, letting streaming jobs directly reuse the full set of batch-processing operators.