本文是大数据系列第 72 篇,系统梳理 Spark RDD 的 Action 算子,与 Transformation 惰性求值不同,Action 操作会触发实际计算并将结果返回给 Driver 或写出到外部存储。

完整图文版(含截图):CSDN 原文 | 掘金

为什么 Action 很重要

Spark 的 Transformation(如 mapfilter)采用惰性求值,只记录操作描述,不执行计算。只有调用 Action 时,Spark 才会提交 Job、调度 DAG、在 Executor 上执行计算。理解 Action 的分类和适用场景,是写出高效 Spark 程序的基础。

数据收集类

这类 Action 将 RDD 数据拉回到 Driver 端:

算子说明
collect()将整个 RDD 转为数组返回,数据量大时慎用(Driver OOM)
collectAsMap()将 K-V RDD 转为 Map,重复 key 会覆盖
first()返回第一个元素
take(n)返回前 n 个元素组成的数组
top(n)返回最大的 n 个元素(默认降序)
takeSample(withReplacement, n, seed)随机采样 n 个元素
val rdd = sc.parallelize(1 to 100)
rdd.take(5)          // Array(1, 2, 3, 4, 5)
rdd.top(3)           // Array(100, 99, 98)

统计类

val rdd = sc.parallelize(List(1.0, 2.0, 3.0, 4.0, 5.0))
rdd.count()   // 5
rdd.mean()    // 3.0
rdd.stdev()   // 1.4142...
rdd.max()     // 5.0
rdd.min()     // 1.0

// stats() 一次性返回 StatCounter
val s = rdd.stats()
println(s"count=${s.count}, mean=${s.mean}, stdev=${s.stdev}")

聚合类

reduce

val sum = sc.parallelize(1 to 100).reduce(_ + _)  // 5050

reduce 要求操作满足交换律和结合律,否则结果不确定。

fold

类似 reduce,但需要提供每个分区的初始值(零值):

val sum = sc.parallelize(1 to 10).fold(0)(_ + _)  // 55

aggregate

最灵活的聚合算子,支持分区内聚合和跨分区合并使用不同逻辑,可用于计算平均值等复杂统计:

val (sum, count) = sc.parallelize(1 to 10).aggregate((0, 0))(
  (acc, x) => (acc._1 + x, acc._2 + 1),   // seqOp: 分区内累加
  (a, b)   => (a._1 + b._1, a._2 + b._2)  // combOp: 跨分区合并
)
println(s"avg = ${sum.toDouble / count}")  // avg = 5.5

迭代类

// foreach:逐元素执行,常用于写外部系统
rdd.foreach(x => println(x))

// foreachPartition:每个分区创建一次连接,适合数据库写出
rdd.foreachPartition { iter =>
  val conn = getConnection()
  iter.foreach(x => conn.execute(s"INSERT INTO t VALUES ($x)"))
  conn.close()
}

foreachPartitionforeach 性能更好,因为数据库连接开销从”每条记录一次”降为”每个分区一次”。

存储类

// 写出为文本文件(每个分区一个文件)
rdd.saveAsTextFile("hdfs://namenode/output/words")

// 写出为 Hadoop SequenceFile(K-V 格式)
pairRdd.saveAsSequenceFile("hdfs://namenode/output/seq")

// 写出为 Java 序列化对象文件
rdd.saveAsObjectFile("hdfs://namenode/output/obj")

Key-Value RDD 专项算子

创建 PairRDD

val arr = List(1, 2, 3, 4, 5)
val pairRdd = sc.makeRDD(arr.map(x => (x, (x * 10, x * 100))))

值变换

val rdd = sc.parallelize(List((1, 2), (3, 4), (5, 6)))

// mapValues:只变换 value,key 保持不变
rdd.mapValues(x => 1 to x).collect()
// Array((1,Range(1,1)), (3,Range(1,3)), ...)

// flatMapValues:展开 value 集合
rdd.flatMapValues(x => 1 to x).collect()
// Array((1,1), (3,1), (3,2), (3,3), ...)

聚合与排序

// reduceByKey:按 key 聚合(比 groupByKey 更高效)
wordRdd.reduceByKey(_ + _)

// groupByKey:按 key 分组,适合需要保留所有 value 的场景
// 例:计算每日图书平均销售量
salesRdd.groupByKey()
  .mapValues(v => v.sum.toDouble / v.size)
  .collect()

// sortByKey:按 key 排序
rdd.sortByKey(ascending = false)

Join 操作

val rdd1 = sc.parallelize(List((1, "a"), (2, "b")))
val rdd2 = sc.parallelize(List((1, "x"), (3, "y")))

rdd1.join(rdd2)           // 内连接:只有 key=1
rdd1.leftOuterJoin(rdd2)  // 左外连接:保留 rdd1 所有 key
rdd1.rightOuterJoin(rdd2) // 右外连接:保留 rdd2 所有 key
rdd1.fullOuterJoin(rdd2)  // 全外连接:保留所有 key

// cogroup:将多个 RDD 按 key 聚合
rdd1.cogroup(rdd2).collect()
// Array((1,(CompactBuffer(a),CompactBuffer(x))), ...)

lookup

rdd1.lookup(1)  // List(a) — 返回 key=1 的所有 value

注意事项

  1. Action 会触发整个 DAG 计算,高频调用 Action 会产生重复计算,建议先 cache()persist()
  2. 大数据集避免用 collect(),改用 take() 或直接 saveAsTextFile()
  3. groupByKey 会将同一 key 的所有数据聚集到单个 Executor,数据倾斜时性能差,优先用 reduceByKey