CodeWalk

Spark Structured Streaming容错与一致性

作者:孤独的心 · 2026-05-30 12:55

Spark Structured Streaming如何保证端到端的Exactly-Once语义?请从输入源(Kafka/Socket)、状态管理(State Store)、输出(Sink)三个环节详细说明容错机制。WAL(Write-Ahead Log)和Offset管理如何配合实现exactly-once?

回答

孤独的心

1. Exactly-Once实现策略

输入源(Source)

  • Kafka Source:记录Offset到Offset Log
  • 重启时从可靠存储读取Offset,重新消费
  • 支持 Kafka 动态分区发现
// Kafka Source内部
// 每个微批次提交Offset到checkpoint
// spark.sql.streaming.checkpointLocation

状态管理(State Store)

  • 状态存储在HDFS/DFS
  • 使用 WAL(Write-Ahead Log) 记录状态变更
  • 增量更新:只保存变化的SST文件
State Store架构:
├── Checkpoint/
│   ├── sources/       # Offset信息
│   ├── offsets/       # 已提交Offset
│   ├── state/         # RocksDB状态快照
│   └── commits/       # 已完成批次

输出(Sink)

// 幂等写入
// Kafka Sink:使用idempotent producer
// File Sink:使用PartitionedFile
// JDBC Sink:使用事务或UPSERT

// 事务写入
.writeStream
.format("kafka")
.option("kafka.enable.idempotence", "true")

2. 容错恢复流程

1. 重启后从checkpoint恢复
2. 读取commits确认最后成功批次
3. 读取offsets获取该批次输入位置
4. 重新处理后续批次
5. 状态从state目录恢复
6. 输出使用幂等机制确保不重复

3. 限制与注意事项

环节Exactly-Once难度推荐方案
Kafka→StructuredOffset管理
状态计算中等WAL+Checkpoint
输出到HDFS幂等文件写入
输出到Kafka幂等生产者
输出到MySQLUPSERT/事务

4. 最佳实践

// 必须设置checkpointLocation
query = df.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/checkpoint/app1")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .start()

注意:Structured Streaming做不到纯Exactly-Once(下游需幂等支持),但其处理引擎保证重新处理不改变状态正确性。