CodeWalk

Spark SQL Join策略选择与优化

作者:我还是少年 · 2026-05-30 12:55

Spark SQL支持哪些Join策略(Broadcast Hash Join、Sort Merge Join、Shuffled Hash Join、Broadcast Nested Loop Join)?请说明每种策略的触发条件、适用场景和优化参数。如何通过Hint强制使用特定Join策略?AQE如何动态调整Join策略?

回答

我还是少年

1. Join策略类型

Broadcast Hash Join(BHJ)

-- 小表广播到各Executor
SELECT /*+ BROADCAST(t2) */ * FROM t1 JOIN t2 ON t1.id = t2.id;
  • 条件:小表 ≤ spark.sql.autoBroadcastJoinThreshold(默认10MB)
  • 优点:无Shuffle,最快
  • 缺点:小表必须能装入内存

Sort Merge Join(SMJ)

-- 两表按Join Key排序后合并
SELECT /*+ MERGE(t1) */ * FROM t1 JOIN t2 ON t1.id = t2.id;
  • 条件:大表对大表
  • 优点:稳定,支持大数据量
  • 缺点:需要Sort + Shuffle

Shuffled Hash Join(SHJ)

SELECT /*+ SHUFFLE_HASH(t1) */ * FROM t1 JOIN t2 ON t1.id = t2.id;
  • 条件:一张表可装入内存
  • 优点:比SMJ少一次Sort
  • 缺点:内存消耗大

Broadcast Nested Loop Join(BNLJ)

  • 条件:Join条件不含等值条件
  • 优点:支持不等值Join
  • 缺点:极慢(O(n²))

2. 策略选择逻辑

// Spark优化器逻辑
if (smallTable.size < broadcastThreshold)
  使用 BHJ
else if (joinType == 等值Join)
  使用 SMJ 或 SHJ(看资源情况)
else
  使用 BNLJ

3. AQE动态调整

# AQE在运行时根据统计信息切换策略
spark.sql.adaptive.enabled=true
spark.sql.adaptive.localShuffleReader.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true

# 例如:
# 优化器选择了SMJ
# AQE运行时发现左表远小于预期
# 动态切换为BHJ

4. Hint使用

-- Spark 3.x支持
SELECT /*+ BROADCAST(t1) */ * FROM t1 JOIN t2;
SELECT /*+ MERGE(t1, t2) */ * FROM t1 JOIN t2;
SELECT /*+ SHUFFLE_HASH(t1) */ * FROM t1 JOIN t2;

5. 优化配置

spark.sql.autoBroadcastJoinThreshold = 50m  # 调整广播阈值
spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold=512mb
spark.sql.join.preferSortMergeJoin=false  # 优先SHJ

6. 调优建议

  • 小表手动调大广播阈值
  • 大Join关闭AQE重分区(可能增加Shuffle)
  • 使用Bucket表避免重复Sort