本文是大数据系列第 77 篇,深入讲解 Spark RDD 的容错机制,重点分析 Checkpoint 的原理、使用场景和注意事项。

完整图文版(含截图):CSDN 原文 | 掘金

RDD 容错的两种路径

Spark 通过 血统(Lineage)重算 提供基础容错能力——当某个分区丢失时,可沿依赖链追溯并重新计算。但对于依赖链极长或计算代价极高的 RDD,重算成本过大,此时需要 Checkpoint 来切断依赖链并持久化数据。

Checkpoint 执行流程

// 1. 配置 checkpoint 目录(通常指向 HDFS)
sc.setCheckpointDir("hdfs://namenode:8020/spark-checkpoint")

// 2. 标记需要 checkpoint 的 RDD
val rdd = sc.textFile("hdfs://data/input")
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)

rdd.checkpoint()

// 3. 触发 Action——此时 Spark 会额外提交一个 Job 将数据写入 HDFS
rdd.count()

Checkpoint 的执行是两阶段的:

  1. 调用 checkpoint() 时仅注册标记,不立即执行;
  2. 第一个 Action 完成后,Spark 提交独立的 checkpoint Job,将 RDD 数据写入配置目录。

因此,被 checkpoint 的 RDD 通常建议先 cache(),避免为写 checkpoint 而再次重算原始数据。

rdd.cache()
rdd.checkpoint()
rdd.count() // 先从缓存计算,再将缓存结果写入 checkpoint

Checkpoint vs Persist/Cache

维度persist / cachecheckpoint
存储位置Executor 内存 / 本地磁盘可靠分布式存储(HDFS)
生命周期应用结束后自动清理持久保留,需手动删除
依赖链处理保留完整血统截断依赖链,从 checkpoint RDD 重新开始
节点故障Executor 崩溃后数据丢失不受单节点故障影响
适用场景复用频繁的中间结果超长依赖链、迭代算法

适用场景

1. 迭代算法(PageRank、K-Means 等)

每轮迭代产生新 RDD,依赖链随迭代次数线性增长。建议每隔若干轮对核心 RDD 做一次 checkpoint,防止依赖链过长导致驱动端 OOM 或 task 序列化失败。

2. 长 Lineage 的流水线作业

当 ETL 流水线经过数十次 transformation,任何一步失败都需要从头重算时,在关键检查点落地 HDFS 可大幅缩短故障恢复时间。

3. 多 Job 共享中间结果

将耗时预处理结果 checkpoint 后,多个下游 Job 可直接读取,无需重复计算。

RDD 分区策略

文章还介绍了两种常用分区器:

HashPartitioner(默认):对 key 取哈希后按分区数取模,适合大多数场景。同一 key 保证落在同一分区。

rdd.partitionBy(new HashPartitioner(100))

RangePartitioner:基于蓄水池采样估算数据分布,将数据按值域范围均匀分配到各分区。适合需要有序输出或范围查询的场景,sortByKey 内部即使用此分区器。

验证 Checkpoint 是否生效

println(rdd.isCheckpointed)       // false(Action 前)
rdd.count()
println(rdd.isCheckpointed)       // true(Action 后)
println(rdd.getCheckpointFile)    // 输出 HDFS 路径
println(rdd.toDebugString)        // 依赖链已从 checkpoint RDD 开始

性能权衡

  • 优点:可靠容错、截断过长依赖链、支持跨应用复用
  • 代价:额外的 HDFS I/O 开销、需要更多存储空间、checkpoint Job 增加执行时间

实践建议:生产环境中对运行超过 1 小时的长流水线和迭代算法强制启用 checkpoint,checkpoint 目录定期清理以避免存储膨胀。