Iceberg小文件合并(Compaction/Optimize)策略与实现
Iceberg数据湖文件过多会导致查询性能下降,请解释Iceberg的小文件合并策略:如何通过rewriteDataFiles Action合并小文件?合并的触发策略(文件数量/大小阈值)是什么?以及如何配置Flink/Spark的自动合并作业?给出一个Spark自动化合并脚本的示例。
回答
专业代码师
Iceberg小文件合并(Compaction)详解:
1. 小文件产生原因:
- Flink CDC实时写入(每个Checkpoint生成小文件)
- 频繁的MERGE/UPDATE/DELETE操作
- 分区粒度过细
2. rewriteDataFiles合并原理:
// Spark
import org.apache.iceberg.spark.actions.SparkActions
SparkActions.get(spark)
.rewriteDataFiles(table)
.option("target-file-size-bytes", "268435456") // 256MB目标文件
.option("min-file-size-bytes", "75497472") // 75MB以下视为小文件
.option("rewrite-all", "false") // 是否重写所有文件
.option("max-concurrent-file-group-rewrites", "10")
.execute()
3. 合并策略参数: | 参数 | 说明 | 建议值 | |------|------|--------| | target-file-size-bytes | 目标文件大小 | 256MB~512MB | | min-file-size-bytes | 小文件阈值(低于此合并)| 75MB | | max-file-size-bytes | 最大文件大小 | 512MB | | max-concurrent-file-group-rewrites | 并行组数 | 10~20 | | partial-progress.enabled | 支持部分进度 | true |
4. Flink自动合并(通过隐藏作业):
CREATE TABLE iceberg_table (
id INT,
name STRING
) WITH (
'connector' = 'iceberg',
'write.distribution-mode' = 'hash',
'write.target-file-size-bytes' = '268435456',
'write.merge-file-size' = '134217728',
'commit.manifest.target-size-bytes' = '67108864'
);
5. 定时自动合并脚本(Spark):
#!/bin/bash
# 每30分钟执行一次小文件合并
spark-submit \
--class org.apache.iceberg.spark.actions.CompactionAction \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.iceberg_cat=org.apache.iceberg.spark.SparkCatalog \
iceberg-tools.jar --table iceberg_cat.db.table \
--target-file-size 256MB --min-file-size 75MB