This is article 82 in the Big Data series. It systematically introduces SparkSQL's Transformation and Action operators, with complete test cases.
Core Concept: Lazy Execution Mechanism
SparkSQL’s core execution model is Lazy Evaluation:
- Transformation: Doesn’t execute immediately, only records logic and dependencies, builds DAG (Directed Acyclic Graph)
- Action: Triggers actual computation, returns results to Driver or writes to external storage
This design allows Spark to optimize the entire execution plan rather than executing step by step.
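The difference can be seen in a minimal sketch (a local SparkSession and in-memory Dataset are used here purely for illustration):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("lazy-demo")
  .getOrCreate()
import spark.implicits._

val ds = Seq(1, 2, 3, 4, 5).toDS()

// Transformations: nothing runs yet, Spark only records lineage into the DAG
val doubled = ds.map(_ * 2)
val large   = doubled.filter(_ > 4)

// Action: the whole chain is optimized and executed now
val result = large.collect()  // Array(6, 8, 10)
```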
Transformation Operators
Common Operators List
| Operator | Description |
|---|---|
| `select()` | Column selection |
| `filter()` / `where()` | Row filtering |
| `join()` | DataFrame join (inner/outer/left/right/cross) |
| `groupBy()` | Data grouping |
| `agg()` | Aggregation functions (count/sum/avg/max/min) |
| `orderBy()` / `sort()` | Sorting |
| `cache()` / `persist()` | Caching for acceleration |
| `distinct()` / `dropDuplicates()` | Deduplication |
| `map()` / `flatMap()` | Element-level transformation |
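The `map`/`flatMap` and `distinct` rows from the table can be sketched as follows (assuming a local SparkSession; the data is illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val lines = Seq("spark sql", "spark core").toDS()

// map: exactly one output element per input element
lines.map(_.toUpperCase).show()

// flatMap: zero or more output elements per input element
val words = lines.flatMap(_.split(" "))

// distinct: deduplicate
val unique = words.distinct().collect().sorted  // Array("core", "spark", "sql")
```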
Selection and Filtering
val df = spark.read.option("header", "true").csv("employees.csv")
import spark.implicits._  // required for the $"col" syntax below
// Select specific columns
df.select("name", "age", "job").show(3)
// Expression column operations
df.select($"name", $"sal" + 1000).show(5)
// Conditional filtering
df.filter("age > 25").show()
df.where("age > 25 and job == 'ANALYST'").show()
Grouping and Aggregation
// Group by job, sum salary
df.groupBy("job").sum("sal").show()
// Aggregation with HAVING-style condition (the aggregated column is named "avg(sal)")
df.groupBy("job").avg("sal").where($"avg(sal)" > 2000).show()
// Multiple aggregation functions
df.groupBy("deptno").agg(
"sal" -> "max",
"sal" -> "min",
"sal" -> "avg"
).show()
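The tuple syntax above can also be written with the `functions` API, which additionally lets you alias the result columns. A sketch with illustrative in-memory data standing in for the CSV:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Illustrative data standing in for emp.csv
val df = Seq((10, 1000.0), (10, 3000.0), (20, 2000.0)).toDF("deptno", "sal")

val stats = df.groupBy("deptno").agg(
  max("sal").as("max_sal"),
  min("sal").as("min_sal"),
  avg("sal").as("avg_sal")
)
stats.show()
```

The aliases make the result columns easier to reference in later `where`/`select` steps than the default `avg(sal)`-style names.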
Join Operations
case class StudentAge(sno: Int, name: String, age: Int)
case class StudentHeight(sname: String, height: Int)
val ds1 = spark.createDataset(Seq(
StudentAge(1, "Alice", 20),
StudentAge(2, "Bob", 22)
))
val ds2 = spark.createDataset(Seq(
StudentHeight("Alice", 165),
StudentHeight("Bob", 175)
))
// Inner join
ds1.join(ds2, $"name" === $"sname").show()
// Specify join type
ds1.join(ds2, $"name" === $"sname", "left").show()
Set Operations
// Union (keeps duplicates; apply distinct() afterwards to deduplicate)
ds3.union(ds4).show()
// Intersection
ds3.intersect(ds4).show()
// Except
ds3.except(ds4).show()
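`ds3` and `ds4` above are assumed to be Datasets with the same schema; a self-contained version with the expected results:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val ds3 = Seq(1, 2, 3).toDS()
val ds4 = Seq(3, 4).toDS()

val unioned     = ds3.union(ds4).collect().sorted      // Array(1, 2, 3, 3, 4): duplicates kept
val intersected = ds3.intersect(ds4).collect().sorted  // Array(3)
val excepted    = ds3.except(ds4).collect().sorted     // Array(1, 2)
```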
Null Handling
// Drop all rows containing null
df.na.drop().show()
// Drop rows with null in specified columns
df.na.drop(Array("sal")).show()
// Fill null with specified value
df.na.fill(1000).show()
df.na.fill(Map("sal" -> 0, "comm" -> 0)).show()
Action Operators
Action operators trigger actual computation, common operators:
| Operator | Return Type | Description |
|---|---|---|
| `show()` | Unit | Print DataFrame content |
| `collect()` | Array[Row] | Collect all data to the Driver |
| `collectAsList()` | java.util.List[Row] | Collect as a Java List |
| `count()` | Long | Count rows |
| `take(n)` / `head(n)` | Array[Row] | Take the first n rows |
| `first()` | Row | Take the first row |
| `write` | DataFrameWriter | Write to external storage |
| `printSchema()` | Unit | Print schema info |
| `explain()` | Unit | Print the execution plan |
Basic Usage
val df = spark.read.option("header", "true").csv("employees.csv")
// View data
df.show(5)
df.toJSON.show(false) // JSON format, no truncation
// Collect results
val rows: Array[Row] = df.collect()
val list: java.util.List[Row] = df.collectAsList()
// Take partial data
val first: Row = df.head()
val top3: Array[Row] = df.take(3)
// Statistics
println(df.count())
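`write` from the table above returns a DataFrameWriter; it is the save call (`csv()`, `parquet()`, `save()`, ...) that actually triggers the job. A sketch with illustrative data and an illustrative output path:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(("Alice", 30), ("Bob", 25)).toDF("name", "age")

// mode() controls behavior when the target exists
df.write
  .mode("overwrite")           // overwrite | append | ignore | errorifexists
  .option("header", "true")
  .csv("output/employees")     // illustrative path
```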
View Execution Plan
// View logical plan and physical plan
df.groupBy("deptno").avg("sal").explain(true)
Comprehensive Example: Employee Data Analysis
val df = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("emp.csv")
// Find top 3 employees by salary in each department
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val windowSpec = Window.partitionBy("deptno").orderBy(desc("sal"))
df.withColumn("rank", rank().over(windowSpec))
.filter($"rank" <= 3)
.select("deptno", "ename", "sal", "rank")
.show()
Performance Optimization Suggestions
- Leverage lazy evaluation: let Spark optimize the entire execution plan instead of executing step by step
- Use `cache()` reasonably: cache DataFrames that are used multiple times to avoid recomputation
- Handle data skew: watch for hot keys in `groupBy` operations; salting techniques can help
- Choose an appropriate join strategy: use a `broadcast()` hint when joining a small table with a large one
- Reduce `collect()` usage: `collect()` pulls all data to the Driver, so use it carefully with large datasets
// Use a broadcast join for the small table
import org.apache.spark.sql.functions.broadcast
largeDf.join(broadcast(smallDf), "key").show()
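The `cache()` suggestion above can be sketched as follows (illustrative data; note that caching itself is lazy and only materializes on the first action):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq((10, 1000.0), (10, 3000.0), (20, 2000.0)).toDF("deptno", "sal")

val grouped = df.groupBy("deptno").count()

grouped.cache()               // only marks for caching, nothing runs yet
val n = grouped.count()       // first action: computes and populates the cache
grouped.show()                // reuses the cached result, no recomputation

grouped.unpersist()           // release the cache when no longer needed
```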