Spark Shuffle原理与优化策略
请详细介绍Spark的Shuffle机制,包括Hash Shuffle和Sort Shuffle的演变过程以及生产环境下的Shuffle优化策略。
回答
Yahuda
一、Spark Shuffle演进:
1. Hash Shuffle(Spark 1.x早期):
- Map端为每个Reduce创建独立文件(M×R个文件)
- M=200 Map Task,R=200 Reduce Task → 40,000个小文件
- 问题:大量小文件导致IO瓶颈、OOM(文件句柄耗尽)
Hash Shuffle优化(Consolidated模式):
- 同一个Executor的Map Task合并写文件
- 文件数=Executor数×R
2. Sort Shuffle(Spark 1.2+,默认):
原理:
- Map端按Partition ID排序后写入单个文件+索引文件
- 文件数=Map Task数×2(数据文件+索引文件)
- Reduce端根据索引读取对应分区数据
- 大幅减少文件数
3. Tungsten-Sort Shuffle(Spark 1.5+):
- 使用Unsafe Shuffle:操作序列化后的二进制数据(减少GC)
- 要求:无聚合操作、Partition数<2^24
- 速度比普通Sort Shuffle快3~5倍
二、Shuffle配置参数:
spark.shuffle.manager=tungsten-sort # Shuffle管理器
spark.shuffle.sort.bypassMergeThreshold=200 # 低于此值启用Bypass
spark.shuffle.file.buffer=32k # Map端写缓冲区
spark.reducer.maxSizeInFlight=48m # Reduce端拉取缓冲区
spark.shuffle.io.retryWait=5s # 拉取重试间隔
spark.reducer.maxBlocksInFlightPerAddress=5 # 每个主机请求数
三、生产优化策略:
| 问题 | 优化 |
|---|---|
| Shuffle数据量巨大 | 增加分区数(减小每个Partition数据量) |
| Shuffle磁盘IO高 | 使用SSD、配置spark.local.dir多磁盘 |
| Shuffle网络传输慢 | 开启压(spark.shuffle.compress=true) |
| Reduce端OOM | 减小spark.reducer.maxSizeInFlight |
| 频繁GC | 使用Tungsten Sort(Unsafe操作) |
四、Spark 3.x AQE对Shuffle的优化:
- 动态合并小分区(减少Reduce端小Task)
- 动态处理倾斜Join