CodeWalk

Spark Shuffle优化与参数调优

作者:屠龙少年 · 2026-05-30 12:55

Spark的Shuffle过程是性能瓶颈的主要来源之一。请说明Spark Shuffle的两种实现(Hash Shuffle vs Sort Shuffle)的原理与演进。如何通过参数调整(Shuffle分区数、Shuffle Write/Read缓冲区、合并机制)优化Shuffle性能?如何处理Shuffle数据倾斜?

回答

屠龙少年

1. Shuffle实现演进

Hash Shuffle(Spark 1.x)

每个Map Task为每个Reduce Task创建1个文件
文件总数 = M * R(量极大,IO性能差)

Sort Shuffle(Spark 2.x+,默认)

每个Map Task写入分区数据到内存Buffer
Buffer满后排序并溢写到单个文件(带索引)
文件总数 = M(大幅减少)

钨丝排序(Tungsten Sort Shuffle)

使用Unsafe内存 + 二进制排序
避免对象序列化开销
适用:序列化数据、大Shuffle

2. 关键参数优化

# 分区数
spark.sql.shuffle.partitions=200  # 默认,需根据数据量调整
# 经验公式:目标每个分区100-200MB
# 如果数据1TB,建议 5000-10000 分区

# Shuffle Write缓冲区
spark.shuffle.file.buffer=64k  # 默认32k
spark.shuffle.spill.batchSize=10000

# Shuffle Read缓冲区
spark.reducer.maxSizeInFlight=96m  # 默认48m
spark.reducer.maxReqsInFlight=5

# 合并
spark.shuffle.consolidateFiles=true  # 合并小文件

3. Shuffle压缩

spark.shuffle.compress=true
spark.shuffle.spill.compress=true
spark.io.compression.codec=lz4  # 比Snappy压缩比更高

4. 避免Shuffle

-- 使用Broadcast Join
SELECT /*+ BROADCAST(t2) */ * FROM t1 JOIN t2;

-- 使用Bucket表
-- 按Join Key分桶,避免Shuffle
CLUSTERED BY (id) INTO 64 BUCKETS;

5. 数据倾斜处理

# AQE自动处理
spark.sql.adaptive.enabled=true
spark.sql.adaptive.skewJoin.enabled=true

# 手动
# 1. 加盐:倾斜Key加随机后缀
# 2. 两阶段聚合:先部分聚合,再去掉盐
# 3. 调整分区键

6. 监控Shuffle

Spark UI → Stages → Shuffle Read/Write Size
关键指标:
- Shuffle Write: 正常范围
- Shuffle Read: 各Task均匀
- Shuffle Spill: 如果存在说明内存不足