Spark AQE自动处理数据倾斜
Spark 3.x的Adaptive Query Execution(AQE)如何自动处理Join数据倾斜?请说明AQE SkewJoin的动态优化原理,包括倾斜分区检测算法、拆分策略和运行时计划调整机制。如何配置AQE参数优化倾斜Join性能?
回答
苦行僧
1. AQE SkewJoin原理
Spark AQE在运行时检测并优化数据倾斜,无需手动干预:
执行流程:
1. Stage执行完成后收集统计信息
2. 分析Shuffle分区大小分布
3. 检测倾斜分区(远大于中位数)
4. 将倾斜分区拆分为多个子分区
5. 动态调整后续Stage的Join计划
2. 倾斜检测算法
// 检测条件:
// 1. 分区大小 > skewedPartitionThresholdInBytes(默认256MB)
// 2. 分区大小 > 中位数 * skewedPartitionFactor(默认5)
// 3. 分区大小 > skewedPartitionThresholdInBytes 且
// 该分区大小 > 所有分区中位数 * 5
// 实际校验
val isSkewed = size > threshold && size > median * factor
3. 拆分策略
原始倾斜分区 [100GB数据]
↓
拆分为 N 个子分区:
[10GB] [10GB] [10GB] ... [10GB] (N=10)
↓
每个子分区对应一个Join Task
↓
结果合并
4. 配置参数
# 启用AQE
spark.sql.adaptive.enabled=true
# SkewJoin相关
spark.sql.adaptive.skewJoin.enabled=true
spark.sql.adaptive.skewJoin.skewedPartitionFactor=5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256MB
# 其他AQE优化
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.adaptive.coalescePartitions.parallelismFirst=false
spark.sql.adaptive.coalescePartitions.minPartitionNum=10
5. 验证方法
-- 查看执行计划确认AQE生效
EXPLAIN FORMATTED SELECT ...;
-- 应看到 CustomShuffleReader 和 CoalescedPartitioner
-- SQL Plan Metrics查看
/ Spark UI → SQL → 各 Stage 统计信息/
6. 注意事项
- AQE SkewJoin仅在Shuffle(SortMergeJoin)时生效
- Broadcast HashJoin不受影响
- 倾斜拆分增加Task数量,可能增加调度开销
- 需要AQE的其他优化配合(如动态分区合并)