CodeWalk

Spark AQE动态分区合并与动态Join策略的协同工作原理

作者:我还是少年 · 2026-05-30 12:55

Spark AQE的动态分区合并(Coalesce Partitions)和动态Join策略切换(SMJ→BHJ)是如何协同工作的?当一个Shuffle Stage结束时,AQE先做分区合并还是先判断Join策略?请以Shuffle SortMergeJoin为例,描述AQE的完整决策链路。给出一个通过Spark UI AQE面板观察各阶段优化效果的排查方法。

回答

我还是少年

Spark AQE Coalesce Partitions + Dynamic Join Strategy协同机制:

1. 决策链路

Stage 0 (Shuffle Write 完成)
     ↓
1. AQE收集所有Map输出的分区大小统计信息
     ↓
2. 先执行 动态分区合并(Coalesce Partitions)
   → 检测小分区(< advisoryPartitionSizeInBytes)
   → 合并相邻小分区,生成新的分区方案
   → 更新Shuffle Read的分区计划
     ↓
3. 再判断 动态Join策略
   → 检查右表Shuffle后的实际数据量
   → 若小于 autoBroadcastJoinThreshold
   → 将SortMergeJoin替换为BroadcastHashJoin
   → 若检测到倾斜分区 → 触发 Skew Join优化
     ↓
4. 生成优化后的 Stage 1 (Shuffle Read)
     ↓
5. 提交Task执行

2. 为什么先合并再判断Join策略

  • 分区合并会改变分区的数量和大小
  • 合并后的分区大小才是实际要处理的数据量
  • 基于合并后的数据量判断是否可Broadcast更准确

3. 配合倾斜Join的使用

Stage 0 Shuffle Write
  ↓ 分区大小统计
正常分区(8个, 各100MB) ─── 不合并(目标128MB)
倾斜分区(1个, 512MB)  ─── 拆分为4个子分区
  ↓
右表Shuffle后实际数据量:50MB(小于30MB阈值)
  ↓
AQE将SMJ切换为BHJ + 倾斜子分区用SortMerge

4. Spark UI AQE面板排查

SQL Tab → 点击Query → 查看AQE Plan

关键信息:
  1. Stage页面:'Shuffle Query Stage' 显示分区数变化
     - Initial Plan: 200 partitions
     - Optimized Plan: 14 partitions (合并后)
  
  2. Join策略变迁:
     - Original: SortMergeJoin
     - AQE Optimized: BroadcastHashJoin
  
  3. Skew信息:
     - Skew detected: partition 5 (size=512MB)
     - Split into: 8 sub-partitions (each 64MB)

5. 调优建议

spark.sql.adaptive.coalescePartitions.parallelismFirst: false  # 优先合并
spark.sql.adaptive.advisoryPartitionSizeInBytes: 128MB         # 目标分区大小
spark.sql.adaptive.autoBroadcastJoinThreshold: 30MB            # 广播阈值