CodeWalk

Scheduling and Monitoring Compaction Strategies for Hudi MOR Tables

Author: 古法程序员 · 2026-05-30 12:55

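Please explain in detail how to design a compaction strategy for Hudi MOR tables in production: how compaction plans are generated and scheduled (Inline/Async/Schedule strategies), how to allocate resources for compaction jobs (Spark executor memory and parallelism), and how to monitor compaction progress and latency with Hudi Metrics. Provide a PySpark-based asynchronous compaction scheduling script and explain how to integrate it with Airflow/DolphinScheduler.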

Answer

古法程序员

Compaction strategy and scheduling for Hudi MOR tables:

1. Choosing a compaction strategy

# Option 1: Inline (compaction runs synchronously as part of the write)
hoodie.compact.inline: true
hoodie.compact.inline.max.delta.commits: 5
# Best for: small data volumes, where writes and compaction share the same priority

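# Option 2: Async (the write only schedules the plan; a separate job executes it)
hoodie.compact.inline: false
hoodie.compact.schedule.inline: true               # generate the compaction plan during the write
hoodie.datasource.compaction.async.enable: false   # turn off the writer's built-in async compaction
# Best for: large tables, where write and read performance must be controlled independently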
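
The same two options can be set from a PySpark writer. The following is a minimal sketch, assuming the writer passes Hudi options as a plain dict. The helper name `compaction_write_options` and its `mode` argument are hypothetical; the option keys are the ones listed above plus `hoodie.datasource.write.table.type`.

```python
def compaction_write_options(mode: str, max_delta_commits: int = 5) -> dict:
    """Return Hudi writer options for a compaction mode (hypothetical helper).

    mode="inline": the write itself runs compaction.
    mode="async":  the write only schedules a plan; a separate job executes it.
    """
    if mode not in ("inline", "async"):
        raise ValueError(f"unknown compaction mode: {mode!r}")
    inline = mode == "inline"
    return {
        # Compaction only applies to MERGE_ON_READ tables.
        "hoodie.datasource.write.table.type": "MERGE_ON_READ",
        # Exactly one of the next two flags is enabled.
        "hoodie.compact.inline": str(inline).lower(),
        "hoodie.compact.schedule.inline": str(not inline).lower(),
        # Number of delta commits between compaction plans.
        "hoodie.compact.inline.max.delta.commits": str(max_delta_commits),
    }
```

A writer would merge this dict with its usual record key, precombine, and table name options, for example `df.write.format("hudi").options(**opts).mode("append").save(base_path)`.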

2. Standalone compaction job configuration

# Async compaction job via spark-submit
# The table type is read from the table's own metadata; compaction only applies to MERGE_ON_READ tables.
# --mode execute runs a plan that the writer has already scheduled.
# The compaction memory cap is passed as a Hudi config, in bytes (4294967296 = 4096 MB).
spark-submit \
  --class org.apache.hudi.utilities.HoodieCompactor \
  --master yarn \
  --driver-memory 4g \
  --executor-memory 8g \
  --executor-cores 4 \
  --num-executors 10 \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.sql.catalogImplementation=hive \
  /opt/hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.14.0.jar \
  --base-path /user/hudi/orders \
  --table-name orders \
  --mode execute \
  --parallelism 200 \
  --spark-memory 8g \
  --hoodie-conf hoodie.memory.compaction.max.size=4294967296

3. Compaction resource parameters

hoodie.compaction.target.io: 500                     # I/O budget per compaction run; the unit is MB, so this is 500 MB
hoodie.compaction.strategy: org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy
hoodie.memory.compaction.max.size: 4294967296        # max memory for the compaction merge, in bytes (4096 MB)
hoodie.compact.inline.trigger.strategy: NUM_COMMITS  # trigger by number of delta commits
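
To make the interaction between the strategy and the I/O budget concrete, here is a simplified model of a log-size-based selection: file groups are ordered by total log size, largest first, and picked until the budget is used up. This is a sketch under stated assumptions, not Hudi's implementation. The `FileGroup` type, the cost formula (read base + read logs + rewrite base), and the rule that the group crossing the budget is still included are all assumptions made for illustration.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class FileGroup:
    file_id: str
    base_mb: int  # size of the base (Parquet) file
    log_mb: int   # total size of the pending log files


def estimated_io_mb(group: FileGroup) -> int:
    # Assumed cost model: read base + read logs + write the new base file.
    return group.base_mb + group.log_mb + group.base_mb


def select_for_compaction(groups: List[FileGroup], target_io_mb: int) -> List[FileGroup]:
    """Pick file groups with the most log data until the I/O budget is spent."""
    picked = []
    remaining = target_io_mb
    candidates = [g for g in groups if g.log_mb > 0]  # nothing to compact otherwise
    for group in sorted(candidates, key=lambda g: g.log_mb, reverse=True):
        picked.append(group)
        remaining -= estimated_io_mb(group)
        if remaining <= 0:  # budget exhausted; the rest waits for the next plan
            break
    return picked
```

With a budget that is too small relative to the ingest rate, the groups left behind accumulate log files and snapshot reads slow down, which is why the budget is usually tuned together with the trigger interval.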

4. Monitoring compaction progress

-- List compaction plans on the timeline (Spark SQL procedure)
CALL show_compaction(table => 'orders', limit => 10);

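-- Hudi Metrics (names follow <metric prefix>.<action>.<metric>; the prefix defaults to the table name.
-- A Prometheus reporter may render the dots differently. Values are illustrative.)
orders.compaction.duration: 5000                   -- milliseconds
orders.compaction.totalBytesWritten: 268435456
orders.compaction.totalLogFilesCompacted: 15
orders.compaction.commitTime: <epoch milliseconds of instant 20250525153000>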

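-- Inspect file slices that still carry uncompacted log files ('<partition>' is a placeholder)
CALL show_fsview_latest(table => 'orders', partition_path => '<partition>');
-- Rows that still list delta (log) files have not been compacted yet.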
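
Compaction lag can also be derived directly from the timeline. The sketch below assumes the 0.x timeline layout, where a scheduled compaction appears under `.hoodie/` as `<instant>.compaction.requested` (then `.compaction.inflight`) and a finished one as `<instant>.commit`. The function names are hypothetical, and instants are parsed as naive timestamps from their first 14 digits, so `now` must be in the same time zone the writer used.

```python
from datetime import datetime
from typing import Iterable, List


def pending_compactions(timeline_files: Iterable[str]) -> List[str]:
    """Return instants that have a compaction plan but no completed commit."""
    names = list(timeline_files)
    completed = {n.split(".", 1)[0] for n in names if n.endswith(".commit")}
    pending = set()
    for name in names:
        instant, _, suffix = name.partition(".")
        if suffix in ("compaction.requested", "compaction.inflight") and instant not in completed:
            pending.add(instant)
    return sorted(pending)


def oldest_pending_age_seconds(timeline_files: Iterable[str], now: datetime) -> float:
    """Age of the oldest unfinished compaction plan, or 0.0 if none is pending."""
    pending = pending_compactions(timeline_files)
    if not pending:
        return 0.0
    oldest = datetime.strptime(pending[0][:14], "%Y%m%d%H%M%S")
    return (now - oldest).total_seconds()
```

Feeding this with a listing of the table's `.hoodie/` directory and exporting the result as a gauge gives a simple alert condition, for example "oldest pending plan is older than two scheduling intervals".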

5. Airflow scheduling integration

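# DAG definition: run compaction every 30 minutes
from datetime import datetime

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

with DAG(
    'hudi_compaction',
    schedule='*/30 * * * *',          # Airflow 2.4+; older versions use schedule_interval
    start_date=datetime(2025, 1, 1),  # placeholder; any past date works with catchup=False
    catchup=False,
    max_active_runs=1,                # avoid two compaction runs on the same table overlapping
) as dag:
    compact = SparkSubmitOperator(
        task_id='compact_orders',
        application='/opt/hudi/compactor.py',
        conn_id='spark_default',
        conf={'spark.dynamicAllocation.maxExecutors': '10'},
    )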
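
The DAG submits `/opt/hudi/compactor.py`, which is not shown above. A minimal sketch of what such a script could look like follows, assuming the table is registered in the catalog as `orders`, the Hudi Spark bundle is on the classpath, and compaction is driven through the `run_compaction` SQL procedure. The function names `build_compaction_call` and `run` are hypothetical.

```python
def build_compaction_call(table: str, op: str = "run") -> str:
    """Build the Spark SQL statement that schedules or runs compaction."""
    if op not in ("schedule", "run"):
        raise ValueError(f"unsupported op: {op!r}")
    # Accept only simple identifiers such as "orders" or "db.orders".
    if not table or not table.replace("_", "").replace(".", "").isalnum():
        raise ValueError(f"suspicious table name: {table!r}")
    return f"CALL run_compaction(op => '{op}', table => '{table}')"


def run(table: str, op: str = "run") -> None:
    """Open a Hudi-enabled Spark session and execute the compaction call."""
    # Imported here so build_compaction_call can be tested without Spark.
    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder.appName(f"hudi-compaction-{table}")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .config("spark.sql.extensions",
                "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
        .enableHiveSupport()
        .getOrCreate()
    )
    try:
        # The procedure returns one row per compaction instant it touched.
        spark.sql(build_compaction_call(table, op)).show(truncate=False)
    finally:
        spark.stop()
```

The deployed script would end with a call such as `run("orders")`. Because the job exits non-zero when the procedure raises, Airflow's normal retry and alerting settings apply to it without extra wiring.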