本文是大数据系列第 76 篇,系统梳理 Spark 进程通信机制、序列化策略与 RDD 执行原理。

完整图文版(含截图):CSDN 原文 | 掘金

Driver-Executor 架构与进程通信

Spark 采用 Driver-Executor 主从架构。Driver 通过 SparkContext 负责资源申请、任务调度与监控;Executor 运行在各 Worker 节点,负责执行具体计算任务。由于两者运行在不同 JVM 进程中,所有跨进程数据传输(包括任务代码、闭包变量、中间结果)都必须经过序列化处理。

序列化方式对比

Spark 支持两种序列化实现:

方式默认性能兼容性
Java 序列化较慢,序列化结果较大支持所有实现 Serializable 的类
Kryo 序列化速度快 3-10x,体积更小需手动注册类型

启用 Kryo 序列化:

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[MyClass]))

对于计算密集型或大数据量 shuffle 的场景,推荐切换到 Kryo 序列化。

闭包序列化问题排查

RDD 算子(如 mapfilter)内的闭包会被序列化后发送到 Executor 执行。若闭包引用了不可序列化的对象,将抛出 NotSerializableException

三种解决方案

方案一:使用 case class(推荐)

Scala 的 case class 默认实现了序列化接口,适合封装传入闭包的配置对象:

case class Config(threshold: Double)

val config = Config(0.5)
rdd.filter(x => x > config.threshold)

方案二:实现 Serializable 接口

对于 Java 类或已有的普通 Scala 类,显式继承 Serializable

class MyProcessor extends Serializable {
  def process(x: Int): Int = x * 2
}

方案三:使用 lazy 延迟初始化

对于数据库连接、HTTP 客户端等本质上不可序列化的对象,在 Executor 端通过 lazy val 延迟创建,避免序列化传输:

object DBHelper extends Serializable {
  lazy val connection = createConnection() // 在 Executor 端初始化
}

RDD 依赖关系

RDD 之间的转换形成有向无环图(DAG),依赖分为两类:

  • 窄依赖(Narrow Dependency):父 RDD 的每个分区至多被子 RDD 的一个分区依赖(如 mapfilter)。可在同一 Stage 内流水线执行,无需 shuffle。
  • 宽依赖(Wide Dependency):父 RDD 的一个分区可被子 RDD 的多个分区依赖(如 groupByKeyreduceByKey)。必须等所有父分区写完 shuffle 文件才能继续,触发 Stage 边界。

任务执行层级

Application
  └── Job(每次 Action 触发一个 Job)
        └── Stage(以 Shuffle 为边界划分)
              └── Task(每个分区对应一个 Task)

DAGScheduler 负责将 DAG 拆分为 Stage,TaskScheduler 将 Task 分发到具体 Executor 执行。

RDD 持久化

cache()persist(StorageLevel.MEMORY_ONLY) 的快捷方式。常用存储级别:

级别说明
MEMORY_ONLY以反序列化对象存于内存,最快,内存不足时重算
MEMORY_AND_DISK内存放不下则溢写磁盘,推荐通用场景
MEMORY_ONLY_SER序列化存储,内存占用更小但 CPU 开销略高
DISK_ONLY仅写磁盘,适合超大 RDD

持久化是惰性的——调用 persist() 后不会立即触发缓存,第一次 Action 执行时才真正写入。可通过 unpersist() 手动释放缓存,避免内存压力。