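Scheduling and Monitoring Compaction Strategies for Hudi MOR Tables
Please explain in detail how to design a compaction strategy for Hudi MOR tables in production: how compaction plans are generated and scheduled (inline, async, and schedule-only strategies), how to allocate resources to the compaction job (Spark executor memory and parallelism), and how to monitor compaction progress and lag through Hudi metrics. Please provide a PySpark-based async compaction scheduling script and explain how to integrate it with Airflow or DolphinScheduler.
Answer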
古法程序员
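Compaction strategy and scheduling for Hudi MOR tables:
1. Choosing a compaction strategy:
# Option 1: Inline (compaction runs synchronously as part of the write)
hoodie.compact.inline: true
hoodie.compact.inline.max.delta.commits: 5
# Suitable for: small data volumes, where writes and compaction have the same priority and the extra write latency is acceptable
# Option 2: Async (the writer only schedules the plan; a separate job executes it)
hoodie.compact.inline: false
hoodie.compact.schedule.inline: true # generate the compaction plan during the write
hoodie.datasource.compaction.async.enable: false # turn off the writer's built-in async compaction
# Suitable for: large tables, where write performance and read performance must be controlled independently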
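To make Option 2 concrete on the writer side, here is a minimal sketch of the options a PySpark writer might pass. The column names `order_id` and `updated_at` are hypothetical, and `write_batch` assumes a Spark session that already has the Hudi bundle on its classpath. Treat it as an illustration of the config above, not a tuned setup.

```python
def mor_async_writer_options(table_name, record_key, precombine_field,
                             max_delta_commits=5):
    """Build Hudi writer options that schedule compaction plans during the
    write but leave their execution to a separate job."""
    return {
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.table.type": "MERGE_ON_READ",
        "hoodie.datasource.write.operation": "upsert",
        "hoodie.datasource.write.recordkey.field": record_key,
        "hoodie.datasource.write.precombine.field": precombine_field,
        # Do not compact inside the write itself ...
        "hoodie.compact.inline": "false",
        # ... but do generate a compaction plan every N delta commits.
        "hoodie.compact.schedule.inline": "true",
        "hoodie.compact.inline.max.delta.commits": str(max_delta_commits),
    }


def write_batch(df, base_path, options):
    """Append one batch to the MOR table (df is a Spark DataFrame)."""
    df.write.format("hudi").options(**options).mode("append").save(base_path)


# Hypothetical column names, for illustration only.
opts = mor_async_writer_options("orders", "order_id", "updated_at")
```

A batch job would then call `write_batch(df, "/user/hudi/orders", opts)`, and the plans it leaves on the timeline are picked up by the standalone compaction job.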
2. Standalone compaction job configuration:
# Async compaction job launched with spark-submit.
# HoodieCompactor only applies to MOR tables and takes no table-type flag.
# --mode execute runs a plan that the writer has already scheduled.
# Merge memory is passed through --hoodie-conf; concurrency comes from --parallelism and the executor settings.
spark-submit \
  --class org.apache.hudi.utilities.HoodieCompactor \
  --master yarn \
  --driver-memory 4g \
  --executor-memory 8g \
  --executor-cores 4 \
  --num-executors 10 \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.sql.catalogImplementation=hive \
  /opt/hudi/packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-0.14.0.jar \
  --base-path /user/hudi/orders \
  --table-name orders \
  --mode execute \
  --parallelism 200 \
  --spark-memory 8g \
  --hoodie-conf hoodie.memory.compaction.max.size=4294967296
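Since the question asks for a PySpark script, here is a sketch of what a `compactor.py` could look like. It assumes the Hudi Spark bundle is on the classpath and that your Hudi version ships the `run_compaction` Spark SQL procedure with `op` and `table` arguments; please check both against your release. The helper name `build_compaction_sql` is my own. As far as I know, `op => 'run'` without an instant executes the plans that are pending on the timeline.

```python
import re
import sys

_TABLE_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)?$")


def build_compaction_sql(op, table):
    """Return the CALL statement for Hudi's run_compaction procedure."""
    if op not in ("schedule", "run"):
        raise ValueError(f"unsupported op: {op!r}")
    if not _TABLE_RE.match(table):
        raise ValueError(f"suspicious table name: {table!r}")
    return f"CALL run_compaction(op => '{op}', table => '{table}')"


def main(table):
    # Imported lazily so the helper above can be used without Spark installed.
    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder.appName(f"hudi-compaction-{table}")
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        # The CALL syntax needs Hudi's SQL extension.
        .config("spark.sql.extensions",
                "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
        .enableHiveSupport()
        .getOrCreate()
    )
    try:
        # Execute the pending compaction plans and print the procedure output.
        spark.sql(build_compaction_sql("run", table)).show(truncate=False)
    finally:
        spark.stop()


if __name__ == "__main__":
    if len(sys.argv) == 2:
        main(sys.argv[1])
    else:
        print("usage: compactor.py <table>", file=sys.stderr)
```

Submit it with the same executor settings as the HoodieCompactor job and pass the table name as the only argument, for example `spark-submit ... compactor.py orders`.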
3. Compaction resource parameters:
hoodie.compaction.target.io: 500 # the unit is MB: roughly 500 MB of I/O per compaction run
hoodie.compaction.strategy: org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy
hoodie.memory.compaction.max.size: 4294967296 # maximum memory for compaction merging, in bytes (4 GiB)
hoodie.compact.inline.trigger.strategy: NUM_COMMITS # trigger by number of delta commits
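To show how the I/O budget and the log-size-based strategy interact, here is a simplified model in plain Python. It is not Hudi's implementation: the `CompactionOp` class and the I/O estimate (read base and logs, write a new base file) are assumptions made for illustration. The idea is that file groups with the largest log files are picked first until the budget in MB is used up.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CompactionOp:
    file_id: str
    base_file_mb: int
    log_files_mb: int

    @property
    def total_io_mb(self):
        # Assumed estimate: read base + logs, then write a base file of
        # about the same size as the old one.
        return 2 * self.base_file_mb + self.log_files_mb


def select_operations(ops, target_io_mb):
    """Pick operations by descending log size until the I/O budget is spent.

    The operation that crosses the budget is still included, so one run
    can overshoot the target somewhat.
    """
    selected = []
    remaining = target_io_mb
    for op in sorted(ops, key=lambda o: o.log_files_mb, reverse=True):
        selected.append(op)
        remaining -= op.total_io_mb
        if remaining <= 0:
            break
    return selected


ops = [
    CompactionOp("fg-a", base_file_mb=100, log_files_mb=50),
    CompactionOp("fg-b", base_file_mb=200, log_files_mb=300),
    CompactionOp("fg-c", base_file_mb=10, log_files_mb=5),
]
picked = [op.file_id for op in select_operations(ops, target_io_mb=900)]
```

With a 900 MB budget the model picks `fg-b` (700 MB of I/O) and then `fg-a` (250 MB), and leaves `fg-c` for a later run. A small budget therefore means frequent, short compactions that always start with the file groups that hurt read performance most.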
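4. Monitoring compaction progress:
-- List compaction plans and their states (Spark SQL procedure; check that your Hudi version provides it)
CALL show_compaction(table => 'orders', limit => 10);
-- Hudi metrics for the compaction action, as sent to the reporter (for example Prometheus).
-- Requires hoodie.metrics.on=true. Values are illustrative; exact names depend on the reporter and the Hudi version.
orders.compaction.duration: 5000 (ms)
orders.compaction.totalBytesWritten: 268435456
orders.compaction.totalLogFilesCompacted: 15
orders.compaction.commitTime: commit time of the last compaction (instant 20250525153000)
-- Find file slices that still have uncompacted log files
CALL show_fsview_all(table => 'orders');
-- then look for rows whose delta (log) file count is greater than 0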
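Compaction lag can also be read straight from the timeline. The sketch below assumes the 0.x timeline layout (as in 0.14), where a scheduled compaction appears as `<instant>.compaction.requested` under `.hoodie` and finishes as `<instant>.commit`. The function names are my own, and the caller is expected to pass `now` in the same timezone the table uses for its instants.

```python
from datetime import datetime


def pending_compactions(timeline_files):
    """Return instants that have a compaction plan but no completed commit."""
    requested = {
        name.split(".")[0]
        for name in timeline_files
        if name.endswith(".compaction.requested")
    }
    completed = {
        name.split(".")[0] for name in timeline_files if name.endswith(".commit")
    }
    return sorted(requested - completed)


def oldest_pending_lag_seconds(pending, now):
    """Seconds since the oldest pending plan was scheduled, or None."""
    if not pending:
        return None
    # Instants are yyyyMMddHHmmss, optionally followed by milliseconds.
    scheduled_at = datetime.strptime(min(pending)[:14], "%Y%m%d%H%M%S")
    return (now - scheduled_at).total_seconds()


files = [
    "20250525150000.deltacommit",
    "20250525151000.compaction.requested",
    "20250525151000.compaction.inflight",
    "20250525151000.commit",
    "20250525153000.compaction.requested",
    "20250525153500123.compaction.requested",
]
pending = pending_compactions(files)
lag = oldest_pending_lag_seconds(pending, datetime(2025, 5, 25, 16, 0, 0))
```

In practice the file list would come from listing `<base-path>/.hoodie`. The lag value can be pushed to whatever alerting you already use, for example an alert when it exceeds two scheduling intervals.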
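5. Airflow scheduling integration:
# DAG definition: run compaction every 30 minutes
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
with DAG(
    'hudi_compaction',
    schedule='*/30 * * * *',  # the `schedule` argument needs Airflow 2.4+
    start_date=datetime(2025, 1, 1),  # placeholder; any past date works with catchup=False
    catchup=False,
    max_active_runs=1,  # never run two compactions of the same table at once
) as dag:
    compact = SparkSubmitOperator(
        task_id='compact_orders',
        application='/opt/hudi/compactor.py',
        application_args=['orders'],  # assumes the script reads the table name from argv
        conn_id='spark_default',
        conf={'spark.dynamicAllocation.maxExecutors': '10'},
        execution_timeout=timedelta(minutes=25),  # fail before the next run is due
    )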
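For DolphinScheduler, one simple route is a Shell task that calls a small Python wrapper around the spark-submit command. A Shell task fails on a non-zero exit code, so the wrapper only needs to pass the exit code through. The sketch below assumes the HoodieCompactor options used earlier in this answer; the wrapper name `run_compactor.py` and the function names are hypothetical.

```python
import subprocess
import sys

HUDI_BUNDLE = (
    "/opt/hudi/packaging/hudi-utilities-bundle/target/"
    "hudi-utilities-bundle_2.12-0.14.0.jar"
)


def build_compactor_command(base_path, table_name, parallelism=200, executors=10):
    """Assemble the spark-submit argument list for HoodieCompactor."""
    return [
        "spark-submit",
        "--class", "org.apache.hudi.utilities.HoodieCompactor",
        "--master", "yarn",
        "--driver-memory", "4g",
        "--executor-memory", "8g",
        "--executor-cores", "4",
        "--num-executors", str(executors),
        "--conf", "spark.serializer=org.apache.spark.serializer.KryoSerializer",
        HUDI_BUNDLE,
        "--base-path", base_path,
        "--table-name", table_name,
        "--mode", "execute",
        "--parallelism", str(parallelism),
        "--spark-memory", "8g",
    ]


def run_compaction_job(base_path, table_name, timeout_s=1500):
    """Run the job and return its exit code; a timeout raises and also
    ends the process with a non-zero status."""
    cmd = build_compactor_command(base_path, table_name)
    return subprocess.run(cmd, timeout=timeout_s, check=False).returncode


if __name__ == "__main__":
    if len(sys.argv) == 3:
        sys.exit(run_compaction_job(sys.argv[1], sys.argv[2]))
    else:
        print("usage: run_compactor.py <base-path> <table-name>", file=sys.stderr)
```

In the DolphinScheduler workflow, the Shell task body would be a single line such as `python3 /opt/hudi/run_compactor.py /user/hudi/orders orders`, scheduled with the same 30-minute cron as the Airflow DAG. Configure the workflow so that a new run does not start while the previous one is still running, which plays the same role as `max_active_runs=1` in Airflow.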