本文是大数据系列第 77 篇,深入讲解 Spark RDD 的容错机制,重点分析 Checkpoint 的原理、使用场景和注意事项。
RDD 容错的两种路径
Spark 通过 血统(Lineage)重算 提供基础容错能力——当某个分区丢失时,可沿依赖链追溯并重新计算。但对于依赖链极长或计算代价极高的 RDD,重算成本过大,此时需要 Checkpoint 来切断依赖链并持久化数据。
Checkpoint 执行流程
// 1. 配置 checkpoint 目录(通常指向 HDFS)
sc.setCheckpointDir("hdfs://namenode:8020/spark-checkpoint")
// 2. 标记需要 checkpoint 的 RDD
val rdd = sc.textFile("hdfs://data/input")
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
rdd.checkpoint()
// 3. 触发 Action——此时 Spark 会额外提交一个 Job 将数据写入 HDFS
rdd.count()
Checkpoint 的执行是两阶段的:
- 调用
checkpoint()时仅注册标记,不立即执行; - 第一个 Action 完成后,Spark 提交独立的 checkpoint Job,将 RDD 数据写入配置目录。
因此,被 checkpoint 的 RDD 通常建议先 cache(),避免为写 checkpoint 而再次重算原始数据。
rdd.cache()
rdd.checkpoint()
rdd.count() // 先从缓存计算,再将缓存结果写入 checkpoint
Checkpoint vs Persist/Cache
| 维度 | persist / cache | checkpoint |
|---|---|---|
| 存储位置 | Executor 内存 / 本地磁盘 | 可靠分布式存储(HDFS) |
| 生命周期 | 应用结束后自动清理 | 持久保留,需手动删除 |
| 依赖链处理 | 保留完整血统 | 截断依赖链,从 checkpoint RDD 重新开始 |
| 节点故障 | Executor 崩溃后数据丢失 | 不受单节点故障影响 |
| 适用场景 | 复用频繁的中间结果 | 超长依赖链、迭代算法 |
适用场景
1. 迭代算法(PageRank、K-Means 等)
每轮迭代产生新 RDD,依赖链随迭代次数线性增长。建议每隔若干轮对核心 RDD 做一次 checkpoint,防止依赖链过长导致驱动端 OOM 或 task 序列化失败。
2. 长 Lineage 的流水线作业
当 ETL 流水线经过数十次 transformation,任何一步失败都需要从头重算时,在关键检查点落地 HDFS 可大幅缩短故障恢复时间。
3. 多 Job 共享中间结果
将耗时预处理结果 checkpoint 后,多个下游 Job 可直接读取,无需重复计算。
RDD 分区策略
文章还介绍了两种常用分区器:
HashPartitioner(默认):对 key 取哈希后按分区数取模,适合大多数场景。同一 key 保证落在同一分区。
rdd.partitionBy(new HashPartitioner(100))
RangePartitioner:基于蓄水池采样估算数据分布,将数据按值域范围均匀分配到各分区。适合需要有序输出或范围查询的场景,sortByKey 内部即使用此分区器。
验证 Checkpoint 是否生效
println(rdd.isCheckpointed) // false(Action 前)
rdd.count()
println(rdd.isCheckpointed) // true(Action 后)
println(rdd.getCheckpointFile) // 输出 HDFS 路径
println(rdd.toDebugString) // 依赖链已从 checkpoint RDD 开始
性能权衡
- 优点:可靠容错、截断过长依赖链、支持跨应用复用
- 代价:额外的 HDFS I/O 开销、需要更多存储空间、checkpoint Job 增加执行时间
实践建议:生产环境中对运行超过 1 小时的长流水线和迭代算法强制启用 checkpoint,checkpoint 目录定期清理以避免存储膨胀。