CodeWalk

Spark Shuffle原理与优化策略

作者:Yahuda · 2026-05-30 12:55

请详细介绍Spark的Shuffle机制,包括Hash Shuffle和Sort Shuffle的演变过程以及生产环境下的Shuffle优化策略。

回答

Yahuda

一、Spark Shuffle演进:

1. Hash Shuffle(Spark 1.x早期):

  • Map端为每个Reduce创建独立文件(M×R个文件)
  • M=200 Map Task,R=200 Reduce Task → 40,000个小文件
  • 问题:大量小文件导致IO瓶颈、OOM(文件句柄耗尽)

Hash Shuffle优化(Consolidated模式):

  • 同一个Executor的Map Task合并写文件
  • 文件数=Executor数×R

2. Sort Shuffle(Spark 1.2+,默认):

原理:

  • Map端按Partition ID排序后写入单个文件+索引文件
  • 文件数=Map Task数×2(数据文件+索引文件)
  • Reduce端根据索引读取对应分区数据
  • 大幅减少文件数

3. Tungsten-Sort Shuffle(Spark 1.5+):

  • 使用Unsafe Shuffle:操作序列化后的二进制数据(减少GC)
  • 要求:无聚合操作、Partition数<2^24
  • 速度比普通Sort Shuffle快3~5倍

二、Shuffle配置参数:

spark.shuffle.manager=tungsten-sort      # Shuffle管理器
spark.shuffle.sort.bypassMergeThreshold=200  # 低于此值启用Bypass
spark.shuffle.file.buffer=32k             # Map端写缓冲区
spark.reducer.maxSizeInFlight=48m         # Reduce端拉取缓冲区
spark.shuffle.io.retryWait=5s             # 拉取重试间隔
spark.reducer.maxBlocksInFlightPerAddress=5  # 每个主机请求数

三、生产优化策略:

问题优化
Shuffle数据量巨大增加分区数(减小每个Partition数据量)
Shuffle磁盘IO高使用SSD、配置spark.local.dir多磁盘
Shuffle网络传输慢开启压(spark.shuffle.compress=true
Reduce端OOM减小spark.reducer.maxSizeInFlight
频繁GC使用Tungsten Sort(Unsafe操作)

四、Spark 3.x AQE对Shuffle的优化:

  • 动态合并小分区(减少Reduce端小Task)
  • 动态处理倾斜Join