This is article 72 in the Big Data series, a systematic review of Spark RDD Action operators. Unlike Transformations, which are lazily evaluated, Actions trigger actual computation and either return results to the Driver or write them to external storage.
## Why Actions Matter

Spark Transformations (such as map and filter) are lazily evaluated: they only record a description of the operation and do not execute any computation. Only when an Action is called does Spark submit a Job, schedule the DAG, and run the computation on the Executors. Understanding how Actions are classified and when each applies is the foundation for writing efficient Spark programs.
## Data Collection Category

This category of Actions pulls RDD data back to the Driver:
| Operator | Description |
|---|---|
| `collect()` | Return the entire RDD as an array; use carefully with large data (risk of Driver OOM) |
| `collectAsMap()` | Return a K-V RDD as a Map; duplicate keys are overwritten (last value wins) |
| `first()` | Return the first element |
| `take(n)` | Return an array of the first n elements |
| `top(n)` | Return the largest n elements (descending by default) |
| `takeSample(withReplacement, n, seed)` | Randomly sample n elements |
```scala
val rdd = sc.parallelize(1 to 100)
rdd.take(5)  // Array(1, 2, 3, 4, 5)
rdd.top(3)   // Array(100, 99, 98)
```
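The duplicate-key behavior of collectAsMap noted in the table can be illustrated locally: Scala's own `toMap` keeps the last value seen for each key, which matches "duplicate keys overwritten" (a local sketch on plain collections, not Spark code):

```scala
// collectAsMap on a K-V RDD behaves like toMap on local pairs:
// when a key repeats, the last value wins
val pairs = List((1, "a"), (2, "b"), (1, "c"))
val asMap = pairs.toMap
println(asMap)  // Map(1 -> c, 2 -> b)
```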
## Statistics Category

```scala
val rdd = sc.parallelize(List(1.0, 2.0, 3.0, 4.0, 5.0))
rdd.count()  // 5
rdd.mean()   // 3.0
rdd.stdev()  // 1.4142...
rdd.max()    // 5.0
rdd.min()    // 1.0

// stats() returns a StatCounter with all of the above in a single pass
val s = rdd.stats()
println(s"count=${s.count}, mean=${s.mean}, stdev=${s.stdev}")
```
## Aggregation Category

### reduce

```scala
val sum = sc.parallelize(1 to 100).reduce(_ + _)  // 5050
```

reduce requires the operation to be commutative and associative; otherwise the result depends on partitioning and evaluation order, and is therefore undefined.
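Why this matters can be simulated with plain Scala collections by reducing within partitions first, then across them, just as Spark does (a local sketch of the execution model; `partitionedReduce` is a hypothetical helper, not a Spark API):

```scala
// Simulate Spark's two-level reduce: within each partition, then across partitions
def partitionedReduce(parts: List[List[Int]])(op: (Int, Int) => Int): Int =
  parts.map(_.reduce(op)).reduce(op)

val data = (1 to 10).toList
val partsA = List(data.take(5), data.drop(5))  // one possible partitioning
val partsB = List(data.take(3), data.drop(3))  // another partitioning

// Addition is commutative and associative: partitioning does not matter
println(partitionedReduce(partsA)(_ + _))  // 55
println(partitionedReduce(partsB)(_ + _))  // 55

// Subtraction is neither: the result changes with the partitioning
println(partitionedReduce(partsA)(_ - _))  // 15
println(partitionedReduce(partsB)(_ - _))  // 37
```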
### fold

Similar to reduce, but takes an initial value (zero value) that is applied once in each partition and once more when merging the partition results:

```scala
val sum = sc.parallelize(1 to 10).fold(0)(_ + _)  // 55
```
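A subtlety worth seeing concretely: because the zero value is applied per partition plus once at the merge, a non-neutral zero inflates the result with the partition count. The semantics can be simulated locally (`partitionedFold` is a hypothetical helper mirroring Spark's behavior):

```scala
// Simulate Spark's fold: fold each partition with the zero value,
// then fold the partial results with the zero value again
def partitionedFold(parts: List[List[Int]], zero: Int)(op: (Int, Int) => Int): Int =
  parts.map(_.fold(zero)(op)).fold(zero)(op)

val parts = List((1 to 5).toList, (6 to 10).toList)  // 2 simulated partitions
println(partitionedFold(parts, 0)(_ + _))   // 55: a neutral zero is safe
println(partitionedFold(parts, 10)(_ + _))  // 85: 55 + 10 * (2 partitions + 1 merge)
```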
### aggregate

The most flexible aggregation operator: it allows different logic for in-partition aggregation (seqOp) and cross-partition merging (combOp), so it can express statistics such as an average in a single pass:

```scala
val (sum, count) = sc.parallelize(1 to 10).aggregate((0, 0))(
  (acc, x) => (acc._1 + x, acc._2 + 1),  // seqOp: in-partition accumulation
  (a, b)   => (a._1 + b._1, a._2 + b._2) // combOp: cross-partition merge
)
println(s"avg = ${sum.toDouble / count}")  // avg = 5.5
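The seqOp/combOp mechanics can be traced locally by folding each partition with seqOp and combining the partial results with combOp (`partitionedAggregate` is a hypothetical helper illustrating the structure, not a Spark API):

```scala
// Simulate aggregate: seqOp folds within a partition, combOp merges partition results
def partitionedAggregate[A, B](parts: List[List[A]], zero: B)(
    seqOp: (B, A) => B, combOp: (B, B) => B): B =
  parts.map(_.foldLeft(zero)(seqOp)).fold(zero)(combOp)

val parts = List((1 to 5).toList, (6 to 10).toList)  // 2 simulated partitions
val (sum, count) = partitionedAggregate(parts, (0, 0))(
  (acc, x) => (acc._1 + x, acc._2 + 1),
  (a, b)   => (a._1 + b._1, a._2 + b._2)
)
println(s"avg = ${sum.toDouble / count}")  // avg = 5.5
```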
## Iteration Category

```scala
// foreach: execute element by element, commonly used for writing to external systems
rdd.foreach(x => println(x))

// foreachPartition: create one connection per partition, suitable for database output
rdd.foreachPartition { iter =>
  val conn = getConnection()
  iter.foreach(x => conn.execute(s"INSERT INTO t VALUES ($x)"))
  conn.close()
}
```

foreachPartition outperforms foreach for this pattern because the database connection cost drops from once per record to once per partition.
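The cost difference can be made concrete by counting connection setups under each strategy (a local simulation; `openConnection` is a hypothetical stand-in for real connection setup):

```scala
// Count how many times a "connection" would be opened for 100 records in 2 partitions
var opened = 0
def openConnection(): Unit = opened += 1  // hypothetical stand-in for real setup cost

val parts = List((1 to 40).toList, (41 to 100).toList)  // 2 simulated partitions

// foreach-style: a naive per-record implementation opens a connection per record
opened = 0
parts.foreach(_.foreach(_ => openConnection()))
println(opened)  // 100

// foreachPartition-style: one connection per partition
opened = 0
parts.foreach { p => openConnection(); p.foreach(_ => ()) }
println(opened)  // 2
```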
## Storage Category

```scala
// Write as text files (one file per partition)
rdd.saveAsTextFile("hdfs://namenode/output/words")
// Write as a Hadoop SequenceFile (K-V format)
pairRdd.saveAsSequenceFile("hdfs://namenode/output/seq")
// Write as a Java-serialized object file
rdd.saveAsObjectFile("hdfs://namenode/output/obj")
```
## Key-Value RDD Specific Operators

### Creating a PairRDD

```scala
val arr = List(1, 2, 3, 4, 5)
val pairRdd = sc.makeRDD(arr.map(x => (x, (x * 10, x * 100))))
```
### Value Transformation

```scala
val rdd = sc.parallelize(List((1, 2), (3, 4), (5, 6)))

// mapValues: transform only the value, keep the key unchanged
rdd.mapValues(x => 1 to x).collect()
// Array((1, Range(1..2)), (3, Range(1..4)), (5, Range(1..6)))

// flatMapValues: flatten the value collection into separate pairs
rdd.flatMapValues(x => 1 to x).collect()
// Array((1,1), (1,2), (3,1), (3,2), (3,3), (3,4), ...)
```
### Aggregation and Sorting

```scala
// reduceByKey: aggregate by key (map-side combining makes it more efficient than groupByKey)
wordRdd.reduceByKey(_ + _)

// groupByKey: group by key, for scenarios that need all values kept
// Example: average daily sales per book
salesRdd.groupByKey()
  .mapValues(v => v.sum.toDouble / v.size)
  .collect()

// sortByKey: sort by key
rdd.sortByKey(ascending = false)
```
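The reduceByKey word-count pattern can be checked on local collections: grouping pairs by key and summing the values reproduces what reduceByKey computes in a distributed fashion (a local sketch with made-up data):

```scala
// Local equivalent of wordRdd.reduceByKey(_ + _)
val words = List("spark", "rdd", "spark", "action", "rdd", "spark")
val pairs = words.map(w => (w, 1))
val counts = pairs.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).sum) }
println(counts)  // Map(spark -> 3, rdd -> 2, action -> 1)  (order may vary)
```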
### Join Operations

```scala
val rdd1 = sc.parallelize(List((1, "a"), (2, "b")))
val rdd2 = sc.parallelize(List((1, "x"), (3, "y")))

rdd1.join(rdd2)           // Inner join: only key=1, present on both sides
rdd1.leftOuterJoin(rdd2)  // Left outer join: keep all keys from rdd1
rdd1.rightOuterJoin(rdd2) // Right outer join: keep all keys from rdd2
rdd1.fullOuterJoin(rdd2)  // Full outer join: keep all keys

// cogroup: group multiple RDDs by key
rdd1.cogroup(rdd2).collect()
// Array((1,(CompactBuffer(a),CompactBuffer(x))), ...)
```
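The join semantics can be verified on local collections: a for-comprehension reproduces the inner join, and `collectFirst` the left outer join (a local sketch mirroring the two RDDs above):

```scala
val left  = List((1, "a"), (2, "b"))
val right = List((1, "x"), (3, "y"))

// Inner join: only keys present on both sides survive
val inner = for { (k, v) <- left; (k2, w) <- right if k == k2 } yield (k, (v, w))
println(inner)  // List((1,(a,x)))

// Left outer join: every left key is kept, the right value becomes an Option
val leftOuter = left.map { case (k, v) =>
  (k, (v, right.collectFirst { case (`k`, w) => w }))
}
println(leftOuter)  // List((1,(a,Some(x))), (2,(b,None)))
```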
### lookup

```scala
rdd1.lookup(1)  // Seq(a) — return all values whose key is 1
```
## Notes

- An Action triggers computation of the entire DAG, so frequent Action calls cause repeated computation; cache the RDD first with `cache()` or `persist()`.
- Avoid `collect()` on large datasets; use `take()` or write out directly with `saveAsTextFile()`.
- `groupByKey` gathers all values of the same key onto a single Executor, which performs poorly under data skew; prefer `reduceByKey`.