本文是大数据系列第 76 篇,系统梳理 Spark 进程通信机制、序列化策略与 RDD 执行原理。
Driver-Executor 架构与进程通信
Spark 采用 Driver-Executor 主从架构。Driver 通过 SparkContext 负责资源申请、任务调度与监控;Executor 运行在各 Worker 节点,负责执行具体计算任务。由于两者运行在不同 JVM 进程中,所有跨进程数据传输(包括任务代码、闭包变量、中间结果)都必须经过序列化处理。
序列化方式对比
Spark 支持两种序列化实现:
| 方式 | 默认 | 性能 | 兼容性 |
|---|---|---|---|
| Java 序列化 | 是 | 较慢,序列化结果较大 | 支持所有实现 Serializable 的类 |
| Kryo 序列化 | 否 | 速度快 3-10x,体积更小 | 需手动注册类型 |
启用 Kryo 序列化:
val conf = new SparkConf()
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.registerKryoClasses(Array(classOf[MyClass]))
对于计算密集型或大数据量 shuffle 的场景,推荐切换到 Kryo 序列化。
闭包序列化问题排查
RDD 算子(如 map、filter)内的闭包会被序列化后发送到 Executor 执行。若闭包引用了不可序列化的对象,将抛出 NotSerializableException。
三种解决方案
方案一:使用 case class(推荐)
Scala 的 case class 默认实现了序列化接口,适合封装传入闭包的配置对象:
case class Config(threshold: Double)
val config = Config(0.5)
rdd.filter(x => x > config.threshold)
方案二:实现 Serializable 接口
对于 Java 类或已有的普通 Scala 类,显式继承 Serializable:
class MyProcessor extends Serializable {
def process(x: Int): Int = x * 2
}
方案三:使用 lazy 延迟初始化
对于数据库连接、HTTP 客户端等本质上不可序列化的对象,在 Executor 端通过 lazy val 延迟创建,避免序列化传输:
object DBHelper extends Serializable {
lazy val connection = createConnection() // 在 Executor 端初始化
}
RDD 依赖关系
RDD 之间的转换形成有向无环图(DAG),依赖分为两类:
- 窄依赖(Narrow Dependency):父 RDD 的每个分区至多被子 RDD 的一个分区依赖(如
map、filter)。可在同一 Stage 内流水线执行,无需 shuffle。 - 宽依赖(Wide Dependency):父 RDD 的一个分区可被子 RDD 的多个分区依赖(如
groupByKey、reduceByKey)。必须等所有父分区写完 shuffle 文件才能继续,触发 Stage 边界。
任务执行层级
Application
└── Job(每次 Action 触发一个 Job)
└── Stage(以 Shuffle 为边界划分)
└── Task(每个分区对应一个 Task)
DAGScheduler 负责将 DAG 拆分为 Stage,TaskScheduler 将 Task 分发到具体 Executor 执行。
RDD 持久化
cache() 是 persist(StorageLevel.MEMORY_ONLY) 的快捷方式。常用存储级别:
| 级别 | 说明 |
|---|---|
MEMORY_ONLY | 以反序列化对象存于内存,最快,内存不足时重算 |
MEMORY_AND_DISK | 内存放不下则溢写磁盘,推荐通用场景 |
MEMORY_ONLY_SER | 序列化存储,内存占用更小但 CPU 开销略高 |
DISK_ONLY | 仅写磁盘,适合超大 RDD |
持久化是惰性的——调用 persist() 后不会立即触发缓存,第一次 Action 执行时才真正写入。可通过 unpersist() 手动释放缓存,避免内存压力。