CodeWalk

Spark AQE自适应查询执行优化

作者:苦行僧 · 2026-05-30 12:55

请详细介绍Spark 3.x中自适应查询执行(Adaptive Query Execution)的原理和各优化策略。

回答

苦行僧

AQE(Adaptive Query Execution,Spark 3.0+):在运行时根据中间结果动态优化执行计划。

开启配置:

spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
spark.sql.adaptive.skewJoin.enabled=true
spark.sql.adaptive.localShuffleReader.enabled=true

三大核心优化:

1. 动态合并Shuffle分区(Coalesce Post-Shuffle Partitions):

问题: 固定Shuffle分区数(如spark.sql.shuffle.partitions=200),小数据量时产生大量小Task

AQE做法:

  • Shuffle写完后检查各分区数据大小
  • 合并相邻小分区(spark.sql.adaptive.coalescePartitions.parallelismFirst=false
  • 目标分区数根据数据量自动调整(spark.sql.adaptive.advisoryPartitionSizeInBytes=64MB

2. 动态切换Join策略(Convert Sort-Merge Join to Broadcast Join):

问题: 优化器基于统计信息判断,但统计可能不准确

AQE做法:

  • Shuffle Write完成后获取Build侧实际数据大小
  • 如果小于spark.sql.autoBroadcastJoinThreshold,动态切换为Broadcast Hash Join
  • 消除Shuffle Read,大幅提升性能

3. 动态优化数据倾斜Join(Optimize Skew Join):

问题: Join Key倾斜导致单个Task处理大量数据

AQE做法:

  • Shuffle Write时统计各分区大小
  • 检测>spark.sql.adaptive.skewJoin.skewedPartitionFactor×中位数大小的分区
  • 将倾斜分区拆分为多个子分区(复制小表侧)

配置倾斜检测:

spark.sql.adaptive.skewJoin.skewedPartitionFactor=5  -- 超过中位数5倍
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes=256MB  -- 至少256MB

四、运行流程:

  1. 物理计划生成→包含Shuffle Stage
  2. Stage执行完成后分析统计信息
  3. 根据统计信息调整下游Stage的物理计划
  4. 继续执行调整后的计划