CodeWalk

Doris Routine Load故障恢复与Exactly-Once语义保证

作者:编译有声 · 2026-05-30 12:55

Doris Routine Load如何保证从Kafka消费数据的Exactly-Once语义?请解释Routine Load的Offset管理机制(FE持久化Offset→BE消费→Checkpoint提交)、故障恢复流程(BE宕机/网络超时/Kafka重平衡时的处理策略)、以及如何通过Label机制保证数据不重不丢。给出一个Routine Load监控和故障恢复的最佳实践。

回答

编译有声

Doris Routine Load Exactly-Once机制:

1. Offset管理流程

FE:持久化Routine Load任务的元数据(Kafka Broker/Partition/Offset)
  ↓ 分配Task
BE:从指定Offset消费Kafka数据
  ↓ 批量写入Doris
BE:写入成功后,向FE汇报最新Offset(类似Checkpoint)
  ↓ 
FE:持久化Offset到元数据(MySQL/EditLog)
  ↓
FE:向Kafka Broker提交Offset(可选)

2. Label机制去重

  • 每个导入批次(Batch)生成唯一Label
  • Label格式:{routine_load_name}_{task_id}_{batch_index}
  • Doris记录已成功导入的Label
  • 重复Label自动忽略(幂等写入)

3. 故障恢复场景

故障类型恢复策略
BE宕机FE将Task分配到其他BE,从上次Checkpoint的Offset重新消费
FE宕机选举新的FE Leader,从元数据恢复所有Routine Load任务
Kafka重平衡Routine Load自动检测Partition变更,重新分配消费
网络超时自动重试(max_batch_interval超限后暂停,可手动恢复)
数据写入失败该批次标记为ERROR,不影响后续批次,通过max_error_number控制

4. 监控与恢复的最佳实践

-- 日常监控
SHOW ROUTINE LOAD;
-- 关键字段:
-- State: RUNNING/PAUSED/STOPPED/CANCELLED
-- Progress: 各Partition的消费Offset
-- ErrorRows: 错误行数
-- UnSelectedRows: 未命中行数

-- 恢复PAUSED任务
RESUME ROUTINE LOAD FOR my_load;

-- 重新创建(如果任务已停止)
ALTER ROUTINE LOAD FOR my_load
PROPERTIES (
  'max_error_number' = '5000',
  'max_batch_interval' = '20'
);

-- 查看详细错误
SELECT * FROM information_schema.loads
WHERE label LIKE 'my_load%' AND state != 'FINISHED';

5. 推荐配置

max_error_number: 5000           # 允许较多错误
max_batch_interval: 20           # 20秒批次间隔
desired_concurrent_number: 3     # 3线程并行
strict_mode: false               # 宽松模式,允许少量异常行