CodeWalk

Iceberg小文件合并(Compaction/Optimize)策略与实现

作者:专业代码师 · 2026-05-30 12:55

Iceberg数据湖文件过多会导致查询性能下降,请解释Iceberg的小文件合并策略:如何通过rewriteDataFiles Action合并小文件?合并的触发策略(文件数量/大小阈值)是什么?以及如何配置Flink/Spark的自动合并作业?给出一个Spark自动化合并脚本的示例。

回答

专业代码师

Iceberg小文件合并(Compaction)详解:

1. 小文件产生原因

  • Flink CDC实时写入(每个Checkpoint生成小文件)
  • 频繁的MERGE/UPDATE/DELETE操作
  • 分区粒度过细

2. rewriteDataFiles合并原理

// Spark
import org.apache.iceberg.spark.actions.SparkActions

SparkActions.get(spark)
  .rewriteDataFiles(table)
  .option("target-file-size-bytes", "268435456")   // 256MB目标文件
  .option("min-file-size-bytes", "75497472")       // 75MB以下视为小文件
  .option("rewrite-all", "false")                  // 是否重写所有文件
  .option("max-concurrent-file-group-rewrites", "10")
  .execute()

3. 合并策略参数: | 参数 | 说明 | 建议值 | |------|------|--------| | target-file-size-bytes | 目标文件大小 | 256MB~512MB | | min-file-size-bytes | 小文件阈值(低于此合并)| 75MB | | max-file-size-bytes | 最大文件大小 | 512MB | | max-concurrent-file-group-rewrites | 并行组数 | 10~20 | | partial-progress.enabled | 支持部分进度 | true |

4. Flink自动合并(通过隐藏作业)

CREATE TABLE iceberg_table (
  id INT,
  name STRING
) WITH (
  'connector' = 'iceberg',
  'write.distribution-mode' = 'hash',
  'write.target-file-size-bytes' = '268435456',
  'write.merge-file-size' = '134217728',
  'commit.manifest.target-size-bytes' = '67108864'
);

5. 定时自动合并脚本(Spark)

#!/bin/bash
# 每30分钟执行一次小文件合并
spark-submit \
  --class org.apache.iceberg.spark.actions.CompactionAction \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.iceberg_cat=org.apache.iceberg.spark.SparkCatalog \
  iceberg-tools.jar --table iceberg_cat.db.table \
  --target-file-size 256MB --min-file-size 75MB