本文是大数据系列第 82 篇,系统介绍 SparkSQL 的 Transformation 与 Action 算子,并配套完整测试用例。
核心概念:懒执行机制
SparkSQL 的核心执行模型是懒执行(Lazy Evaluation):
- Transformation(转换):不立即执行,仅记录逻辑与依赖关系,构建 DAG(有向无环图)
- Action(行动):触发实际计算,将结果返回 Driver 或写入外部存储
这种设计允许 Spark 优化整个执行计划,而非逐步执行。
Transformation 算子
常用算子列表
| 算子 | 说明 |
|---|---|
select() | 列选取 |
filter() / where() | 行过滤 |
join() | DataFrame 合并(inner/outer/left/right/cross) |
groupBy() | 数据分组 |
agg() | 聚合函数(count/sum/avg/max/min) |
orderBy() / sort() | 排序 |
cache() / persist() | 缓存加速 |
distinct() / dropDuplicates() | 去重 |
map / flatMap | 元素级转换 |
选取与过滤
val df = spark.read.option("header", "true").csv("employees.csv")
// 选取特定列
df.select("name", "age", "job").show(3)
// 表达式列操作
df.select($"name", $"sal" + 1000).show(5)
// 条件过滤
df.filter("age > 25").show()
df.where("age > 25 and job == 'ANALYST'").show()
分组与聚合
// 按职位分组求工资总和
df.groupBy("job").sum("sal").show()
// 带 HAVING 条件的聚合
df.groupBy("job").avg("sal").where("avg(sal) > 2000").show()
// 多聚合函数
df.groupBy("deptno").agg(
"sal" -> "max",
"sal" -> "min",
"sal" -> "avg"
).show()
Join 操作
case class StudentAge(sno: Int, name: String, age: Int)
case class StudentHeight(sname: String, height: Int)
val ds1 = spark.createDataset(Seq(
StudentAge(1, "Alice", 20),
StudentAge(2, "Bob", 22)
))
val ds2 = spark.createDataset(Seq(
StudentHeight("Alice", 165),
StudentHeight("Bob", 175)
))
// 内连接
ds1.join(ds2, $"name" === $"sname").show()
// 指定 Join 类型
ds1.join(ds2, $"name" === $"sname", "left").show()
集合操作
// 并集(不去重)
ds3.union(ds4).show()
// 交集
ds3.intersect(ds4).show()
// 差集
ds3.except(ds4).show()
空值处理
// 删除所有含 null 的行
df.na.drop().show()
// 只删除指定列含 null 的行
df.na.drop(Array("sal")).show()
// 用指定值填充 null
df.na.fill(1000).show()
df.na.fill(Map("sal" -> 0, "comm" -> 0)).show()
Action 算子
Action 算子触发实际计算,常见算子如下:
| 算子 | 返回类型 | 说明 |
|---|---|---|
show() | Unit | 打印 DataFrame 内容 |
collect() | Array[Row] | 将所有数据收集到 Driver |
collectAsList() | List[Row] | 以 Java List 形式收集 |
count() | Long | 统计行数 |
take(n) / head(n) | Array[Row] | 取前 n 行 |
first() | Row | 取第一行 |
write() | DataFrameWriter | 写出到外部存储 |
printSchema() | Unit | 打印 Schema 信息 |
explain() | Unit | 打印执行计划 |
基本用法
val df = spark.read.option("header", "true").csv("employees.csv")
// 查看数据
df.show(5)
df.toJSON.show(false) // JSON 格式,不截断
// 收集结果
val rows: Array[Row] = df.collect()
val list: java.util.List[Row] = df.collectAsList()
// 取部分数据
val first: Row = df.head()
val top3: Array[Row] = df.take(3)
// 统计
println(df.count())
查看执行计划
// 查看逻辑计划与物理计划
df.groupBy("deptno").avg("sal").explain(true)
综合示例:员工数据分析
val df = spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("emp.csv")
// 找出每个部门工资最高的前 3 名员工
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val windowSpec = Window.partitionBy("deptno").orderBy(desc("sal"))
df.withColumn("rank", rank().over(windowSpec))
.filter($"rank" <= 3)
.select("deptno", "ename", "sal", "rank")
.show()
性能优化建议
- 利用懒执行:让 Spark 优化整个执行计划,而不是逐步操作
- 合理使用
cache():对多次使用的 DataFrame 缓存,避免重复计算 - 处理数据倾斜:
groupBy操作时注意热点 key,可使用 salting 等技术 - 选择合适的 Join 策略:小表与大表 Join 时使用
broadcast()提示 - 减少
collect()使用:collect()会将所有数据拉到 Driver,数据量大时慎用
// 对小表使用广播 Join
import org.apache.spark.sql.functions.broadcast
largeDf.join(broadcast(smallDf), "key").show()