Flink Checkpoint与Savepoint对比
请详细对比Flink Checkpoint和Savepoint的异同。包括触发方式、生命周期、数据格式、用途场景、以及在生产中的最佳实践。如何从Savepoint恢复Job?如何跨大版本升级时使用Savepoint进行状态迁移?
回答
Yahuda
1. 核心区别
| 维度 | Checkpoint | Savepoint |
|---|---|---|
| 触发 | 自动(周期性) | 手动(用户触发) |
| 目的 | 故障恢复 | 运维操作/升级/迁移 |
| 生命周期 | 自动清理(保留N个) | 手动管理(持久保留) |
| 格式 | 算子状态二进制 | 标准化格式(跨版本兼容) |
| 触发开销 | 低(增量) | 高(全量) |
| 存储路径 | 配置的CK目录 | 指定路径 |
2. 工作原理
Checkpoint流程:
1. JobManager发送Barrier
2. Source接收到Barrier,将当前Offset存入State
3. Barrier在算子链中传递
4. 每个算子快照当前State
5. 所有算子完成 → Checkpoint标记为Completed
Savepoint流程:
# 手动触发
./bin/flink savepoint :jobId [:targetDirectory]
# 本质:特殊的Checkpoint(强制全量)
# 数据格式更通用,便于跨版本恢复
3. 从Savepoint恢复
# 从指定Savepoint恢复
./bin/flink run -s hdfs://path/to/savepoint \
-c com.example.MainClass \
app.jar
# 允许状态重映射
--allowNonRestoredState
4. 跨版本升级
Flink 1.14 → 1.17 升级步骤:
1. 停止作业(触发Savepoint)
2. 升级Flink版本
3. 调整代码(如有算子变更)
4. 从Savepoint恢复
注意:
- 序列化兼容性(Pojo/Kryo)
- 算子UID必须稳定(uid()方法设置)
- 状态数据结构兼容(@TypeInfo)
5. 最佳实践
# Checkpoint配置
state.checkpoints.num-retained: 5
state.checkpoints.dir: hdfs:///flink/checkpoints
# Savepoint策略
# 1. 版本升级前必做Savepoint
# 2. 定期(如每天)做全量Savepoint
# 3. 保留最近7天Savepoint
# 代码规范
DataStream<...> stream = env
.addSource(new SourceFunction())
.uid("my-source"); // 必须设置
6. FAQ
- Savepoint可能很大(全量),需预留存储空间
- 跨大版本升级建议测试恢复流程
- 状态迁移故障可回退旧作业+Savepoint