本文是大数据系列第 84 篇,深入解析 SparkSQL 内核中 Join 策略的自动选择逻辑与 SQL 解析优化流程。
SparkSQL 架构层次
SparkSQL 由四个核心层组成:
- Core 层:负责数据 I/O,与 Spark Core 交互
- Catalyst 优化器:负责 SQL 解析、绑定与优化(核心组件)
- Hive 支持层:处理历史遗留数据
- ThriftServer:提供 JDBC/ODBC 接入接口
五种 Join 执行策略
Spark 会根据表大小、Join 类型(等值/非等值)、Key 是否可排序自动选择策略。
1. Broadcast Hash Join(BHJ)
原理:将小表数据广播到所有 Executor 节点,在 Map 端完成 Join,彻底避免 Shuffle。
触发条件:
- 小表大小 <
spark.sql.autoBroadcastJoinThreshold(默认 10MB) - 仅支持等值 Join
优势:条件满足时性能最优,无网络 Shuffle 开销。
// 手动触发广播 Join
import org.apache.spark.sql.functions.broadcast
largeDf.join(broadcast(smallDf), "user_id").show()
2. Shuffle Hash Join(SHJ)
原理:使用相同哈希函数对两张表按 Join Key 重新分区,每个分区内本地构建 Hash Map 后执行 Join。
触发条件:
- 仅支持等值 Join
spark.sql.join.preferSortMergeJoin = false- 小表大小 × 3 ≤ 大表大小
- 小表大小 <
autoBroadcastJoinThreshold × shuffle分区数
3. Shuffle Sort Merge Join(SMJ)
原理:两张表按 Join Key Shuffle 后分别排序,再以归并方式合并,无需将整个分区加载到内存。
触发条件:
- 仅支持等值 Join(Key 需可排序)
spark.sql.join.preferSortMergeJoin = true(默认)
优势:适合两张大表的 Join,内存友好,是生产环境的默认策略。
4. Cartesian Product Join(笛卡尔积)
当没有 Join 条件时触发,输出行数 = 左表行数 × 右表行数。生产环境应极力避免。
5. Broadcast Nested Loop Join(BNLJ)
原理:广播小表,对大表的每一行与小表做嵌套循环扫描。
触发条件:支持非等值 Join(<、>、LIKE 等条件)和所有 Join 类型。
劣势:因嵌套扫描效率较低,仅在无法使用其他策略时选择。
Join 策略选择优先级
条件满足 → BHJ(最优)
→ SHJ(中等)
→ SMJ(默认大表策略)
→ Cartesian(无条件时)
→ BNLJ(非等值 Join 兜底)
Catalyst 优化器:SQL 解析全流程
Catalyst 是 SparkSQL 的核心,利用 Scala 的模式匹配优雅地处理查询树变换。
处理流水线
SQL 字符串
↓ Parser(ANTLR 解析)
AST(未解析的抽象语法树)
↓ Analyzer(规则解析)
Resolved Logical Plan(已解析的逻辑计划)
↓ Optimizer(优化规则)
Optimized Logical Plan(优化后的逻辑计划)
↓ Physical Planner(物理规划)
Physical Plan(物理执行计划)
↓ Code Generation(代码生成)
RDD 执行
各阶段详解
1. Parser(解析)
将 SQL 字符串转换为 AST,节点包含未解析的引用(UnresolvedRelation、UnresolvedAttribute)。
2. Analyzer(分析)
通过 Catalog 解析表名和列引用,应用规则如:
ResolveRelations:解析表名为具体的数据源ResolveFunctions:解析函数引用
3. Optimizer(优化)
应用基于规则的优化:
| 优化规则 | 效果 |
|---|---|
| 谓词下推(Predicate Pushdown) | 将过滤条件下推到数据源,减少读取量 |
| 列裁剪(Column Pruning) | 只读取查询需要的列 |
| 常量折叠(Constant Folding) | 预计算常量表达式(如 100 + 10 = 110) |
| Whole-Stage CodeGen | 将多个小操作内联为一个大函数,减少调用开销 |
4. Physical Planning(物理规划)
将逻辑计划映射为具体的 Spark 执行操作,选择 Join 策略、分区方式等。
实战:查看执行计划
val df = spark.sql(
"""
|SELECT SUM(v) AS total_score, name
|FROM (
| SELECT stu.id, 100 + 10 + score.score AS v, name
| FROM stu
| JOIN score ON stu.id = score.id
| WHERE stu.age >= 11
|) tmp
|GROUP BY name
""".stripMargin
)
// 查看优化后的逻辑计划
println(df.queryExecution.optimizedPlan)
// 查看完整执行计划(含物理计划)
df.explain(true)
优化效果示例:
100 + 10 + score中常量部分预计算为110 + scorestu.age >= 11过滤条件下推到stu表扫描阶段,在 Join 前减少数据量
关键配置参数
| 参数 | 默认值 | 说明 |
|---|---|---|
spark.sql.autoBroadcastJoinThreshold | 10MB | 触发 BHJ 的表大小阈值,-1 禁用广播 |
spark.sql.join.preferSortMergeJoin | true | 优先使用 SMJ 而非 SHJ |
spark.sql.shuffle.partitions | 200 | Shuffle 后的分区数,影响 Join 并行度 |
// 调大广播阈值,让更多小表走 BHJ
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "50m")
// 增加 Shuffle 分区数应对大数据量
spark.conf.set("spark.sql.shuffle.partitions", "500")
小结
理解 SparkSQL 的 Join 策略选择与 Catalyst 优化流程,是调优分布式查询性能的基础:
- 优先让小表走 BHJ,消除 Shuffle
- 两张大表默认走 SMJ,内存稳定
- 利用
explain()和queryExecution观察实际执行计划 - 合理设置
shuffle.partitions避免过多或过少分区