This is article 82 in the Big Data series. It systematically introduces Spark SQL's Transformation and Action operators, with complete test cases.

Core Concept: Lazy Execution Mechanism

SparkSQL’s core execution model is Lazy Evaluation:

  • Transformation: Doesn’t execute immediately, only records logic and dependencies, builds DAG (Directed Acyclic Graph)
  • Action: Triggers actual computation, returns results to Driver or writes to external storage

This design allows Spark to optimize the entire execution plan rather than executing each operation step by step.
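The distinction is easy to see in a minimal sketch: the `map` and `filter` calls below only record the DAG, and no Spark job runs until `collect()` is invoked. (The local SparkSession and sample data are illustrative, not part of the original examples.)

```scala
import org.apache.spark.sql.SparkSession

// Illustrative local session for the sketch
val spark = SparkSession.builder().master("local[*]").appName("lazy-demo").getOrCreate()
import spark.implicits._

val ds = Seq(1, 2, 3, 4, 5).toDS()

// Transformations: only the lineage/DAG is recorded, nothing executes yet
val doubled = ds.map(_ * 2).filter(_ > 4)

// Action: triggers the actual computation and returns results to the Driver
val result = doubled.collect()   // Array(6, 8, 10)
```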


Transformation Operators

Common Operators List

| Operator | Description |
| --- | --- |
| select() | Column selection |
| filter() / where() | Row filtering |
| join() | DataFrame joins (inner/outer/left/right/cross) |
| groupBy() | Data grouping |
| agg() | Aggregation functions (count/sum/avg/max/min) |
| orderBy() / sort() | Sorting |
| cache() / persist() | Cache for faster reuse |
| distinct() / dropDuplicates() | Deduplication |
| map / flatMap | Element-level transformation |
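The table lists `map` and `flatMap` but they are not demonstrated below, so here is a short sketch. The local session and sample strings are illustrative:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative local session for the sketch
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val lines = Seq("hello world", "spark sql").toDS()

// map: exactly one output element per input element
val lengths = lines.map(_.length).collect()        // Array(11, 9)

// flatMap: zero or more output elements per input element
val words = lines.flatMap(_.split(" ")).collect()  // Array(hello, world, spark, sql)
```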

Selection and Filtering

import spark.implicits._  // enables the $"col" column syntax

val df = spark.read.option("header", "true").csv("employees.csv")

// Select specific columns
df.select("name", "age", "job").show(3)

// Expression column operations
df.select($"name", $"sal" + 1000).show(5)

// Conditional filtering
df.filter("age > 25").show()
df.where("age > 25 and job == 'ANALYST'").show()

Grouping and Aggregation

// Group by job, sum salary
df.groupBy("job").sum("sal").show()

// Aggregation with a HAVING-style condition
// (backticks escape the generated column name `avg(sal)`, which would
// otherwise be parsed as a fresh aggregate call and fail to resolve)
df.groupBy("job").avg("sal").where("`avg(sal)` > 2000").show()

// Multiple aggregation functions
df.groupBy("deptno").agg(
  "sal" -> "max",
  "sal" -> "min",
  "sal" -> "avg"
).show()
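The string-pair form above cannot alias the result columns. An alternative sketch using the column-function API (`max`/`min`/`avg` from `org.apache.spark.sql.functions`) allows naming each aggregate; the sample data and column names here are illustrative:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, max, min}

// Illustrative local session and data for the sketch
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val emp = Seq((10, 2500), (10, 3000), (20, 900)).toDF("deptno", "sal")

// Each aggregate gets a readable alias instead of names like max(sal)
val stats = emp.groupBy("deptno").agg(
  max("sal").as("max_sal"),
  min("sal").as("min_sal"),
  avg("sal").as("avg_sal")
)
stats.show()
```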

Join Operations

case class StudentAge(sno: Int, name: String, age: Int)
case class StudentHeight(sname: String, height: Int)

import spark.implicits._  // needed for createDataset and the $"col" syntax

val ds1 = spark.createDataset(Seq(
  StudentAge(1, "Alice", 20),
  StudentAge(2, "Bob", 22)
))
val ds2 = spark.createDataset(Seq(
  StudentHeight("Alice", 165),
  StudentHeight("Bob", 175)
))

// Inner join
ds1.join(ds2, $"name" === $"sname").show()

// Specify join type
ds1.join(ds2, $"name" === $"sname", "left").show()

Set Operations

// Union (keeps duplicates; ds3 and ds4 are Datasets with the same schema)
ds3.union(ds4).show()

// Intersection
ds3.intersect(ds4).show()

// Except
ds3.except(ds4).show()
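The transformation table also lists `distinct()` and `dropDuplicates()`, which are worth contrasting: `dropDuplicates` can deduplicate on a subset of columns. A sketch with illustrative data:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative local session and data for the sketch
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val people = Seq(("Alice", 20), ("Alice", 21), ("Bob", 22)).toDF("name", "age")

// distinct() compares whole rows: all three rows differ
val allRows = people.distinct().count()            // 3

// dropDuplicates("name") keeps one row per name
val perName = people.dropDuplicates("name").count() // 2
```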

Null Handling

// Drop all rows containing null
df.na.drop().show()

// Drop rows with null in specified columns
df.na.drop(Array("sal")).show()

// Fill null with specified value
df.na.fill(1000).show()
df.na.fill(Map("sal" -> 0, "comm" -> 0)).show()

Action Operators

Action operators trigger the actual computation. Common operators:

| Operator | Return Type | Description |
| --- | --- | --- |
| show() | Unit | Print DataFrame content |
| collect() | Array[Row] | Collect all data to the Driver |
| collectAsList() | List[Row] | Collect as a Java List |
| count() | Long | Count rows |
| take(n) / head(n) | Array[Row] | Take the first n rows |
| first() | Row | Take the first row |
| write | DataFrameWriter | Write to external storage (computation triggers on save) |
| printSchema() | Unit | Print Schema info |
| explain() | Unit | Print the execution plan |

Basic Usage

val df = spark.read.option("header", "true").csv("employees.csv")

// View data
df.show(5)
df.toJSON.show(false)   // JSON format, no truncation

// Collect results
val rows: Array[Row] = df.collect()
val list: java.util.List[Row] = df.collectAsList()

// Take partial data
val first: Row = df.head()
val top3: Array[Row] = df.take(3)

// Statistics
println(df.count())

View Execution Plan

// View both the logical plan and the physical plan
df.groupBy("deptno").avg("sal").explain(true)

Comprehensive Example: Employee Data Analysis

val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("emp.csv")

// Find the top 3 employees by salary in each department
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val windowSpec = Window.partitionBy("deptno").orderBy(desc("sal"))

df.withColumn("rank", rank().over(windowSpec))
  .filter($"rank" <= 3)
  .select("deptno", "ename", "sal", "rank")
  .show()

Performance Optimization Suggestions

  1. Leverage lazy evaluation: Let Spark optimize entire execution plan, not step-by-step operations
  2. Use cache() reasonably: Cache DataFrames used multiple times to avoid recomputation
  3. Handle data skew: Pay attention to hot keys during groupBy operations, can use salting techniques
  4. Choose appropriate Join strategy: Use broadcast() hint when joining small table with large table
  5. Reduce collect() usage: collect() pulls all data to Driver, use carefully with large data
// Use a broadcast join for the small table
import org.apache.spark.sql.functions.broadcast

largeDf.join(broadcast(smallDf), "key").show()
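Suggestion 2 above can likewise be sketched end to end: the first action materializes the cache, later actions reuse it, and `unpersist()` releases the memory when the DataFrame is no longer needed. (The local session and sample data are illustrative.)

```scala
import org.apache.spark.sql.SparkSession

// Illustrative local session and data for the sketch
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(("ANALYST", 2500), ("CLERK", 900), ("ANALYST", 3200)).toDF("job", "sal")

// Mark the reused DataFrame for caching (lazy: nothing is stored yet)
val hot = df.filter($"sal" > 1000).cache()

hot.count()                        // first action materializes the cache
hot.groupBy("job").count().show()  // subsequent actions reuse the cached partitions

hot.unpersist()                    // release the cached blocks when done
```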