本文是大数据系列第 78 篇,深入讲解 Spark 广播变量的原理与应用,重点介绍如何用广播优化大小表 Join 的性能。
广播变量的核心问题
在没有广播变量的情况下,若 RDD 的 Task 闭包中引用了一个外部集合(如字典表),该集合会随 Task 序列化,每个 Task 都携带一份副本发送到 Executor。当集合较大且 Task 数量众多时,网络传输量 = 集合大小 × Task 数量,严重消耗驱动端带宽和集群内存。
广播变量将数据每个 Executor 只发送一次,所有运行在该 Executor 上的 Task 共享同一份副本,网络传输量降至 集合大小 × Executor 数量。
工作原理
Driver
│ broadcast(data)
├──────────────► Executor 1 (缓存一份副本)
│ ├── Task 1 → data.value
│ ├── Task 2 → data.value
│ └── Task 3 → data.value
├──────────────► Executor 2 (缓存一份副本)
│ ├── Task 4 → data.value
│ └── Task 5 → data.value
└──────────────► Executor N ...
广播数据是只读不可变的——一旦广播完成,不能在 Executor 端修改内容。
基础用法
val sc = spark.sparkContext
// 创建广播变量(在 Driver 端)
val productMap = Map(1 -> "商品A", 2 -> "商品B", 3 -> "商品C")
val broadcastMap = sc.broadcast(productMap)
// 在 Executor 端通过 .value 访问
val orders = sc.parallelize(Seq((1, 100), (2, 200), (3, 150)))
val result = orders.map { case (productId, amount) =>
val productName = broadcastMap.value.getOrElse(productId, "未知商品")
(productName, amount)
}
result.collect().foreach(println)
// 输出:(商品A,100), (商品B,200), (商品C,150)
// 手动释放广播变量(可选)
broadcastMap.unpersist()
broadcastMap.destroy() // 彻底销毁
典型应用:MapSideJoin
广播变量最重要的应用场景是小表关联大表(替代 shuffle join)。
普通 join 需要将两张表的数据按 key shuffle 到同一分区,开销巨大;将小表广播后,每个 Executor 直接在本地做 map 端关联,完全消除 shuffle。
// 普通 join(产生 shuffle)
val result1 = ordersRDD.join(productsRDD) // ~2.2 秒
// MapSideJoin(广播小表)
val productsBroadcast = sc.broadcast(productsRDD.collectAsMap())
val result2 = ordersRDD.map { case (productId, amount) =>
val productName = productsBroadcast.value(productId)
(productName, amount)
} // ~0.1 秒,提升约 20 倍
关键配置参数
| 参数 | 默认值 | 说明 |
|---|---|---|
spark.broadcast.blockSize | 4m | 广播传输的分块大小;GB 级数据建议调大到 8m-32m |
spark.broadcast.compress | true | 启用 Snappy 压缩;文本类数据可减少 50-70% 体积 |
spark.broadcast.checksum | true | 传输完整性校验;增加约 1-3% CPU 开销 |
spark.sql.autoBroadcastJoinThreshold | 10m | SparkSQL 自动广播小表的大小阈值 |
最佳实践
- 控制大小:广播数据建议控制在 1GB 以内,否则序列化和网络传输本身成为瓶颈
- 只广播必要数据:若只需要字典中的部分键值,先在 Driver 端过滤再广播
- 及时释放:长时间运行的应用中,不再需要的广播变量应调用
unpersist()释放 Executor 内存 - 压缩优化:对结构化数据(JSON、序列化对象)开启压缩可显著降低内存和传输开销
- 避免广播可变对象:广播后的数据在各 Executor 上是独立副本,驱动端修改不会同步
广播变量 vs 累加器
| 广播变量 | 累加器 | |
|---|---|---|
| 数据流向 | Driver → Executor(只读) | Executor → Driver(只写聚合) |
| 典型用途 | 共享查找表、配置、模型参数 | 计数、求和、调试统计 |
| 可修改 | 否 | 仅 Driver 端读取 |