This is article 78 in the Big Data series. It takes a deep look at the principles and applications of Spark broadcast variables, focusing on how to use broadcasting to optimize the performance of joins between a large table and a small table.

Core Problem of Broadcast Variables

Without broadcast variables, if an RDD's Task closure references an external collection (such as a dictionary table), that collection is serialized into every Task, and each Task carries its own copy to the Executor. When the collection is large and there are many Tasks, the network traffic equals collection size × Task count, which can severely strain Driver bandwidth and cluster memory.

A broadcast variable ships the data only once per Executor; all Tasks running on that Executor share the same copy, so network traffic drops to collection size × Executor count. For example, a 100 MB dictionary referenced by 1,000 Tasks would otherwise move roughly 100 GB over the network; with 20 Executors, broadcasting moves only about 2 GB.

Working Principle

Driver
  │  broadcast(data)
  ├──────────────► Executor 1 (cache one copy)
  │                  ├── Task 1 → data.value
  │                  ├── Task 2 → data.value
  │                  └── Task 3 → data.value
  ├──────────────► Executor 2 (cache one copy)
  │                  ├── Task 4 → data.value
  │                  └── Task 5 → data.value
  └──────────────► Executor N ...

Broadcast data is read-only and immutable: once the broadcast completes, its content cannot be modified on the Executor side, and any local mutation is never propagated back.
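To make the read-only semantics concrete, here is a sketch of the pitfall, assuming `sc` (the SparkContext) and an `rdd` of keys are in scope; the variable names are illustrative:

```scala
import scala.collection.mutable

// If you broadcast a mutable collection, each Executor still gets its own copy.
val mutableDict = mutable.Map(1 -> "A")
val bc = sc.broadcast(mutableDict)

rdd.foreach { x =>
  // DON'T do this: the update changes only the local Executor's copy,
  // is never sent back to the Driver or to other Executors, and
  // concurrent Tasks on the same Executor may race on the shared copy.
  bc.value(x) = "modified"
}

// Broadcasting an immutable Map makes this mistake impossible:
val safeBc = sc.broadcast(mutableDict.toMap)
```

Broadcasting immutable structures is the safer default, since any mutation then fails at compile time rather than silently diverging per Executor.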

Basic Usage

val sc = spark.sparkContext

// Create broadcast variable (on Driver side)
val productMap = Map(1 -> "ProductA", 2 -> "ProductB", 3 -> "ProductC")
val broadcastMap = sc.broadcast(productMap)

// Access via .value on Executor side
val orders = sc.parallelize(Seq((1, 100), (2, 200), (3, 150)))
val result = orders.map { case (productId, amount) =>
  val productName = broadcastMap.value.getOrElse(productId, "Unknown Product")
  (productName, amount)
}

result.collect().foreach(println)
// Output: (ProductA,100), (ProductB,200), (ProductC,150)

// Release the broadcast variable when no longer needed (optional)
broadcastMap.unpersist()  // remove cached copies from Executors; re-broadcast on next use
broadcastMap.destroy()    // remove all state permanently; the variable cannot be used afterwards

Typical Application: Map-Side Join

The most important use case for broadcast variables is joining a small table with a large table (replacing a shuffle join).

A normal join has to shuffle data from both tables so that matching keys land in the same partition, which is expensive. After broadcasting the small table, each Executor performs the join locally on the map side, eliminating the shuffle entirely.

// Normal join (produces a shuffle)
val result1 = ordersRDD.join(productsRDD)  // ~2.2 seconds

// Map-side join (broadcast the small table)
val productsBroadcast = sc.broadcast(productsRDD.collectAsMap())
val result2 = ordersRDD.map { case (productId, amount) =>
  // getOrElse avoids a NoSuchElementException for keys missing from the small table
  val productName = productsBroadcast.value.getOrElse(productId, "Unknown Product")
  (productName, amount)
}  // ~0.1 seconds, ~20x improvement
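The map-based version above keeps every order even when the product is missing, which amounts to left-join semantics. When true inner-join semantics are needed, a flatMap variant drops unmatched keys instead; this sketch reuses the variable names from the snippet above:

```scala
// Inner-join semantics: orders whose productId is absent from the
// broadcast dictionary are dropped rather than labeled "Unknown Product".
val innerJoined = ordersRDD.flatMap { case (productId, amount) =>
  productsBroadcast.value.get(productId).map(productName => (productName, amount))
}
```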

Key Configuration Parameters

| Parameter | Default | Description |
| --- | --- | --- |
| spark.broadcast.blockSize | 4m | Chunk size for broadcast transmission; 8m-32m recommended for GB-level data |
| spark.broadcast.compress | true | Compress broadcast data before sending (codec set by spark.io.compression.codec, lz4 by default); can cut text data volume by 50-70% |
| spark.broadcast.checksum | true | Integrity check during transmission; adds roughly 1-3% CPU overhead |
| spark.sql.autoBroadcastJoinThreshold | 10m | Maximum table size for Spark SQL to broadcast a table automatically in a join |
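As a sketch, these parameters can be set when building the session; the values here are illustrative, not a recommendation for every workload:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("BroadcastTuning")
  .config("spark.broadcast.blockSize", "8m")    // larger chunks for GB-level broadcasts
  .config("spark.broadcast.compress", "true")   // compress before shipping
  .config("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024)  // raise to 50 MB
  .getOrCreate()

// Spark SQL can also be nudged per-query with an explicit broadcast hint:
import org.apache.spark.sql.functions.broadcast
// ordersDF.join(broadcast(productsDF), "productId")
```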

Best Practices

  1. Control size: keep broadcast data under roughly 1 GB; beyond that, serialization and network transfer themselves become the bottleneck
  2. Broadcast only necessary data: if only some key-value pairs of a dictionary are needed, filter on the Driver side first, then broadcast
  3. Release promptly: in long-running applications, call unpersist() on broadcast variables that are no longer needed to free Executor memory
  4. Compression optimization: enabling compression for structured data (JSON, serialized objects) can significantly reduce memory and transfer overhead
  5. Avoid broadcasting mutable objects: each Executor holds an independent copy of the broadcast data, so Driver-side modifications are never synchronized
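Practice 2 can be sketched as follows, reusing ordersRDD and productsRDD from the earlier examples:

```scala
// Broadcast only the dictionary entries the job actually needs:
// first collect the distinct keys referenced by the large dataset,
// then restrict the small table to those keys before broadcasting.
val neededIds = ordersRDD.keys.distinct().collect().toSet
val trimmedDict = productsRDD.filter { case (id, _) => neededIds.contains(id) }
val trimmedBroadcast = sc.broadcast(trimmedDict.collectAsMap())
```

This trades one extra pass over the large dataset for a smaller broadcast, which pays off when the dictionary is much larger than the set of keys actually referenced.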

Broadcast Variable vs Accumulator

| | Broadcast Variable | Accumulator |
| --- | --- | --- |
| Data flow | Driver → Executor (read-only) | Executor → Driver (write-only aggregation) |
| Typical usage | Shared lookup tables, config, model parameters | Counting, summing, debugging statistics |
| Mutability | Not modifiable | Tasks can only add; only the Driver can read the value |
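A minimal sketch contrasting the two, assuming `sc` is the SparkContext:

```scala
// Broadcast: data flows Driver → Executors; Tasks only read it.
val dict = sc.broadcast(Map(1 -> "A", 2 -> "B"))

// Accumulator: values flow Executors → Driver; Tasks only add to it.
val missing = sc.longAccumulator("missing-keys")

sc.parallelize(Seq(1, 2, 3)).foreach { id =>
  if (!dict.value.contains(id)) missing.add(1)  // write-only on Executors
}

println(missing.value)  // readable only on the Driver; prints 1 (only key 3 is missing)
```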