This is article 76 in the Big Data series, systematically reviewing Spark process communication mechanism, serialization strategy and RDD execution principle.

Driver-Executor Architecture and Process Communication

Spark uses Driver-Executor master-slave architecture. Driver manages resource application, task scheduling and monitoring through SparkContext; Executor runs on each Worker node, responsible for executing specific computation tasks. Since they run in different JVM processes, all cross-process data transmission (including task code, closure variables, intermediate results) must go through serialization.

Serialization Method Comparison

Spark supports two serialization implementations:

MethodDefaultPerformanceCompatibility
Java serializationYesSlower, larger serialized resultSupports all Serializable classes
Kryo serializationNo3-10x faster, smaller sizeRequires manual type registration

Enable Kryo serialization:

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[MyClass]))

For compute-intensive or large data shuffle scenarios, recommend switching to Kryo serialization.

Closure Serialization Problem Troubleshooting

RDD operators (like map, filter) closures are serialized then sent to Executor for execution. If closure references non-serializable object, will throw NotSerializableException.

Three Solutions

Solution 1: Use case class (Recommended)

Scala’s case class implements Serializable by default, suitable for encapsulating configuration objects passed to closures:

case class Config(threshold: Double)

val config = Config(0.5)
rdd.filter(x => x > config.threshold)

Solution 2: Implement Serializable Interface

For Java classes or ordinary Scala classes, explicitly inherit Serializable:

class MyProcessor extends Serializable {
  def process(x: Int): Int = x * 2
}

Solution 3: Use lazy for delayed initialization

For inherently non-serializable objects like database connections, HTTP clients, create on Executor side via lazy val to avoid serialization:

object DBHelper extends Serializable {
  lazy val connection = createConnection() // Initialize on Executor side
}

RDD Dependencies

RDD transformations form a Directed Acyclic Graph (DAG), dependencies divided into two types:

  • Narrow Dependency: Each partition of parent RDD depends on at most one partition of child RDD (like map, filter). Can pipeline execute within same Stage, no shuffle needed.
  • Wide Dependency: One partition of parent RDD can depend on multiple partitions of child RDD (like groupByKey, reduceByKey). Must wait for all parent partitions to write shuffle files before continuing, triggers Stage boundary.

Task Execution Hierarchy

Application
  └── Job (each Action triggers a Job)
        └── Stage (divided by Shuffle boundary)
              └── Task (each partition corresponds to one Task)

DAGScheduler is responsible for splitting DAG into Stages, TaskScheduler distributes Tasks to specific Executors for execution.

RDD Persistence

cache() is shortcut for persist(StorageLevel.MEMORY_ONLY). Common storage levels:

LevelDescription
MEMORY_ONLYStore as deserialized objects in memory, fastest, recompute if insufficient memory
MEMORY_AND_DISKOverflow to disk if memory insufficient, recommended for general scenarios
MEMORY_ONLY_SERSerialized storage, smaller memory footprint but slightly higher CPU overhead
DISK_ONLYWrite to disk only, suitable for very large RDDs

Persistence is lazy — after calling persist(), doesn’t immediately trigger caching. First Action execution is when data is actually written. Can manually release cache via unpersist() to avoid memory pressure.