本文是大数据系列第 82 篇,系统介绍 SparkSQL 的 Transformation 与 Action 算子,并配套完整测试用例。

完整图文版(含截图):CSDN 原文 | 掘金

核心概念:懒执行机制

SparkSQL 的核心执行模型是懒执行(Lazy Evaluation)

  • Transformation(转换):不立即执行,仅记录逻辑与依赖关系,构建 DAG(有向无环图)
  • Action(行动):触发实际计算,将结果返回 Driver 或写入外部存储

这种设计允许 Spark 优化整个执行计划,而非逐步执行。


Transformation 算子

常用算子列表

算子说明
select()列选取
filter() / where()行过滤
join()DataFrame 合并(inner/outer/left/right/cross)
groupBy()数据分组
agg()聚合函数(count/sum/avg/max/min)
orderBy() / sort()排序
cache() / persist()缓存加速
distinct() / dropDuplicates()去重
map / flatMap元素级转换

选取与过滤

val df = spark.read.option("header", "true").csv("employees.csv")

// 选取特定列
df.select("name", "age", "job").show(3)

// 表达式列操作
df.select($"name", $"sal" + 1000).show(5)

// 条件过滤
df.filter("age > 25").show()
df.where("age > 25 and job == 'ANALYST'").show()

分组与聚合

// 按职位分组求工资总和
df.groupBy("job").sum("sal").show()

// 带 HAVING 条件的聚合
df.groupBy("job").avg("sal").where("avg(sal) > 2000").show()

// 多聚合函数
df.groupBy("deptno").agg(
  "sal" -> "max",
  "sal" -> "min",
  "sal" -> "avg"
).show()

Join 操作

case class StudentAge(sno: Int, name: String, age: Int)
case class StudentHeight(sname: String, height: Int)

val ds1 = spark.createDataset(Seq(
  StudentAge(1, "Alice", 20),
  StudentAge(2, "Bob", 22)
))
val ds2 = spark.createDataset(Seq(
  StudentHeight("Alice", 165),
  StudentHeight("Bob", 175)
))

// 内连接
ds1.join(ds2, $"name" === $"sname").show()

// 指定 Join 类型
ds1.join(ds2, $"name" === $"sname", "left").show()

集合操作

// 并集(不去重)
ds3.union(ds4).show()

// 交集
ds3.intersect(ds4).show()

// 差集
ds3.except(ds4).show()

空值处理

// 删除所有含 null 的行
df.na.drop().show()

// 只删除指定列含 null 的行
df.na.drop(Array("sal")).show()

// 用指定值填充 null
df.na.fill(1000).show()
df.na.fill(Map("sal" -> 0, "comm" -> 0)).show()

Action 算子

Action 算子触发实际计算,常见算子如下:

算子返回类型说明
show()Unit打印 DataFrame 内容
collect()Array[Row]将所有数据收集到 Driver
collectAsList()List[Row]以 Java List 形式收集
count()Long统计行数
take(n) / head(n)Array[Row]取前 n 行
first()Row取第一行
write()DataFrameWriter写出到外部存储
printSchema()Unit打印 Schema 信息
explain()Unit打印执行计划

基本用法

val df = spark.read.option("header", "true").csv("employees.csv")

// 查看数据
df.show(5)
df.toJSON.show(false)   // JSON 格式,不截断

// 收集结果
val rows: Array[Row] = df.collect()
val list: java.util.List[Row] = df.collectAsList()

// 取部分数据
val first: Row = df.head()
val top3: Array[Row] = df.take(3)

// 统计
println(df.count())

查看执行计划

// 查看逻辑计划与物理计划
df.groupBy("deptno").avg("sal").explain(true)

综合示例:员工数据分析

val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("emp.csv")

// 找出每个部门工资最高的前 3 名员工
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val windowSpec = Window.partitionBy("deptno").orderBy(desc("sal"))

df.withColumn("rank", rank().over(windowSpec))
  .filter($"rank" <= 3)
  .select("deptno", "ename", "sal", "rank")
  .show()

性能优化建议

  1. 利用懒执行:让 Spark 优化整个执行计划,而不是逐步操作
  2. 合理使用 cache():对多次使用的 DataFrame 缓存,避免重复计算
  3. 处理数据倾斜groupBy 操作时注意热点 key,可使用 salting 等技术
  4. 选择合适的 Join 策略:小表与大表 Join 时使用 broadcast() 提示
  5. 减少 collect() 使用collect() 会将所有数据拉到 Driver,数据量大时慎用
// 对小表使用广播 Join
import org.apache.spark.sql.functions.broadcast

largeDf.join(broadcast(smallDf), "key").show()