Spark Checkpoint机制与容错
请解释Spark中的Checkpoint机制,它与cache/persist有什么不同?什么场景下需要使用Checkpoint?
回答
我还是少年
Checkpoint定义: 将RDD的血统(Lineage)信息截断,将RDD数据保存到可靠的外部存储(如HDFS),从而切断依赖链。
与cache/persist的区别: | 维度 | Cache/Persist | Checkpoint | |------|--------------|------------| | 存储位置 | 内存/磁盘(节点本地) | HDFS(跨节点可靠存储)| | 保留Lineage | ✅ 保留完整血统 | ❌ 截断血统,只保留Checkpoint的路径 | | 容错恢复 | 通过Lineage重算 | 直接从HDFS读取 | | 清理方式 | unpersist手动清理 | 作业完成后自动保留直到删除 | | 生命周期 | Application内有效 | 跨Application可读取 |
使用场景:
- 血统链过长: RDD经过百次以上转换,Lineage链过长导致恢复效率低
- 宽依赖的RDD: 涉及shuffle的RDD(如
reduceByKey),重算代价大 - 需要跨Job重用: 同一RDD在多个Action中使用,且不希望每次重算
- 迭代算法: Spark MLlib中的迭代算法(如ALS、PageRank)
使用步骤:
spark.sparkContext.setCheckpointDir("hdfs://path/checkpoint")
rdd.checkpoint()
rdd.count() // Action触发实际Checkpoint
重要:
- Checkpoint是一个物理执行(需Action触发),会额外多一次Job
- 建议先
cache()再checkpoint(),避免Checkpoint重新计算RDD