本文是大数据系列第 72 篇,系统梳理 Spark RDD 的 Action 算子,与 Transformation 惰性求值不同,Action 操作会触发实际计算并将结果返回给 Driver 或写出到外部存储。
为什么 Action 很重要
Spark 的 Transformation(如 map、filter)采用惰性求值,只记录操作描述,不执行计算。只有调用 Action 时,Spark 才会提交 Job、调度 DAG、在 Executor 上执行计算。理解 Action 的分类和适用场景,是写出高效 Spark 程序的基础。
数据收集类
这类 Action 将 RDD 数据拉回到 Driver 端:
| 算子 | 说明 |
|---|---|
collect() | 将整个 RDD 转为数组返回,数据量大时慎用(Driver OOM) |
collectAsMap() | 将 K-V RDD 转为 Map,重复 key 会覆盖 |
first() | 返回第一个元素 |
take(n) | 返回前 n 个元素组成的数组 |
top(n) | 返回最大的 n 个元素(默认降序) |
takeSample(withReplacement, n, seed) | 随机采样 n 个元素 |
val rdd = sc.parallelize(1 to 100)
rdd.take(5) // Array(1, 2, 3, 4, 5)
rdd.top(3) // Array(100, 99, 98)
统计类
val rdd = sc.parallelize(List(1.0, 2.0, 3.0, 4.0, 5.0))
rdd.count() // 5
rdd.mean() // 3.0
rdd.stdev() // 1.4142...
rdd.max() // 5.0
rdd.min() // 1.0
// stats() 一次性返回 StatCounter
val s = rdd.stats()
println(s"count=${s.count}, mean=${s.mean}, stdev=${s.stdev}")
聚合类
reduce
val sum = sc.parallelize(1 to 100).reduce(_ + _) // 5050
reduce 要求操作满足交换律和结合律,否则结果不确定。
fold
类似 reduce,但需要提供每个分区的初始值(零值):
val sum = sc.parallelize(1 to 10).fold(0)(_ + _) // 55
aggregate
最灵活的聚合算子,支持分区内聚合和跨分区合并使用不同逻辑,可用于计算平均值等复杂统计:
val (sum, count) = sc.parallelize(1 to 10).aggregate((0, 0))(
(acc, x) => (acc._1 + x, acc._2 + 1), // seqOp: 分区内累加
(a, b) => (a._1 + b._1, a._2 + b._2) // combOp: 跨分区合并
)
println(s"avg = ${sum.toDouble / count}") // avg = 5.5
迭代类
// foreach:逐元素执行,常用于写外部系统
rdd.foreach(x => println(x))
// foreachPartition:每个分区创建一次连接,适合数据库写出
rdd.foreachPartition { iter =>
val conn = getConnection()
iter.foreach(x => conn.execute(s"INSERT INTO t VALUES ($x)"))
conn.close()
}
foreachPartition 比 foreach 性能更好,因为数据库连接开销从”每条记录一次”降为”每个分区一次”。
存储类
// 写出为文本文件(每个分区一个文件)
rdd.saveAsTextFile("hdfs://namenode/output/words")
// 写出为 Hadoop SequenceFile(K-V 格式)
pairRdd.saveAsSequenceFile("hdfs://namenode/output/seq")
// 写出为 Java 序列化对象文件
rdd.saveAsObjectFile("hdfs://namenode/output/obj")
Key-Value RDD 专项算子
创建 PairRDD
val arr = List(1, 2, 3, 4, 5)
val pairRdd = sc.makeRDD(arr.map(x => (x, (x * 10, x * 100))))
值变换
val rdd = sc.parallelize(List((1, 2), (3, 4), (5, 6)))
// mapValues:只变换 value,key 保持不变
rdd.mapValues(x => 1 to x).collect()
// Array((1,Range(1,1)), (3,Range(1,3)), ...)
// flatMapValues:展开 value 集合
rdd.flatMapValues(x => 1 to x).collect()
// Array((1,1), (3,1), (3,2), (3,3), ...)
聚合与排序
// reduceByKey:按 key 聚合(比 groupByKey 更高效)
wordRdd.reduceByKey(_ + _)
// groupByKey:按 key 分组,适合需要保留所有 value 的场景
// 例:计算每日图书平均销售量
salesRdd.groupByKey()
.mapValues(v => v.sum.toDouble / v.size)
.collect()
// sortByKey:按 key 排序
rdd.sortByKey(ascending = false)
Join 操作
val rdd1 = sc.parallelize(List((1, "a"), (2, "b")))
val rdd2 = sc.parallelize(List((1, "x"), (3, "y")))
rdd1.join(rdd2) // 内连接:只有 key=1
rdd1.leftOuterJoin(rdd2) // 左外连接:保留 rdd1 所有 key
rdd1.rightOuterJoin(rdd2) // 右外连接:保留 rdd2 所有 key
rdd1.fullOuterJoin(rdd2) // 全外连接:保留所有 key
// cogroup:将多个 RDD 按 key 聚合
rdd1.cogroup(rdd2).collect()
// Array((1,(CompactBuffer(a),CompactBuffer(x))), ...)
lookup
rdd1.lookup(1) // List(a) — 返回 key=1 的所有 value
注意事项
- Action 会触发整个 DAG 计算,高频调用 Action 会产生重复计算,建议先
cache()或persist() - 大数据集避免用
collect(),改用take()或直接saveAsTextFile() groupByKey会将同一 key 的所有数据聚集到单个 Executor,数据倾斜时性能差,优先用reduceByKey