This is article 84 in the Big Data series. It takes a deep look at how the SparkSQL engine automatically selects a Join strategy, and at the SQL parsing and optimization pipeline.

SparkSQL Architecture Layers

SparkSQL consists of four core layers:

  • Core Layer: Responsible for data I/O, interacting with Spark Core
  • Catalyst Optimizer: Responsible for SQL parsing, binding, and optimization (core component)
  • Hive Support Layer: Provides compatibility with Hive (metastore, HiveQL, UDFs) so existing Hive data remains accessible
  • ThriftServer: Provides JDBC/ODBC access interface

Five Join Execution Strategies

Spark automatically selects strategies based on table size, Join type (equi/non-equi), and whether keys are sortable.

1. Broadcast Hash Join (BHJ)

Principle: Broadcasts small table data to all Executor nodes, completes Join on the Map side, completely avoiding Shuffle.

Trigger conditions:

  • Small table size < spark.sql.autoBroadcastJoinThreshold (default 10MB)
  • Only supports equi-Join

Advantage: Optimal performance when conditions are met, no network Shuffle overhead.

// Manually trigger broadcast Join
import org.apache.spark.sql.functions.broadcast

largeDf.join(broadcast(smallDf), "user_id").show()

2. Shuffle Hash Join (SHJ)

Principle: Uses the same hash function to repartition both tables by Join Key, builds Hash Map locally in each partition, then executes Join.

Trigger conditions:

  • Only supports equi-Join
  • spark.sql.join.preferSortMergeJoin = false
  • Small table size × 3 ≤ Large table size
  • Small table size < autoBroadcastJoinThreshold × shuffle partition count
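
Since Spark 3.0, an SHJ can also be requested explicitly with the SHUFFLE_HASH join hint instead of relying on the size conditions above. A minimal sketch, assuming an active SparkSession `spark` (e.g. in spark-shell); the table contents are illustrative:

```scala
// Assumes an active SparkSession `spark` (e.g. spark-shell)
import spark.implicits._

val largeDf = Seq((1, 99.0), (2, 50.0), (1, 10.0)).toDF("user_id", "amount")
val smallDf = Seq((1, "alice"), (2, "bob")).toDF("user_id", "name")

// The SHUFFLE_HASH hint (Spark 3.0+) asks the planner for a Shuffle Hash Join
largeDf.join(smallDf.hint("shuffle_hash"), "user_id").explain()
// The physical plan should show ShuffledHashJoin rather than SortMergeJoin
```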

3. Shuffle Sort Merge Join (SMJ)

Principle: After Shuffling both tables by Join Key, sorts each partition, then merges the sorted streams in merge-sort fashion, so neither side's partition needs to fit entirely in memory.

Trigger conditions:

  • Only supports equi-Join (keys must be sortable)
  • spark.sql.join.preferSortMergeJoin = true (default)

Advantage: Suitable for joining two large tables, memory-friendly, default production strategy.
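
One way to observe SMJ being selected is to disable broadcasting so the planner falls through to the default strategy. A sketch, assuming an active SparkSession `spark`; `dfA` and `dfB` stand in for two large DataFrames:

```scala
// Disable BHJ so the planner falls back to the default Sort Merge Join
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

dfA.join(dfB, "user_id").explain()
// The physical plan shows SortMergeJoin, with Exchange + Sort on both inputs
```

Alternatively, `dfB.hint("merge")` (Spark 3.0+) requests SMJ explicitly for a single join.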

4. Cartesian Product Join

Triggered when a Join has no condition; the output row count is left table rows × right table rows. Should be strictly avoided in production.
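
When a Cartesian product is genuinely intended, it is better to state it explicitly with crossJoin than to omit the Join condition. A sketch, assuming an active SparkSession `spark`:

```scala
import spark.implicits._

val colors = Seq("red", "blue").toDF("color")
val sizes  = Seq("S", "M", "L").toDF("size")

// Explicit, intentional Cartesian product: 2 × 3 = 6 rows
colors.crossJoin(sizes).show()
```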

5. Broadcast Nested Loop Join (BNLJ)

Principle: Broadcasts small table, performs nested loop scan for each row of large table against small table.

Trigger conditions: Supports non-equi-Join conditions (such as <, >, LIKE) and all Join types.

Disadvantage: Nested loop scanning is inefficient, so BNLJ is selected only when no other strategy applies.
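
A non-equi condition is what typically forces BNLJ, for example a range join. A sketch, assuming an active SparkSession `spark`; the tables are illustrative:

```scala
import spark.implicits._
import org.apache.spark.sql.functions.broadcast

val events = Seq((1, 120), (2, 80)).toDF("id", "score")
val bands  = Seq((100, "high"), (0, "low")).toDF("threshold", "band")

// A non-equi condition (>=) rules out hash and sort-merge joins;
// broadcasting the small side yields BroadcastNestedLoopJoin
events.join(broadcast(bands), events("score") >= bands("threshold")).explain()
```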

Join Strategy Selection Priority

When its conditions are met, each strategy is chosen in this priority order:

  1. BHJ (optimal; requires the broadcast conditions)
  2. SHJ (medium)
  3. SMJ (default strategy for large tables)
  4. Cartesian Product (only when there is no Join condition)
  5. BNLJ (fallback for non-equi Joins)

Catalyst Optimizer: SQL Parsing Complete Flow

Catalyst is the core of SparkSQL, elegantly handling query tree transformations using Scala’s pattern matching.

Processing Pipeline

SQL String
    ↓  Parser (ANTLR parsing)
AST (Unresolved Abstract Syntax Tree)
    ↓  Analyzer (Rule resolution)
Resolved Logical Plan
    ↓  Optimizer (Optimization rules)
Optimized Logical Plan
    ↓  Physical Planning
Physical Plan
    ↓  Code Generation
RDD Execution

Stage Details

1. Parser

Converts SQL string to AST with unresolved references (UnresolvedRelation, UnresolvedAttribute).
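
The raw parser output can be inspected directly. Note that sessionState.sqlParser is an internal API and may change between versions, so treat this as exploratory (sketch, assuming an active SparkSession `spark`):

```scala
// parsePlan returns the unresolved logical plan produced by the ANTLR parser
val unresolved = spark.sessionState.sqlParser
  .parsePlan("SELECT name FROM stu WHERE age >= 11")
println(unresolved)
// The printed tree contains 'UnresolvedRelation [stu] and unresolved attributes
```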

2. Analyzer

Resolves table names and column references through Catalog, applies rules like:

  • ResolveRelations: Resolves table names to specific data sources
  • ResolveFunctions: Resolves function references
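
The effect of analysis is visible by comparing the plan before and after this stage via queryExecution (sketch, assuming an active SparkSession `spark` and a registered table stu):

```scala
val qe = spark.sql("SELECT name FROM stu WHERE age >= 11").queryExecution

println(qe.logical)   // parser output: relations and attributes still unresolved
println(qe.analyzed)  // after the Analyzer: names resolved via the Catalog, types bound
```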

3. Optimizer

Applies rule-based optimization:

  • Predicate Pushdown: Pushes filter conditions down to the data source, reducing the volume of data read
  • Column Pruning: Reads only the columns the query needs
  • Constant Folding: Pre-computes constant expressions (e.g., 100 + 10 = 110)
  • Whole-Stage CodeGen: Inlines multiple small operators into one generated function, reducing call overhead (strictly speaking, this is applied at the code-generation stage rather than by the logical Optimizer)

4. Physical Planning

Maps logical plan to specific Spark execution operations, selects Join strategies, partitioning methods, etc.
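
queryExecution also exposes the physical side of the pipeline (sketch, assuming an active SparkSession `spark` and a registered table stu):

```scala
val qe = spark.sql("SELECT name FROM stu WHERE age >= 11").queryExecution

println(qe.sparkPlan)     // physical plan selected from the optimized logical plan
println(qe.executedPlan)  // final plan after preparations (exchanges, WholeStageCodegen)
```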

Practice: View Execution Plan

val df = spark.sql(
  """
    |SELECT SUM(v) AS total_score, name
    |FROM (
    |  SELECT stu.id, 100 + 10 + score.score AS v, name
    |  FROM stu
    |  JOIN score ON stu.id = score.id
    |  WHERE stu.age >= 11
    |) tmp
    |GROUP BY name
  """.stripMargin
)

// View optimized logical plan
println(df.queryExecution.optimizedPlan)

// View full execution plan (including physical plan)
df.explain(true)

Optimization example:

  • Constants pre-computed: 100 + 10 + score becomes 110 + score
  • Filter predicate stu.age >= 11 pushed down to stu table scan, reducing data before Join

Key Configuration Parameters

  • spark.sql.autoBroadcastJoinThreshold (default: 10MB): Table size threshold for BHJ; set to -1 to disable broadcasting
  • spark.sql.join.preferSortMergeJoin (default: true): Prefer SMJ over SHJ
  • spark.sql.shuffle.partitions (default: 200): Partition count after a Shuffle; affects Join parallelism

// Increase broadcast threshold for more small tables to use BHJ
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "50m")

// Increase Shuffle partitions for large data volumes
spark.conf.set("spark.sql.shuffle.partitions", "500")

Summary

Understanding SparkSQL’s Join strategy selection and Catalyst optimization flow is fundamental to tuning distributed query performance:

  • Prefer BHJ for small tables to eliminate Shuffle
  • Two large tables use SMJ by default for stable memory
  • Use explain() and queryExecution to observe actual execution plans
  • Set shuffle.partitions appropriately to avoid too many or too few partitions