This is article 72 in the Big Data series, systematically reviewing Spark RDD Action operators. Unlike Transformation lazy evaluation, Action operations trigger actual computation and return results to Driver or write to external storage.

Why Action is Important

Spark Transformations (like map, filter) use lazy evaluation, only record operation descriptions, don’t execute computation. Only when calling Action does Spark submit Job, schedule DAG, execute computation on Executors. Understanding Action classification and applicable scenarios is foundation for writing efficient Spark programs.

Data Collection Category

This category of Actions pulls RDD data back to Driver:

OperatorDescription
collect()Convert entire RDD to array, use carefully with large data (Driver OOM)
collectAsMap()Convert K-V RDD to Map, duplicate keys overwritten
first()Return first element
take(n)Return array of first n elements
top(n)Return largest n elements (default descending)
takeSample(withReplacement, n, seed)Randomly sample n elements
val rdd = sc.parallelize(1 to 100)
rdd.take(5)          // Array(1, 2, 3, 4, 5)
rdd.top(3)           // Array(100, 99, 98)

Statistics Category

val rdd = sc.parallelize(List(1.0, 2.0, 3.0, 4.0, 5.0))
rdd.count()   // 5
rdd.mean()    // 3.0
rdd.stdev()   // 1.4142...
rdd.max()     // 5.0
rdd.min()     // 1.0

// stats() returns StatCounter at once
val s = rdd.stats()
println(s"count=${s.count}, mean=${s.mean}, stdev=${s.stdev}")

Aggregation Category

reduce

val sum = sc.parallelize(1 to 100).reduce(_ + _)  // 5050

reduce requires operation to satisfy commutative and associative law, otherwise result is undefined.

fold

Similar to reduce, but requires providing initial value (zero value) for each partition:

val sum = sc.parallelize(1 to 10).fold(0)(_ + _)  // 55

aggregate

Most flexible aggregation operator, supports different logic for in-partition aggregation and cross-partition merging, can be used for complex statistics like calculating average:

val (sum, count) = sc.parallelize(1 to 10).aggregate((0, 0))(
  (acc, x) => (acc._1 + x, acc._2 + 1),   // seqOp: in-partition accumulation
  (a, b)   => (a._1 + b._1, a._2 + b._2)  // combOp: cross-partition merge
)
println(s"avg = ${sum.toDouble / count}")  // avg = 5.5

Iteration Category

// foreach: execute element by element, commonly used for writing to external systems
rdd.foreach(x => println(x))

// foreachPartition: create connection once per partition, suitable for database output
rdd.foreachPartition { iter =>
  val conn = getConnection()
  iter.foreach(x => conn.execute(s"INSERT INTO t VALUES ($x)"))
  conn.close()
}

foreachPartition has better performance than foreach because database connection cost reduced from “once per record” to “once per partition”.

Storage Category

// Write as text file (one file per partition)
rdd.saveAsTextFile("hdfs://namenode/output/words")

// Write as Hadoop SequenceFile (K-V format)
pairRdd.saveAsSequenceFile("hdfs://namenode/output/seq")

// Write as Java serialized object file
rdd.saveAsObjectFile("hdfs://namenode/output/obj")

Key-Value RDD Specific Operators

Create PairRDD

val arr = List(1, 2, 3, 4, 5)
val pairRdd = sc.makeRDD(arr.map(x => (x, (x * 10, x * 100))))

Value Transformation

val rdd = sc.parallelize(List((1, 2), (3, 4), (5, 6)))

// mapValues: transform only value, keep key unchanged
rdd.mapValues(x => 1 to x).collect()
// Array((1,Range(1,1)), (3,Range(1,3)), ...)

// flatMapValues: expand value collection
rdd.flatMapValues(x => 1 to x).collect()
// Array((1,1), (3,1), (3,2), (3,3), ...)

Aggregation and Sorting

// reduceByKey: aggregate by key (more efficient than groupByKey)
wordRdd.reduceByKey(_ + _)

// groupByKey: group by key, suitable for scenarios needing to keep all values
// Example: calculate daily book average sales
salesRdd.groupByKey()
  .mapValues(v => v.sum.toDouble / v.size)
  .collect()

// sortByKey: sort by key
rdd.sortByKey(ascending = false)

Join Operations

val rdd1 = sc.parallelize(List((1, "a"), (2, "b")))
val rdd2 = sc.parallelize(List((1, "x"), (3, "y")))

rdd1.join(rdd2)           // Inner join: only key=1
rdd1.leftOuterJoin(rdd2)  // Left outer join: keep all keys from rdd1
rdd1.rightOuterJoin(rdd2) // Right outer join: keep all keys from rdd2
rdd1.fullOuterJoin(rdd2) // Full outer join: keep all keys

// cogroup: aggregate multiple RDDs by key
rdd1.cogroup(rdd2).collect()
// Array((1,(CompactBuffer(a),CompactBuffer(x))), ...)

lookup

rdd1.lookup(1)  // List(a) — return all values where key=1

Notes

  1. Action triggers entire DAG computation, high-frequency Action calls cause repeated computation. Suggest first cache() or persist()
  2. Avoid collect() on large datasets, use take() or directly saveAsTextFile()
  3. groupByKey gathers all data for same key to single Executor, poor performance on data skew, prefer reduceByKey