本文是大数据系列第 84 篇,深入解析 SparkSQL 内核中 Join 策略的自动选择逻辑与 SQL 解析优化流程。

完整图文版(含截图):CSDN 原文 | 掘金

SparkSQL 架构层次

SparkSQL 由四个核心层组成:

  • Core 层:负责数据 I/O,与 Spark Core 交互
  • Catalyst 优化器:负责 SQL 解析、绑定与优化(核心组件)
  • Hive 支持层:处理历史遗留数据
  • ThriftServer:提供 JDBC/ODBC 接入接口

五种 Join 执行策略

Spark 会根据表大小、Join 类型(等值/非等值)、Key 是否可排序自动选择策略。

1. Broadcast Hash Join(BHJ)

原理:将小表数据广播到所有 Executor 节点,在 Map 端完成 Join,彻底避免 Shuffle。

触发条件

  • 小表大小 < spark.sql.autoBroadcastJoinThreshold(默认 10MB)
  • 仅支持等值 Join

优势:条件满足时性能最优,无网络 Shuffle 开销。

// 手动触发广播 Join
import org.apache.spark.sql.functions.broadcast

largeDf.join(broadcast(smallDf), "user_id").show()

2. Shuffle Hash Join(SHJ)

原理:使用相同哈希函数对两张表按 Join Key 重新分区,每个分区内本地构建 Hash Map 后执行 Join。

触发条件

  • 仅支持等值 Join
  • spark.sql.join.preferSortMergeJoin = false
  • 小表大小 × 3 ≤ 大表大小
  • 小表大小 < autoBroadcastJoinThreshold × shuffle分区数

3. Shuffle Sort Merge Join(SMJ)

原理:两张表按 Join Key Shuffle 后分别排序,再以归并方式合并,无需将整个分区加载到内存。

触发条件

  • 仅支持等值 Join(Key 需可排序)
  • spark.sql.join.preferSortMergeJoin = true(默认)

优势:适合两张大表的 Join,内存友好,是生产环境的默认策略。

4. Cartesian Product Join(笛卡尔积)

当没有 Join 条件时触发,输出行数 = 左表行数 × 右表行数。生产环境应极力避免。

5. Broadcast Nested Loop Join(BNLJ)

原理:广播小表,对大表的每一行与小表做嵌套循环扫描。

触发条件:支持非等值 Join(<>LIKE 等条件)和所有 Join 类型。

劣势:因嵌套扫描效率较低,仅在无法使用其他策略时选择。

Join 策略选择优先级

条件满足 → BHJ(最优)
         → SHJ(中等)
         → SMJ(默认大表策略)
         → Cartesian(无条件时)
         → BNLJ(非等值 Join 兜底)

Catalyst 优化器:SQL 解析全流程

Catalyst 是 SparkSQL 的核心,利用 Scala 的模式匹配优雅地处理查询树变换。

处理流水线

SQL 字符串
    ↓  Parser(ANTLR 解析)
AST(未解析的抽象语法树)
    ↓  Analyzer(规则解析)
Resolved Logical Plan(已解析的逻辑计划)
    ↓  Optimizer(优化规则)
Optimized Logical Plan(优化后的逻辑计划)
    ↓  Physical Planner(物理规划)
Physical Plan(物理执行计划)
    ↓  Code Generation(代码生成)
RDD 执行

各阶段详解

1. Parser(解析)

将 SQL 字符串转换为 AST,节点包含未解析的引用(UnresolvedRelationUnresolvedAttribute)。

2. Analyzer(分析)

通过 Catalog 解析表名和列引用,应用规则如:

  • ResolveRelations:解析表名为具体的数据源
  • ResolveFunctions:解析函数引用

3. Optimizer(优化)

应用基于规则的优化:

优化规则效果
谓词下推(Predicate Pushdown)将过滤条件下推到数据源,减少读取量
列裁剪(Column Pruning)只读取查询需要的列
常量折叠(Constant Folding)预计算常量表达式(如 100 + 10 = 110
Whole-Stage CodeGen将多个小操作内联为一个大函数,减少调用开销

4. Physical Planning(物理规划)

将逻辑计划映射为具体的 Spark 执行操作,选择 Join 策略、分区方式等。

实战:查看执行计划

val df = spark.sql(
  """
    |SELECT SUM(v) AS total_score, name
    |FROM (
    |  SELECT stu.id, 100 + 10 + score.score AS v, name
    |  FROM stu
    |  JOIN score ON stu.id = score.id
    |  WHERE stu.age >= 11
    |) tmp
    |GROUP BY name
  """.stripMargin
)

// 查看优化后的逻辑计划
println(df.queryExecution.optimizedPlan)

// 查看完整执行计划(含物理计划)
df.explain(true)

优化效果示例:

  • 100 + 10 + score 中常量部分预计算为 110 + score
  • stu.age >= 11 过滤条件下推到 stu 表扫描阶段,在 Join 前减少数据量

关键配置参数

参数默认值说明
spark.sql.autoBroadcastJoinThreshold10MB触发 BHJ 的表大小阈值,-1 禁用广播
spark.sql.join.preferSortMergeJointrue优先使用 SMJ 而非 SHJ
spark.sql.shuffle.partitions200Shuffle 后的分区数,影响 Join 并行度
// 调大广播阈值,让更多小表走 BHJ
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "50m")

// 增加 Shuffle 分区数应对大数据量
spark.conf.set("spark.sql.shuffle.partitions", "500")

小结

理解 SparkSQL 的 Join 策略选择与 Catalyst 优化流程,是调优分布式查询性能的基础:

  • 优先让小表走 BHJ,消除 Shuffle
  • 两张大表默认走 SMJ,内存稳定
  • 利用 explain()queryExecution 观察实际执行计划
  • 合理设置 shuffle.partitions 避免过多或过少分区