Spark AQE动态倾斜Join的分区拆分与Re-Join策略
Spark AQE如何处理倾斜Join的详细原理是什么?请解释AQE Skew Join的分区检测标准、拆分策略(如何将倾斜分区拆分为多个子分区)、以及拆分后如何通过Re-Join(重新连接)合并结果。给出一个实际场景的倾斜处理前后执行计划对比。
回答
专业代码师
AQE Skew Join分区拆分与Re-Join原理:
1. 倾斜检测: Shuffle Write完成后,AQE收集每个分区的大小统计信息:
- 计算所有分区大小的中位数
medianSize - 计算分区大小是否满足:
size > skewedPartitionThresholdInBytes(默认256MB) - 且
size > medianSize × skewedPartitionFactor(默认5) - 满足条件的分区标记为倾斜分区
2. 拆分策略:
- 对倾斜分区,计算
splitCount = size / advisoryPartitionSizeInBytes - 将大分区按Row拆分为
splitCount个子分区(每个子分区≈64MB) - 每个子分区包含原分区的一部分数据
3. Re-Join(重新连接)过程:
原始计划:
大表 Shuffle(10 partitions) ─── SortMergeJoin ─── 小表 Shuffle(10 partitions)
↑倾斜分区(2GB)└─────────────────┘
AQE优化后:
大表 Shuffle(10 partitions)
├─ 正常分区(8个,各~100MB)
├─ 倾斜分区(2GB) → 拆分为32个子分区(各64MB)
小表 Shuffle(10 partitions)
├─ 正常分区(8个) ─── SortMergeJoin ───
└─ 对应分区(1个) → 复制32份(Replicate)
最终:多组Join结果UNION合并
4. 关键配置:
# 启用倾斜Join
spark.sql.adaptive.skewJoin.enabled: true
# 倾斜因子(超过中位数5倍视为倾斜)
spark.sql.adaptive.skewJoin.skewedPartitionFactor: 5
# 倾斜分区阈值
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes: 256MB
# 子分区目标大小
spark.sql.adaptive.advisoryPartitionSizeInBytes: 64MB
5. 执行计划对比:
- 优化前:1个Task处理2GB数据,其他Task处理100MB
- 优化后:32个子Task各处理64MB,总体延迟降低10倍+