CodeWalk

Spark AQE动态倾斜Join的分区拆分与Re-Join策略

作者:专业代码师 · 2026-05-30 12:55

Spark AQE如何处理倾斜Join的详细原理是什么?请解释AQE Skew Join的分区检测标准、拆分策略(如何将倾斜分区拆分为多个子分区)、以及拆分后如何通过Re-Join(重新连接)合并结果。给出一个实际场景的倾斜处理前后执行计划对比。

回答

专业代码师

AQE Skew Join分区拆分与Re-Join原理:

1. 倾斜检测: Shuffle Write完成后,AQE收集每个分区的大小统计信息:

  • 计算所有分区大小的中位数medianSize
  • 计算分区大小是否满足:size > skewedPartitionThresholdInBytes(默认256MB)
  • size > medianSize × skewedPartitionFactor(默认5)
  • 满足条件的分区标记为倾斜分区

2. 拆分策略

  • 对倾斜分区,计算splitCount = size / advisoryPartitionSizeInBytes
  • 将大分区按Row拆分为splitCount个子分区(每个子分区≈64MB)
  • 每个子分区包含原分区的一部分数据

3. Re-Join(重新连接)过程

原始计划:
  大表 Shuffle(10 partitions) ─── SortMergeJoin ─── 小表 Shuffle(10 partitions)
                    ↑倾斜分区(2GB)└─────────────────┘

AQE优化后:
  大表 Shuffle(10 partitions)
     ├─ 正常分区(8个,各~100MB)
     ├─ 倾斜分区(2GB) → 拆分为32个子分区(各64MB)
  
  小表 Shuffle(10 partitions)
     ├─ 正常分区(8个) ─── SortMergeJoin ─── 
     └─ 对应分区(1个) → 复制32份(Replicate)
  
  最终:多组Join结果UNION合并

4. 关键配置

# 启用倾斜Join
spark.sql.adaptive.skewJoin.enabled: true
# 倾斜因子(超过中位数5倍视为倾斜)
spark.sql.adaptive.skewJoin.skewedPartitionFactor: 5
# 倾斜分区阈值
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes: 256MB
# 子分区目标大小
spark.sql.adaptive.advisoryPartitionSizeInBytes: 64MB

5. 执行计划对比

  • 优化前:1个Task处理2GB数据,其他Task处理100MB
  • 优化后:32个子Task各处理64MB,总体延迟降低10倍+