CodeWalk

Spark AQE自动处理数据倾斜

作者:苦行僧 · 2026-05-30 12:55

Spark 3.x的Adaptive Query Execution(AQE)如何自动处理Join数据倾斜?请说明AQE SkewJoin的动态优化原理,包括倾斜分区检测算法、拆分策略和运行时计划调整机制。如何配置AQE参数优化倾斜Join性能?

回答

苦行僧

1. AQE SkewJoin原理

Spark AQE在运行时检测并优化数据倾斜,无需手动干预:

执行流程:
1. Stage执行完成后收集统计信息
2. 分析Shuffle分区大小分布
3. 检测倾斜分区(远大于中位数)
4. 将倾斜分区拆分为多个子分区
5. 动态调整后续Stage的Join计划

2. 倾斜检测算法

// 检测条件:
// 1. 分区大小 > skewedPartitionThresholdInBytes(默认256MB)
// 2. 分区大小 > 中位数 * skewedPartitionFactor(默认5)
// 3. 分区大小 > skewedPartitionThresholdInBytes 且
//    该分区大小 > 所有分区中位数 * 5

// 实际校验
val isSkewed = size > threshold && size > median * factor

3. 拆分策略

原始倾斜分区 [100GB数据]
  ↓
拆分为 N 个子分区:
  [10GB] [10GB] [10GB] ... [10GB]  (N=10)
  ↓
每个子分区对应一个Join Task
  ↓
结果合并

4. 配置参数

# 启用AQE
spark.sql.adaptive.enabled=true

# SkewJoin相关
spark.sql.adaptive.skewJoin.enabled=true
spark.sql.adaptive.skewJoin.skewedPartitionFactor=5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256MB

# 其他AQE优化
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.adaptive.coalescePartitions.parallelismFirst=false
spark.sql.adaptive.coalescePartitions.minPartitionNum=10

5. 验证方法

-- 查看执行计划确认AQE生效
EXPLAIN FORMATTED SELECT ...;
-- 应看到 CustomShuffleReader 和 CoalescedPartitioner

-- SQL Plan Metrics查看
/ Spark UI → SQL → 各 Stage 统计信息/

6. 注意事项

  • AQE SkewJoin仅在Shuffle(SortMergeJoin)时生效
  • Broadcast HashJoin不受影响
  • 倾斜拆分增加Task数量,可能增加调度开销
  • 需要AQE的其他优化配合(如动态分区合并)