This is article 76 in the Big Data series, systematically reviewing the Spark process communication mechanism, serialization strategies, and RDD execution principles.
Driver-Executor Architecture and Process Communication
Spark uses a Driver-Executor master-slave architecture. The Driver handles resource application, task scheduling, and monitoring through SparkContext; Executors run on the Worker nodes and execute the actual computation tasks. Because Driver and Executors run in different JVM processes, all cross-process data transfer (task code, closure variables, intermediate results) must go through serialization.
Serialization Method Comparison
Spark supports two serialization implementations:
| Method | Default | Performance | Compatibility |
|---|---|---|---|
| Java serialization | Yes | Slower, larger serialized result | Supports all Serializable classes |
| Kryo serialization | No | 3-10x faster, smaller size | Requires manual type registration |
Enable Kryo serialization:

```scala
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[MyClass]))
```
For compute-intensive jobs or scenarios that shuffle large amounts of data, switching to Kryo serialization is recommended.
Closure Serialization Problem Troubleshooting
The closures of RDD operators (such as map and filter) are serialized and then shipped to Executors for execution. If a closure references a non-serializable object, Spark throws a NotSerializableException.
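As a minimal sketch of the failure mode (the `Multiplier` class and its field are illustrative, not from the original article), the following pattern breaks because the lambda captures `this`, forcing Spark to serialize the entire enclosing instance:

```scala
import org.apache.spark.rdd.RDD

// A helper class that does NOT implement Serializable
class Multiplier(val factor: Int) {
  // `factor` inside the lambda is really `this.factor`, so the closure
  // captures the whole Multiplier instance and task serialization fails
  // with "Task not serializable: java.io.NotSerializableException"
  def apply(rdd: RDD[Int]): RDD[Int] =
    rdd.map(x => x * factor)
}
```

A common quick workaround is copying the field into a local value before the closure (`val f = factor; rdd.map(_ * f)`), which removes the reference to `this`; the three solutions below address the problem more systematically.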
Three Solutions
Solution 1: Use case class (Recommended)
Scala's case classes implement Serializable by default, making them well suited for encapsulating configuration objects passed into closures:
```scala
case class Config(threshold: Double)

val config = Config(0.5)
rdd.filter(x => x > config.threshold)
```
Solution 2: Implement Serializable Interface
For Java classes or plain Scala classes, explicitly extend Serializable:

```scala
class MyProcessor extends Serializable {
  def process(x: Int): Int = x * 2
}
```
Solution 3: Use lazy for delayed initialization
For inherently non-serializable objects such as database connections or HTTP clients, declare them as a lazy val so they are created on the Executor side rather than serialized:

```scala
object DBHelper extends Serializable {
  lazy val connection = createConnection() // initialized lazily on the Executor side
}
```
RDD Dependencies
RDD transformations form a Directed Acyclic Graph (DAG); dependencies between RDDs fall into two types:
- Narrow Dependency: each partition of the parent RDD is used by at most one partition of the child RDD (e.g. map, filter). Such transformations can be pipelined within the same Stage; no shuffle is needed.
- Wide Dependency: one partition of the child RDD depends on multiple partitions of the parent RDD (e.g. groupByKey, reduceByKey). All parent partitions must finish writing their shuffle files before computation continues, so a wide dependency marks a Stage boundary.
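To see both dependency types in one lineage, `toDebugString` prints an RDD's ancestry with indentation shifts at shuffle boundaries. This sketch assumes a running SparkContext named `sc` and an input file `input.txt`:

```scala
// flatMap and map create narrow dependencies; reduceByKey introduces a wide one
val words  = sc.textFile("input.txt").flatMap(_.split(" ")) // narrow
val pairs  = words.map(w => (w, 1))                         // narrow, same Stage
val counts = pairs.reduceByKey(_ + _)                       // wide: shuffle, new Stage

// Indentation changes in the printed lineage mark the Stage boundaries
println(counts.toDebugString)
```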
Task Execution Hierarchy
```
Application
 └── Job   (each Action triggers a Job)
      └── Stage (split at Shuffle boundaries)
           └── Task (one Task per partition)
```
The DAGScheduler splits the DAG into Stages; the TaskScheduler dispatches Tasks to specific Executors for execution.
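A concrete mapping of the hierarchy, assuming a SparkContext `sc` and a 4-partition input (the numbers are illustrative):

```scala
// One Action => one Job. The reduceByKey shuffle splits the Job into
// two Stages; each Stage runs one Task per partition (4 here).
val pairs = sc.parallelize(1 to 100, numSlices = 4).map(x => (x % 10, x))
val sums  = pairs.reduceByKey(_ + _) // wide dependency => Stage boundary
sums.count()                         // Action: triggers 1 Job with 2 Stages
```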
RDD Persistence
cache() is a shortcut for persist(StorageLevel.MEMORY_ONLY). Common storage levels:
| Level | Description |
|---|---|
| MEMORY_ONLY | Deserialized objects in memory; fastest; partitions that don't fit are recomputed |
| MEMORY_AND_DISK | Spills to disk when memory is insufficient; recommended for general scenarios |
| MEMORY_ONLY_SER | Serialized storage; smaller memory footprint but slightly higher CPU overhead |
| DISK_ONLY | Disk only; suitable for very large RDDs |
Persistence is lazy: calling persist() does not trigger caching immediately; the data is actually materialized the first time an Action runs. Call unpersist() to release the cache manually and relieve memory pressure.
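The lifecycle above can be sketched end to end (assumes a SparkContext `sc` and an input file `logs.txt`):

```scala
import org.apache.spark.storage.StorageLevel

val errors = sc.textFile("logs.txt").filter(_.contains("ERROR"))
errors.persist(StorageLevel.MEMORY_AND_DISK) // marks the RDD; nothing is cached yet

val total  = errors.count() // first Action: computes AND caches the partitions
val sample = errors.take(10) // served from the cache, no recomputation

errors.unpersist() // release the cached blocks
```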