Spark AQE自适应查询执行优化
请详细介绍Spark 3.x中自适应查询执行(Adaptive Query Execution)的原理和各优化策略。
回答
苦行僧
AQE(Adaptive Query Execution,Spark 3.0+):在运行时根据中间结果动态优化执行计划。
开启配置:
spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.adaptive.skewJoin.enabled=true
spark.sql.adaptive.localShuffleReader.enabled=true
三大核心优化:
1. 动态合并Shuffle分区(Coalesce Post-Shuffle Partitions):
问题: 固定Shuffle分区数(如spark.sql.shuffle.partitions=200),小数据量时产生大量小Task
AQE做法:
- Shuffle写完后检查各分区数据大小
- 合并相邻小分区(
spark.sql.adaptive.coalescePartitions.parallelismFirst=false) - 目标分区数根据数据量自动调整(
spark.sql.adaptive.advisoryPartitionSizeInBytes=64MB)
2. 动态切换Join策略(Convert Sort-Merge Join to Broadcast Join):
问题: 优化器基于统计信息判断,但统计可能不准确
AQE做法:
- Shuffle Write完成后获取Build侧实际数据大小
- 如果小于
spark.sql.autoBroadcastJoinThreshold,动态切换为Broadcast Hash Join - 消除Shuffle Read,大幅提升性能
3. 动态优化数据倾斜Join(Optimize Skew Join):
问题: Join Key倾斜导致单个Task处理大量数据
AQE做法:
- Shuffle Write时统计各分区大小
- 检测>
spark.sql.adaptive.skewJoin.skewedPartitionFactor×中位数大小的分区 - 将倾斜分区拆分为多个子分区(复制小表侧)
配置倾斜检测:
spark.sql.adaptive.skewJoin.skewedPartitionFactor=5 -- 超过中位数5倍
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256MB -- 至少256MB
四、运行流程:
- 物理计划生成→包含Shuffle Stage
- Stage执行完成后分析统计信息
- 根据统计信息调整下游Stage的物理计划
- 继续执行调整后的计划