Doris Routine Load故障恢复与Exactly-Once语义保证
Doris Routine Load如何保证从Kafka消费数据的Exactly-Once语义?请解释Routine Load的Offset管理机制(FE持久化Offset→BE消费→Checkpoint提交)、故障恢复流程(BE宕机/网络超时/Kafka重平衡时的处理策略)、以及如何通过Label机制保证数据不重不丢。给出一个Routine Load监控和故障恢复的最佳实践。
回答
编译有声
Doris Routine Load Exactly-Once机制:
1. Offset管理流程:
FE:持久化Routine Load任务的元数据(Kafka Broker/Partition/Offset)
↓ 分配Task
BE:从指定Offset消费Kafka数据
↓ 批量写入Doris
BE:写入成功后,向FE汇报最新Offset(类似Checkpoint)
↓
FE:持久化Offset到元数据(MySQL/EditLog)
↓
FE:向Kafka Broker提交Offset(可选)
2. Label机制去重:
- 每个导入批次(Batch)生成唯一Label
- Label格式:
{routine_load_name}_{task_id}_{batch_index} - Doris记录已成功导入的Label
- 重复Label自动忽略(幂等写入)
3. 故障恢复场景:
| 故障类型 | 恢复策略 |
|---|---|
| BE宕机 | FE将Task分配到其他BE,从上次Checkpoint的Offset重新消费 |
| FE宕机 | 选举新的FE Leader,从元数据恢复所有Routine Load任务 |
| Kafka重平衡 | Routine Load自动检测Partition变更,重新分配消费 |
| 网络超时 | 自动重试(max_batch_interval超限后暂停,可手动恢复) |
| 数据写入失败 | 该批次标记为ERROR,不影响后续批次,通过max_error_number控制 |
4. 监控与恢复的最佳实践:
-- 日常监控
SHOW ROUTINE LOAD;
-- 关键字段:
-- State: RUNNING/PAUSED/STOPPED/CANCELLED
-- Progress: 各Partition的消费Offset
-- ErrorRows: 错误行数
-- UnSelectedRows: 未命中行数
-- 恢复PAUSED任务
RESUME ROUTINE LOAD FOR my_load;
-- 重新创建(如果任务已停止)
ALTER ROUTINE LOAD FOR my_load
PROPERTIES (
'max_error_number' = '5000',
'max_batch_interval' = '20'
);
-- 查看详细错误
SELECT * FROM information_schema.loads
WHERE label LIKE 'my_load%' AND state != 'FINISHED';
5. 推荐配置:
max_error_number: 5000 # 允许较多错误
max_batch_interval: 20 # 20秒批次间隔
desired_concurrent_number: 3 # 3线程并行
strict_mode: false # 宽松模式,允许少量异常行