CodeWalk

Spark AQE (Adaptive Query Execution): Core Mechanisms Explained

Author: 小字辈 · 2026-05-30 12:55

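Which core optimization mechanisms does AQE (Adaptive Query Execution) in Spark 3.0+ include? Explain how dynamic partition coalescing (Coalesce Partitions), dynamic join strategy switching (Convert SortMergeJoin to BroadcastHashJoin), and dynamic skew join optimization (Optimize Skew Join) work, along with their configuration parameters. Give an example AQE tuning configuration.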

Answer

小字辈

The three core optimizations of Spark AQE:

1. Dynamic Partition Coalescing (Dynamic Coalesce Partitions)

SET spark.sql.adaptive.coalescePartitions.enabled = true;
SET spark.sql.adaptive.coalescePartitions.parallelismFirst = false;
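SET spark.sql.adaptive.coalescePartitions.minPartitionSize = 8MB;
  • How it works: once the shuffle map stage finishes (Shuffle Write), AQE looks at the actual size of each shuffle partition and merges adjacent small partitions into larger ones
  • Effect: avoids large numbers of tiny tasks (less scheduling overhead) and reduces the need to hand-tune spark.sql.shuffle.partitions, as long as the initial partition count is large enough
  • Target size: with parallelismFirst=false, merging aims at spark.sql.adaptive.advisoryPartitionSizeInBytes (default 64MB); with the default parallelismFirst=true, Spark favors parallelism and mainly respects minPartitionSize (default 1MB) instead
  • Spark's documentation says minPartitionSize can be at most 20% of advisoryPartitionSizeInBytes, so it is set to 8MB here (20% of the default 64MB is about 12.8MB)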
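To make the coalescing rule concrete, here is a minimal Python sketch. It is a simplified model, not Spark's actual implementation: it assumes the target size is the advisory size, capped by total size ÷ default parallelism when parallelismFirst is on, and never below minPartitionSize. The function names `target_size` and `coalesce` are hypothetical, and the partition sizes are made up for illustration.

```python
import math
from typing import List, Tuple

MB = 1024 * 1024


def target_size(total_bytes: int, advisory: int, min_partition_size: int,
                parallelism_first: bool, default_parallelism: int) -> int:
    """Simplified target size for a coalesced partition (assumed model, not Spark source)."""
    # parallelismFirst=true tries to keep at least `default_parallelism` partitions,
    # so the target can fall below the advisory size, but never below the minimum size.
    min_num = default_parallelism if parallelism_first else 1
    max_target = math.ceil(total_bytes / min_num)
    return max(min(max_target, advisory), min_partition_size)


def coalesce(sizes: List[int], target: int) -> List[Tuple[int, int]]:
    """Greedily merge *adjacent* shuffle partitions; returns [start, end) index ranges."""
    ranges: List[Tuple[int, int]] = []
    start, acc = 0, 0
    for i, size in enumerate(sizes):
        # Close the current group if adding this partition would exceed the target.
        if i > start and acc + size > target:
            ranges.append((start, i))
            start, acc = i, 0
        acc += size
    if sizes:
        ranges.append((start, len(sizes)))
    return ranges


sizes = [s * MB for s in [10, 20, 30, 5, 70, 1, 1]]  # illustrative sizes, 137MB total
t = target_size(sum(sizes), advisory=64 * MB, min_partition_size=1 * MB,
                parallelism_first=False, default_parallelism=200)
print(t // MB, coalesce(sizes, t))  # 64 [(0, 3), (3, 4), (4, 5), (5, 7)]
```

Seven shuffle partitions become four tasks. With `parallelism_first=True` and a default parallelism of 200, the same data gets a 1MB target and none of the seven partitions are merged, which matches Spark's advice to set parallelismFirst to false on busy clusters.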

2. Dynamic Join Strategy Switching (Convert SMJ to BHJ)

SET spark.sql.adaptive.autoBroadcastJoinThreshold = 30MB;
SET spark.sql.adaptive.localShuffleReader.enabled = true;
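  • How it works: the static plan relies on estimated statistics, so before the shuffle Spark cannot know the real size of the smaller table (for example after a selective filter). Once that side's shuffle write completes, AQE sees its actual size, and if it is no larger than spark.sql.adaptive.autoBroadcastJoinThreshold (which defaults to spark.sql.autoBroadcastJoinThreshold, 10MB), AQE replans the SortMergeJoin as a BroadcastHashJoin
  • Effect: the shuffle write has already happened, so this is less efficient than planning a broadcast join from the start, but it skips sorting both sides; with localShuffleReader.enabled (default true), the shuffle files are read locally rather than fetched over the network, saving network traffic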
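The replanning decision can be sketched in a few lines of Python. This is a simplified model, not Spark code: `replan_join` and `BUILDABLE_SIDES` are hypothetical names and the sizes are illustrative. It also encodes the standard restriction on which side a broadcast hash join may build for each join type (for example, only the right side of a LEFT OUTER JOIN). Real AQE applies further checks that are omitted here.

```python
from typing import Optional, Tuple

MB = 1024 * 1024

# Which side a broadcast hash join may build (broadcast) for each join type.
BUILDABLE_SIDES = {
    "inner": ("left", "right"),
    "left_outer": ("right",),
    "left_semi": ("right",),
    "left_anti": ("right",),
    "right_outer": ("left",),
    "full_outer": (),  # broadcast hash join cannot execute a full outer join
}


def replan_join(join_type: str, left_bytes: int, right_bytes: int,
                threshold: int = 10 * MB) -> Tuple[str, Optional[str]]:
    """Pick a join strategy from runtime (post-shuffle) sizes; returns (strategy, broadcast side)."""
    sizes = {"left": left_bytes, "right": right_bytes}
    # Only sides the join type allows AND that fit under the threshold can be broadcast.
    candidates = [side for side in BUILDABLE_SIDES[join_type] if sizes[side] <= threshold]
    if not candidates:
        return "SortMergeJoin", None
    # If both sides qualify, broadcast the smaller one.
    side = min(candidates, key=lambda s: sizes[s])
    return "BroadcastHashJoin", side


# Hypothetical runtime sizes: a 50GB fact side and a 20MB side after filtering.
print(replan_join("inner", 50 * 1024 * MB, 20 * MB, threshold=30 * MB))  # ('BroadcastHashJoin', 'right')
print(replan_join("inner", 50 * 1024 * MB, 20 * MB))  # ('SortMergeJoin', None) with the 10MB default
```

Raising the adaptive threshold to 30MB, as in the answer's configuration, is what lets the 20MB side qualify in this example.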

3. Dynamic Skew Join Optimization (Optimize Skew Join)

SET spark.sql.adaptive.skewJoin.enabled = true;
SET spark.sql.adaptive.skewJoin.skewedPartitionFactor = 5;
SET spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes = 256MB;
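  • How it works: after the shuffle write, AQE treats a partition as skewed when it is larger than skewedPartitionFactor × the median partition size and also larger than skewedPartitionThresholdInBytes. Each skewed partition is split into roughly evenly sized sub-partitions (by ranges of map outputs, sized based on advisoryPartitionSizeInBytes), and the matching partition on the other side is replicated so every sub-partition is joined in its own task. No explicit UNION is added; the task outputs together form the join result
  • Effect: handles skew in sort-merge joins automatically, which often removes the need for manual salting; it does not handle skew elsewhere, such as in aggregations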
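The skew rule and the split can be modeled the same way. In this Python sketch, the helper names are hypothetical, the sizes are illustrative, and only one side is checked for skew. A partition counts as skewed only if it exceeds both factor × median and the byte threshold. The skewed partition is then cut into ranges of map outputs, and each resulting task reads the other side's matching partition in full.

```python
import statistics
from typing import Dict, List, Optional, Tuple

MB = 1024 * 1024
MapRange = Optional[Tuple[int, int]]  # [start_map, end_map); None = read all mappers


def skewed_partitions(sizes: List[int], factor: float = 5.0,
                      threshold_bytes: int = 256 * MB) -> List[int]:
    # Skewed only if larger than BOTH factor * median AND the absolute byte threshold.
    limit = max(threshold_bytes, int(statistics.median(sizes) * factor))
    return [i for i, size in enumerate(sizes) if size > limit]


def split_by_mappers(map_output_sizes: List[int], target: int) -> List[Tuple[int, int]]:
    """Group adjacent mapper outputs of one reducer partition into ~target-sized ranges."""
    ranges, start, acc = [], 0, 0
    for i, size in enumerate(map_output_sizes):
        if i > start and acc + size > target:
            ranges.append((start, i))
            start, acc = i, 0
        acc += size
    ranges.append((start, len(map_output_sizes)))
    return ranges


def skew_join_tasks(left_sizes: List[int], left_map_sizes: Dict[int, List[int]],
                    target: int) -> List[Tuple[int, MapRange, MapRange]]:
    """One (partition, left_range, right_range) entry per join task.
    Simplification: only the left side is checked for skew."""
    skewed = set(skewed_partitions(left_sizes))
    tasks: List[Tuple[int, MapRange, MapRange]] = []
    for p in range(len(left_sizes)):
        if p in skewed:
            for r in split_by_mappers(left_map_sizes[p], target):
                # The right side's partition p is read in full by every split (replicated).
                tasks.append((p, r, None))
        else:
            tasks.append((p, None, None))
    return tasks


left = [s * MB for s in [100, 120, 90, 110, 2000]]
print(skewed_partitions(left))  # [4]: median 110MB, so the limit is 550MB
print(skewed_partitions([s * MB for s in [1, 1, 1, 1, 50]]))  # []: 50MB is under 256MB
tasks = skew_join_tasks(left, {4: [100 * MB] * 20}, target=256 * MB)
print(len(tasks), tasks[4])  # 14 (4, (0, 2), None)
```

The second call shows why both conditions matter: a partition 50 times the median is still not split when it is small in absolute terms.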

4. Complete AQE Configuration Example

spark.sql.adaptive.enabled: true
spark.sql.adaptive.coalescePartitions.enabled: true
spark.sql.adaptive.coalescePartitions.parallelismFirst: false
spark.sql.adaptive.coalescePartitions.minPartitionSize: 16MB
spark.sql.adaptive.advisoryPartitionSizeInBytes: 128MB
spark.sql.adaptive.skewJoin.enabled: true
spark.sql.adaptive.skewJoin.skewedPartitionFactor: 5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes: 256MB
spark.sql.adaptive.autoBroadcastJoinThreshold: 30MB
spark.sql.adaptive.localShuffleReader.enabled: true
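The coalescing settings interact, so it can help to sanity-check a configuration like the one above before deploying it. The following Python sketch is a hypothetical helper, not a Spark tool. It parses `key: value` lines (a form that a Java-properties file such as spark-defaults.conf also accepts) and reads sizes with binary units as Spark does (1MB = 1024 × 1024 bytes). When a key is missing, it assumes the Spark 3.2+ default.

```python
import re
from typing import Dict, List

# Binary units, matching how Spark reads sizes such as "64MB" (1MB = 1024 * 1024 bytes).
UNITS = {"": 1, "b": 1, "k": 1 << 10, "kb": 1 << 10, "m": 1 << 20, "mb": 1 << 20,
         "g": 1 << 30, "gb": 1 << 30, "t": 1 << 40, "tb": 1 << 40}
P = "spark.sql.adaptive."


def parse_bytes(text: str) -> int:
    m = re.fullmatch(r"([0-9]+)([a-z]*)", text.strip().lower())
    if not m or m.group(2) not in UNITS:
        raise ValueError(f"unrecognized size: {text!r}")
    return int(m.group(1)) * UNITS[m.group(2)]


def parse_conf(text: str) -> Dict[str, str]:
    """Parse 'key: value' / 'key=value' / 'key value' lines, skipping blanks and comments."""
    conf = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        m = re.match(r"([^\s:=]+)\s*[:=\s]\s*(.*)", line)
        if m:
            conf[m.group(1)] = m.group(2).strip()
    return conf


def check_aqe(conf: Dict[str, str]) -> List[str]:
    """Hypothetical sanity checks; missing keys fall back to assumed Spark 3.2+ defaults."""
    warnings = []
    if conf.get(P + "enabled", "true").lower() != "true":
        warnings.append("AQE is disabled")
    advisory = parse_bytes(conf.get(P + "advisoryPartitionSizeInBytes", "64MB"))
    min_size = parse_bytes(conf.get(P + "coalescePartitions.minPartitionSize", "1MB"))
    if min_size * 5 > advisory:
        warnings.append("minPartitionSize is above 20% of advisoryPartitionSizeInBytes")
    if conf.get(P + "coalescePartitions.parallelismFirst", "true").lower() == "true":
        warnings.append("parallelismFirst=true: target size follows parallelism, "
                        "not advisoryPartitionSizeInBytes")
    return warnings


sample = """
spark.sql.adaptive.enabled: true
spark.sql.adaptive.coalescePartitions.minPartitionSize: 64MB
spark.sql.adaptive.advisoryPartitionSizeInBytes: 128MB
"""
for w in check_aqe(parse_conf(sample)):
    print(w)
```

This sample sets minPartitionSize to 64MB with a 128MB advisory size and leaves parallelismFirst at its default, so the check reports both warnings.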