本文是大数据系列第 78 篇,深入讲解 Spark 广播变量的原理与应用,重点介绍如何用广播优化大小表 Join 的性能。

完整图文版(含截图):CSDN 原文 | 掘金

广播变量的核心问题

在没有广播变量的情况下,若 RDD 的 Task 闭包中引用了一个外部集合(如字典表),该集合会随 Task 序列化,每个 Task 都携带一份副本发送到 Executor。当集合较大且 Task 数量众多时,网络传输量 = 集合大小 × Task 数量,严重消耗驱动端带宽和集群内存。

广播变量将数据每个 Executor 只发送一次,所有运行在该 Executor 上的 Task 共享同一份副本,网络传输量降至 集合大小 × Executor 数量。

工作原理

Driver
  │  broadcast(data)
  ├──────────────► Executor 1 (缓存一份副本)
  │                  ├── Task 1 → data.value
  │                  ├── Task 2 → data.value
  │                  └── Task 3 → data.value
  ├──────────────► Executor 2 (缓存一份副本)
  │                  ├── Task 4 → data.value
  │                  └── Task 5 → data.value
  └──────────────► Executor N ...

广播数据是只读不可变的——一旦广播完成,不能在 Executor 端修改内容。

基础用法

val sc = spark.sparkContext

// 创建广播变量(在 Driver 端)
val productMap = Map(1 -> "商品A", 2 -> "商品B", 3 -> "商品C")
val broadcastMap = sc.broadcast(productMap)

// 在 Executor 端通过 .value 访问
val orders = sc.parallelize(Seq((1, 100), (2, 200), (3, 150)))
val result = orders.map { case (productId, amount) =>
  val productName = broadcastMap.value.getOrElse(productId, "未知商品")
  (productName, amount)
}

result.collect().foreach(println)
// 输出:(商品A,100), (商品B,200), (商品C,150)

// 手动释放广播变量(可选)
broadcastMap.unpersist()
broadcastMap.destroy() // 彻底销毁

典型应用:MapSideJoin

广播变量最重要的应用场景是小表关联大表(替代 shuffle join)。

普通 join 需要将两张表的数据按 key shuffle 到同一分区,开销巨大;将小表广播后,每个 Executor 直接在本地做 map 端关联,完全消除 shuffle

// 普通 join(产生 shuffle)
val result1 = ordersRDD.join(productsRDD)  // ~2.2 秒

// MapSideJoin(广播小表)
val productsBroadcast = sc.broadcast(productsRDD.collectAsMap())
val result2 = ordersRDD.map { case (productId, amount) =>
  val productName = productsBroadcast.value(productId)
  (productName, amount)
}  // ~0.1 秒,提升约 20 倍

关键配置参数

参数默认值说明
spark.broadcast.blockSize4m广播传输的分块大小;GB 级数据建议调大到 8m-32m
spark.broadcast.compresstrue启用 Snappy 压缩;文本类数据可减少 50-70% 体积
spark.broadcast.checksumtrue传输完整性校验;增加约 1-3% CPU 开销
spark.sql.autoBroadcastJoinThreshold10mSparkSQL 自动广播小表的大小阈值

最佳实践

  1. 控制大小:广播数据建议控制在 1GB 以内,否则序列化和网络传输本身成为瓶颈
  2. 只广播必要数据:若只需要字典中的部分键值,先在 Driver 端过滤再广播
  3. 及时释放:长时间运行的应用中,不再需要的广播变量应调用 unpersist() 释放 Executor 内存
  4. 压缩优化:对结构化数据(JSON、序列化对象)开启压缩可显著降低内存和传输开销
  5. 避免广播可变对象:广播后的数据在各 Executor 上是独立副本,驱动端修改不会同步

广播变量 vs 累加器

广播变量累加器
数据流向Driver → Executor(只读)Executor → Driver(只写聚合)
典型用途共享查找表、配置、模型参数计数、求和、调试统计
可修改仅 Driver 端读取