Spark AQE动态分区合并与动态Join策略的协同工作原理
Spark AQE的动态分区合并(Coalesce Partitions)和动态Join策略切换(SMJ→BHJ)是如何协同工作的?当一个Shuffle Stage结束时,AQE先做分区合并还是先判断Join策略?请以Shuffle SortMergeJoin为例,描述AQE的完整决策链路。给出一个通过Spark UI AQE面板观察各阶段优化效果的排查方法。
回答
我还是少年
Spark AQE Coalesce Partitions + Dynamic Join Strategy协同机制:
1. 决策链路:
Stage 0 (Shuffle Write 完成)
↓
1. AQE收集所有Map输出的分区大小统计信息
↓
2. 先执行 动态分区合并(Coalesce Partitions)
→ 检测小分区(< advisoryPartitionSizeInBytes)
→ 合并相邻小分区,生成新的分区方案
→ 更新Shuffle Read的分区计划
↓
3. 再判断 动态Join策略
→ 检查右表Shuffle后的实际数据量
→ 若小于 autoBroadcastJoinThreshold
→ 将SortMergeJoin替换为BroadcastHashJoin
→ 若检测到倾斜分区 → 触发 Skew Join优化
↓
4. 生成优化后的 Stage 1 (Shuffle Read)
↓
5. 提交Task执行
2. 为什么先合并再判断Join策略:
- 分区合并会改变分区的数量和大小
- 合并后的分区大小才是实际要处理的数据量
- 基于合并后的数据量判断是否可Broadcast更准确
3. 配合倾斜Join的使用:
Stage 0 Shuffle Write
↓ 分区大小统计
正常分区(8个, 各100MB) ─── 不合并(目标128MB)
倾斜分区(1个, 512MB) ─── 拆分为4个子分区
↓
右表Shuffle后实际数据量:50MB(小于30MB阈值)
↓
AQE将SMJ切换为BHJ + 倾斜子分区用SortMerge
4. Spark UI AQE面板排查:
SQL Tab → 点击Query → 查看AQE Plan
关键信息:
1. Stage页面:'Shuffle Query Stage' 显示分区数变化
- Initial Plan: 200 partitions
- Optimized Plan: 14 partitions (合并后)
2. Join策略变迁:
- Original: SortMergeJoin
- AQE Optimized: BroadcastHashJoin
3. Skew信息:
- Skew detected: partition 5 (size=512MB)
- Split into: 8 sub-partitions (each 64MB)
5. 调优建议:
spark.sql.adaptive.coalescePartitions.parallelismFirst: false # 优先合并
spark.sql.adaptive.advisoryPartitionSizeInBytes: 128MB # 目标分区大小
spark.sql.adaptive.autoBroadcastJoinThreshold: 30MB # 广播阈值