This is article 84 in the Big Data series, deeply analyzing SparkSQL kernel’s Join strategy auto-selection logic and SQL parsing optimization flow.
SparkSQL Architecture Layers
SparkSQL consists of four core layers:
- Core Layer: Responsible for data I/O, interacting with Spark Core
- Catalyst Optimizer: Responsible for SQL parsing, binding, and optimization (core component)
- Hive Support Layer: Provides compatibility with Hive (metastore access, HiveQL, existing Hive data)
- ThriftServer: Provides JDBC/ODBC access interface
Five Join Execution Strategies
Spark automatically selects strategies based on table size, Join type (equi/non-equi), and whether keys are sortable.
1. Broadcast Hash Join (BHJ)
Principle: Broadcasts small table data to all Executor nodes, completes Join on the Map side, completely avoiding Shuffle.
Trigger conditions:
- Small table size < spark.sql.autoBroadcastJoinThreshold (default 10MB)
- Only supports equi-Join
Advantage: Optimal performance when conditions are met, no network Shuffle overhead.
// Manually trigger broadcast Join
import org.apache.spark.sql.functions.broadcast
largeDf.join(broadcast(smallDf), "user_id").show()
2. Shuffle Hash Join (SHJ)
Principle: Uses the same hash function to repartition both tables by Join Key, builds Hash Map locally in each partition, then executes Join.
Trigger conditions:
- Only supports equi-Join
- spark.sql.join.preferSortMergeJoin = false
- Small table size × 3 ≤ Large table size
- Small table size < autoBroadcastJoinThreshold × shuffle partition count
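The per-partition build-and-probe step of SHJ can be sketched in plain Scala (the data and variable names here are hypothetical, standing in for one shuffled partition of each table):

```scala
// Sketch of the per-partition hash join step, assuming both sides
// have already been shuffled by the Join key (hypothetical data).
val buildSide = Seq((1, "alice"), (2, "bob"))   // small-table partition: (id, name)
val probeSide = Seq((1, 90), (1, 85), (3, 70))  // large-table partition: (id, score)

// Build a hash map from the small side: key -> matching rows
val hashMap = buildSide.groupBy(_._1)

// Probe with each row of the large side; unmatched keys produce no output
val joined = probeSide.flatMap { case (k, score) =>
  hashMap.getOrElse(k, Nil).map { case (_, name) => (k, name, score) }
}
// joined: Seq((1, "alice", 90), (1, "alice", 85))
```

Because the hash map only covers one partition of the small table, each partition's map must fit in executor memory, which is what the size conditions above guarantee.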
3. Shuffle Sort Merge Join (SMJ)
Principle: After Shuffling both tables by Join Key, sorts each partition, then merges using merge-sort approach—no need to load entire partition into memory.
Trigger conditions:
- Only supports equi-Join (keys must be sortable)
- spark.sql.join.preferSortMergeJoin = true (default)
Advantage: Suitable for joining two large tables, memory-friendly, default production strategy.
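The merge phase can be sketched with a two-pointer walk over two already-sorted partitions (hypothetical data; simplified to assume unique keys on the left side):

```scala
// Sketch of the SMJ merge phase, assuming both partitions are already
// sorted by the Join key (hypothetical data, unique left-side keys).
val left  = Seq((1, "a"), (2, "b"), (4, "d"))
val right = Seq((2, 20), (3, 30), (4, 40))

val out = scala.collection.mutable.ArrayBuffer.empty[(Int, String, Int)]
var i = 0; var j = 0
while (i < left.length && j < right.length) {
  val (lk, lv) = left(i); val (rk, rv) = right(j)
  if (lk < rk) i += 1                      // advance the side with the smaller key
  else if (lk > rk) j += 1
  else { out += ((lk, lv, rv)); j += 1 }   // keys match: emit row, advance right
}
// out: (2, "b", 20), (4, "d", 40)
```

Since both sides only stream forward through sorted data, neither partition has to be held fully in memory, which is why SMJ is the safe default for two large tables.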
4. Cartesian Product Join
Triggered when there is no Join condition—output rows = left table rows × right table rows. Should be strictly avoided in production.
5. Broadcast Nested Loop Join (BNLJ)
Principle: Broadcasts small table, performs nested loop scan for each row of large table against small table.
Trigger conditions: Supports non-equi-Join (<, >, LIKE conditions) and all Join types.
Disadvantage: Due to nested scan inefficiency, only selected when other strategies cannot be used.
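A non-equi BNLJ amounts to scanning the whole broadcast table for every large-table row, which a plain-Scala for-comprehension makes concrete (tables and the score-tier condition are hypothetical):

```scala
// Sketch of a nested loop join with a non-equi condition (score < maxScore),
// assuming the small side is broadcast to every executor (hypothetical data).
val broadcasted = Seq(("bronze", 60), ("silver", 80)) // small table: (tier, maxScore)
val large       = Seq(("u1", 55), ("u2", 75))         // large table: (user, score)

// For every large-table row, scan the entire broadcast table
val tiers = for {
  (user, score)    <- large
  (tier, maxScore) <- broadcasted
  if score < maxScore                                 // non-equi predicate
} yield (user, tier)
// tiers: ("u1","bronze"), ("u1","silver"), ("u2","silver")
```

The cost is O(large rows × small rows) comparisons, which is why BNLJ is only chosen when no hash- or sort-based strategy applies.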
Join Strategy Selection Priority
Conditions met, in order of preference:
BHJ (optimal)
→ SHJ (medium)
→ SMJ (default large-table strategy)
→ Cartesian (only when there is no Join condition)
→ BNLJ (non-equi Join fallback)
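The selection order above can be condensed into a small decision function. This is an illustrative sketch of the documented priority, not Spark's actual JoinSelection rule:

```scala
// Illustrative mirror of the strategy priority described above;
// sizes are in bytes, names and simplifications are this sketch's own.
def chooseStrategy(equiJoin: Boolean, hasCondition: Boolean,
                   smallTableBytes: Long, broadcastThreshold: Long,
                   preferSortMerge: Boolean): String = {
  if (!hasCondition) "Cartesian"                        // no Join condition at all
  else if (!equiJoin) "BNLJ"                            // non-equi fallback
  else if (smallTableBytes < broadcastThreshold) "BHJ"  // small enough to broadcast
  else if (!preferSortMerge) "SHJ"                      // opt-in via config
  else "SMJ"                                            // default for large tables
}

val tenMB = 10L * 1024 * 1024
chooseStrategy(equiJoin = true, hasCondition = true,
               smallTableBytes = 5L * 1024 * 1024,
               broadcastThreshold = tenMB,
               preferSortMerge = true) // "BHJ"
```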
Catalyst Optimizer: SQL Parsing Complete Flow
Catalyst is the core of SparkSQL, elegantly handling query tree transformations using Scala’s pattern matching.
Processing Pipeline
SQL String
↓ Parser (ANTLR parsing)
AST (Unresolved Abstract Syntax Tree)
↓ Analyzer (Rule resolution)
Resolved Logical Plan
↓ Optimizer (Optimization rules)
Optimized Logical Plan
↓ Physical Planning
Physical Plan
↓ Code Generation
RDD Execution
Stage Details
1. Parser
Converts SQL string to AST with unresolved references (UnresolvedRelation, UnresolvedAttribute).
2. Analyzer
Resolves table names and column references through the Catalog, applying rules such as:
- ResolveRelations: Resolves table names to concrete data sources
- ResolveFunctions: Resolves function references
3. Optimizer
Applies rule-based optimization:
| Optimization Rule | Effect |
|---|---|
| Predicate Pushdown | Pushes filter conditions to data source, reducing read volume |
| Column Pruning | Only reads columns needed by query |
| Constant Folding | Pre-computes constant expressions (e.g., 100 + 10 = 110) |
| Whole-Stage CodeGen | Inlines multiple small operations into one large function, reducing call overhead |
4. Physical Planning
Maps logical plan to specific Spark execution operations, selects Join strategies, partitioning methods, etc.
Practice: View Execution Plan
val df = spark.sql(
"""
|SELECT SUM(v) AS total_score, name
|FROM (
| SELECT stu.id, 100 + 10 + score.score AS v, name
| FROM stu
| JOIN score ON stu.id = score.id
| WHERE stu.age >= 11
|) tmp
|GROUP BY name
""".stripMargin
)
// View optimized logical plan
println(df.queryExecution.optimizedPlan)
// View full execution plan (including physical plan)
df.explain(true)
Optimization example:
- Constant folding: 100 + 10 + score becomes 110 + score
- Predicate pushdown: the filter stu.age >= 11 is pushed down to the stu table scan, reducing data before the Join
Key Configuration Parameters
| Parameter | Default | Description |
|---|---|---|
| spark.sql.autoBroadcastJoinThreshold | 10MB | Table size threshold for BHJ; set to -1 to disable |
| spark.sql.join.preferSortMergeJoin | true | Prefer SMJ over SHJ |
| spark.sql.shuffle.partitions | 200 | Partition count after Shuffle; affects Join parallelism |
// Increase broadcast threshold for more small tables to use BHJ
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "50m")
// Increase Shuffle partitions for large data volumes
spark.conf.set("spark.sql.shuffle.partitions", "500")
Summary
Understanding SparkSQL’s Join strategy selection and Catalyst optimization flow is fundamental to tuning distributed query performance:
- Prefer BHJ for small tables to eliminate Shuffle
- Two large tables use SMJ by default for stable memory
- Use explain() and queryExecution to observe actual execution plans
- Set spark.sql.shuffle.partitions appropriately to avoid too many or too few partitions