Spark Shuffle优化与参数调优
Spark的Shuffle过程是性能瓶颈的主要来源之一。请说明Spark Shuffle的两种实现(Hash Shuffle vs Sort Shuffle)的原理与演进。如何通过参数调整(Shuffle分区数、Shuffle Write/Read缓冲区、合并机制)优化Shuffle性能?如何处理Shuffle数据倾斜?
回答
屠龙少年
1. Shuffle实现演进
Hash Shuffle(Spark 1.x)
每个Map Task为每个Reduce Task创建1个文件
文件总数 = M * R(量极大,IO性能差)
Sort Shuffle(Spark 2.x+,默认)
每个Map Task写入分区数据到内存Buffer
Buffer满后排序并溢写到单个文件(带索引)
文件总数 = M(大幅减少)
钨丝排序(Tungsten Sort Shuffle)
使用Unsafe内存 + 二进制排序
避免对象序列化开销
适用:序列化数据、大Shuffle
2. 关键参数优化
# 分区数
spark.sql.shuffle.partitions=200 # 默认,需根据数据量调整
# 经验公式:目标每个分区100-200MB
# 如果数据1TB,建议 5000-10000 分区
# Shuffle Write缓冲区
spark.shuffle.file.buffer=64k # 默认32k
spark.shuffle.spill.batchSize=10000
# Shuffle Read缓冲区
spark.reducer.maxSizeInFlight=96m # 默认48m
spark.reducer.maxReqsInFlight=5
# 合并
spark.shuffle.consolidateFiles=true # 合并小文件
3. Shuffle压缩
spark.shuffle.compress=true
spark.shuffle.spill.compress=true
spark.io.compression.codec=lz4 # 比Snappy压缩比更高
4. 避免Shuffle
-- 使用Broadcast Join
SELECT /*+ BROADCAST(t2) */ * FROM t1 JOIN t2;
-- 使用Bucket表
-- 按Join Key分桶,避免Shuffle
CLUSTERED BY (id) INTO 64 BUCKETS;
5. 数据倾斜处理
# AQE自动处理
spark.sql.adaptive.enabled=true
spark.sql.adaptive.skewJoin.enabled=true
# 手动
# 1. 加盐:倾斜Key加随机后缀
# 2. 两阶段聚合:先部分聚合,再去掉盐
# 3. 调整分区键
6. 监控Shuffle
Spark UI → Stages → Shuffle Read/Write Size
关键指标:
- Shuffle Write: 正常范围
- Shuffle Read: 各Task均匀
- Shuffle Spill: 如果存在说明内存不足