CodeWalk

Doris Routine Load持续消费Kafka的实现与参数调优

作者:孤独的心 · 2026-05-30 12:55

Doris Routine Load是如何实现从Kafka持续消费数据导入的?请解释Routine Load的工作流程(FE调度/BE执行/Offset管理)、关键参数(desired_concurrent_number/max_batch_interval/max_error_number)的调优方法,以及如何监控和恢复失败的Routine Load任务。给出一个JSON格式Kafka数据导入的Routine Load创建语句。

回答

孤独的心

Doris Routine Load原理与配置:

1. 工作流程

  • FE创建Routine Load任务,记录任务元数据
  • FE将任务分配给BE,每个BE负责消费Kafka的部分Partition
  • BE持续拉取Kafka消息,解析后写入Doris
  • FE定期收集BE的消费进度,持久化Offset

2. 创建Routine Load示例

CREATE ROUTINE LOAD db.orders_load ON orders
COLUMNS(order_id, user_id, amount, event_time)
PROPERTIES (
  'desired_concurrent_number' = '3',
  'max_batch_interval' = '10',
  'max_batch_rows' = '200000',
  'max_error_number' = '100',
  'strict_mode' = 'false',
  'format' = 'json',
  'jsonpaths' = '["$.order_id","$.user_id","$.amount","$.ts"]'
) FROM KAFKA (
  'kafka_broker_list' = 'kafka1:9092,kafka2:9092',
  'kafka_topic' = 'orders_topic',
  'property.group.id' = 'doris_orders_group',
  'property.client.id' = 'doris_consumer',
  'property.kafka_default_offsets' = 'OFFSET_BEGINNING'
);

3. 关键参数说明: | 参数 | 说明 | 建议值 | |------|------|--------| | desired_concurrent_number | 并行消费数(≤Kafka分区数)| 3~10 | | max_batch_interval | 最大批处理间隔(秒)| 5~30 | | max_batch_rows | 单批最大行数 | 200000~500000 | | max_error_number | 最大错误行数(超过暂停)| 100~1000 | | strict_mode | 严格模式(字段类型不匹配报错)| false |

4. 监控与恢复

-- 查看任务状态
SHOW ROUTINE LOAD FOR orders_load;
-- 暂停
PAUSE ROUTINE LOAD FOR orders_load;
-- 恢复
RESUME ROUTINE LOAD FOR orders_load;
-- 停止
STOP ROUTINE LOAD FOR orders_load;

5. 常见问题

  • 任务暂停:检查max_error_number是否超限
  • 数据延迟:增大desired_concurrent_number或减少max_batch_interval