Spark AQE (Adaptive Query Execution): Core Mechanisms Explained
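What core optimizations does AQE (Adaptive Query Execution) in Spark 3.0+ include? Explain how dynamic partition coalescing (Coalesce Partitions), dynamic join strategy switching (Convert SortMergeJoin to BroadcastHashJoin) and dynamic skew join optimization (Optimize Skew Join) work, and which configuration parameters control them. Give an example AQE tuning configuration.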
Answer
小字辈
The three core AQE optimizations in Spark:
1. Dynamic partition coalescing (Dynamic Coalesce Partitions)
SET spark.sql.adaptive.coalescePartitions.enabled = true;
SET spark.sql.adaptive.coalescePartitions.parallelismFirst = false;
SET spark.sql.adaptive.coalescePartitions.minPartitionSize = 64MB;
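- How it works: after a shuffle's map side finishes writing, AQE reads the actual size of every reduce partition and merges adjacent small partitions until each merged partition approaches the target size.
- Effect: avoids large numbers of tiny tasks and the scheduling overhead they cause.
- The target size is set by spark.sql.adaptive.advisoryPartitionSizeInBytes (default 64MB), but it only takes effect with parallelismFirst = false. With the default parallelismFirst = true, Spark ignores the advisory size and only respects minPartitionSize (default 1MB), to maximize parallelism.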
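To make the merging rule concrete, here is a minimal Python sketch of greedy coalescing over contiguous partitions, assuming a single shuffle and a fixed target size. The function name coalesce_partitions is hypothetical, not a Spark API, and Spark's real implementation is more involved (it also sums sizes across all shuffles feeding the stage and handles very small leftover groups differently).

```python
from typing import List

MB = 1024 * 1024


def coalesce_partitions(sizes: List[int], target_size: int) -> List[List[int]]:
    """Greedily merge *contiguous* reduce partitions so that each group stays
    at or below target_size bytes; a partition larger than the target ends up
    in a group of its own (coalescing never splits a partition)."""
    groups: List[List[int]] = []
    current: List[int] = []
    current_bytes = 0
    for index, size in enumerate(sizes):
        # Start a new group if adding this partition would overflow the target.
        if current and current_bytes + size > target_size:
            groups.append(current)
            current, current_bytes = [], 0
        current.append(index)
        current_bytes += size
    if current:
        groups.append(current)
    return groups


# Eight reduce partitions with uneven sizes (MB), target 64MB
# (the advisoryPartitionSizeInBytes default).
sizes_mb = [10, 20, 30, 5, 70, 8, 8, 40]
groups = coalesce_partitions([s * MB for s in sizes_mb], 64 * MB)
print(groups)  # [[0, 1, 2], [3], [4], [5, 6, 7]] -> 4 tasks instead of 8
```

Coalescing alone never splits a partition: the 70MB partition simply stays on its own.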
2. Dynamically switching the join strategy (Convert SMJ to BHJ)
SET spark.sql.adaptive.autoBroadcastJoinThreshold = 30MB;
SET spark.sql.adaptive.localShuffleReader.enabled = true;
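- How it works: before execution, the planner can only estimate the small table's size from statistics, and that estimate is often off (for example, after a selective filter). Once the shuffle map stage has finished writing, AQE knows the actual data size; if one side is below spark.sql.adaptive.autoBroadcastJoinThreshold (which falls back to spark.sql.autoBroadcastJoinThreshold, default 10MB, when unset), it replans the SortMergeJoin as a BroadcastHashJoin.
- Effect: the sort on both sides and the reduce-side shuffle fetch are avoided. The shuffle write has already happened and is not undone, so this is less efficient than planning a broadcast join from the start; with localShuffleReader.enabled = true (the default), the large side reads its shuffle files locally, saving network traffic.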
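The runtime decision can be sketched in Python as follows, assuming the measured shuffle sizes of both sides are already known. choose_broadcast_side and BUILDABLE_SIDES are hypothetical names; the table reflects which side a broadcast hash join can broadcast for each join type, and a size equal to the threshold still qualifies. The real AQE rule checks further conditions that are omitted here.

```python
from typing import Optional

MB = 1024 * 1024

# Sides that may serve as the broadcast (build) side of a BroadcastHashJoin,
# per join type (full outer cannot use a broadcast hash join).
BUILDABLE_SIDES = {
    "inner": ("left", "right"),
    "left_outer": ("right",),
    "right_outer": ("left",),
    "left_semi": ("right",),
    "left_anti": ("right",),
    "full_outer": (),
}


def choose_broadcast_side(join_type: str, left_bytes: int, right_bytes: int,
                          threshold: int) -> Optional[str]:
    """Return "left"/"right" to switch to a broadcast hash join, or None to
    keep the sort-merge join. left_bytes/right_bytes are the *actual* shuffle
    output sizes reported once the map stages have finished."""
    if threshold < 0:  # a negative threshold (e.g. -1) disables broadcasting
        return None
    sizes = {"left": left_bytes, "right": right_bytes}
    candidates = [side for side in BUILDABLE_SIDES[join_type]
                  if sizes[side] <= threshold]
    if not candidates:
        return None
    # If both sides qualify, broadcast the smaller one.
    return min(candidates, key=lambda side: sizes[side])


# The optimizer estimated the filtered dimension table at ~200MB, but its
# shuffle actually wrote 18MB, which is under the 30MB threshold.
print(choose_broadcast_side("inner", 5000 * MB, 18 * MB, 30 * MB))       # right
print(choose_broadcast_side("full_outer", 5000 * MB, 18 * MB, 30 * MB))  # None
```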
3. Dynamic skew join optimization (Optimize Skew Join)
SET spark.sql.adaptive.skewJoin.enabled = true;
SET spark.sql.adaptive.skewJoin.skewedPartitionFactor = 5;
SET spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes = 256MB;
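- How it works: after the shuffle map stages finish, AQE treats a partition as skewed only if it is larger than both skewedPartitionFactor × the median partition size and skewedPartitionThresholdInBytes. Each skewed partition is split into sub-partitions (sized around advisoryPartitionSizeInBytes, along map-output boundaries), and each sub-partition is joined in its own task against a copy of the matching partition from the other side; together these tasks return the same rows as the unsplit join.
- Effect: skew in joins is handled automatically, which often removes the need for manual salting. It does not address skew in aggregations.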
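The detection and splitting steps are sketched below in Python under simplifying assumptions: an inner join (so either side may be split), a fixed split target equal to the advisoryPartitionSizeInBytes value of the full example below, and per-mapper block sizes as input. plan_skew_join, split_by_mappers and skew_limit are hypothetical names, not Spark internals.

```python
import statistics
from typing import List, Optional, Tuple

MB = 1024 * 1024
# A task: (reduce partition id, left mapper range, right mapper range);
# a range of None means "read that side's whole partition".
Task = Tuple[int, Optional[List[int]], Optional[List[int]]]


def skew_limit(sizes: List[int], factor: float, threshold_bytes: int) -> float:
    # Skewed = larger than factor * median AND larger than threshold_bytes.
    return max(factor * statistics.median(sizes), threshold_bytes)


def split_by_mappers(blocks: List[int], target: int) -> List[List[int]]:
    """Cut one reduce partition into contiguous mapper-id ranges of roughly
    `target` bytes (blocks[i] = bytes that mapper i wrote to this partition)."""
    ranges: List[List[int]] = []
    current: List[int] = []
    current_bytes = 0
    for mapper_id, size in enumerate(blocks):
        if current and current_bytes + size > target:
            ranges.append(current)
            current, current_bytes = [], 0
        current.append(mapper_id)
        current_bytes += size
    if current:
        ranges.append(current)
    return ranges


def plan_skew_join(left: List[List[int]], right: List[List[int]],
                   factor: float, threshold_bytes: int, target: int) -> List[Task]:
    left_sizes = [sum(p) for p in left]
    right_sizes = [sum(p) for p in right]
    left_limit = skew_limit(left_sizes, factor, threshold_bytes)
    right_limit = skew_limit(right_sizes, factor, threshold_bytes)
    tasks: List[Task] = []
    for pid in range(len(left)):
        left_parts = (split_by_mappers(left[pid], target)
                      if left_sizes[pid] > left_limit else [None])
        right_parts = (split_by_mappers(right[pid], target)
                       if right_sizes[pid] > right_limit else [None])
        # Every left piece is joined with every right piece; an unsplit side
        # is read in full by each of those tasks (i.e. it is replicated).
        for left_range in left_parts:
            for right_range in right_parts:
                tasks.append((pid, left_range, right_range))
    return tasks


# 4 reduce partitions x 4 mappers. Left partition 2 holds 4 x 100MB = 400MB,
# the others 4 x 10MB = 40MB; every right partition is 4 x 5MB = 20MB.
left = [[10 * MB] * 4, [10 * MB] * 4, [100 * MB] * 4, [10 * MB] * 4]
right = [[5 * MB] * 4 for _ in range(4)]
tasks = plan_skew_join(left, right, factor=5, threshold_bytes=256 * MB,
                       target=128 * MB)
for task in tasks:
    print(task)
```

With factor 5 and a 256MB threshold, left partition 2 (400MB) exceeds max(5 × 40MB median, 256MB), so it becomes four tasks that each read one mapper's 100MB block plus the full 20MB right partition; the join runs 7 tasks instead of 4.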
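4. Complete AQE configuration example (the minPartitionSize, parallelismFirst and adaptive.autoBroadcastJoinThreshold keys require Spark 3.2+, where AQE is also enabled by default):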
spark.sql.adaptive.enabled: true
spark.sql.adaptive.coalescePartitions.enabled: true
spark.sql.adaptive.coalescePartitions.parallelismFirst: false
spark.sql.adaptive.coalescePartitions.minPartitionSize: 64MB
spark.sql.adaptive.advisoryPartitionSizeInBytes: 128MB
spark.sql.adaptive.skewJoin.enabled: true
spark.sql.adaptive.skewJoin.skewedPartitionFactor: 5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes: 256MB
spark.sql.adaptive.autoBroadcastJoinThreshold: 30MB
spark.sql.adaptive.localShuffleReader.enabled: true
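Two interactions in a config like this are easy to miss: advisoryPartitionSizeInBytes is ignored for coalescing unless parallelismFirst is false, and the Spark docs recommend keeping skewedPartitionThresholdInBytes larger than advisoryPartitionSizeInBytes. The hypothetical Python helper below (parse_size, parse_conf and check_aqe_conf are illustrative names, not Spark APIs) parses key: value lines like the ones above and flags both; it does not validate anything else.

```python
import re
from typing import Dict, List

# Spark size strings use binary units: "64MB" = 64 * 1024 * 1024 bytes.
_UNITS = {"": 1, "b": 1, "k": 1024, "kb": 1024, "m": 1024 ** 2,
          "mb": 1024 ** 2, "g": 1024 ** 3, "gb": 1024 ** 3}
PREFIX = "spark.sql.adaptive."


def parse_size(text: str) -> int:
    match = re.fullmatch(r"\s*(\d+)\s*([A-Za-z]*)\s*", text)
    if match is None or match.group(2).lower() not in _UNITS:
        raise ValueError(f"unrecognised size: {text!r}")
    return int(match.group(1)) * _UNITS[match.group(2).lower()]


def parse_conf(text: str) -> Dict[str, str]:
    """Parse 'key: value' lines; blank lines and '#' comment lines are skipped."""
    conf: Dict[str, str] = {}
    for line in text.splitlines():
        line = line.strip()
        if line and not line.startswith("#"):
            key, _, value = line.partition(":")
            conf[key.strip()] = value.strip()
    return conf


def check_aqe_conf(conf: Dict[str, str]) -> List[str]:
    """Flag two easy-to-miss interactions; missing keys use Spark's defaults."""
    warnings: List[str] = []
    advisory = parse_size(conf.get(PREFIX + "advisoryPartitionSizeInBytes", "64MB"))
    parallelism_first = conf.get(
        PREFIX + "coalescePartitions.parallelismFirst", "true").lower() == "true"
    if parallelism_first:
        warnings.append("parallelismFirst is true: advisoryPartitionSizeInBytes "
                        "is ignored when coalescing partitions")
    skew = parse_size(conf.get(
        PREFIX + "skewJoin.skewedPartitionThresholdInBytes", "256MB"))
    if skew <= advisory:
        warnings.append("skewedPartitionThresholdInBytes should be larger "
                        "than advisoryPartitionSizeInBytes")
    return warnings


example = """
spark.sql.adaptive.advisoryPartitionSizeInBytes: 128MB
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes: 256MB
"""
print(check_aqe_conf(parse_conf(example)))  # one warning, about parallelismFirst
fixed = example + "spark.sql.adaptive.coalescePartitions.parallelismFirst: false\n"
print(check_aqe_conf(parse_conf(fixed)))  # []
```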