Spark SQL Join策略选择与优化
Spark SQL支持哪些Join策略(Broadcast Hash Join、Sort Merge Join、Shuffled Hash Join、Broadcast Nested Loop Join)?请说明每种策略的触发条件、适用场景和优化参数。如何通过Hint强制使用特定Join策略?AQE如何动态调整Join策略?
回答
我还是少年
1. Join策略类型
Broadcast Hash Join(BHJ)
-- 小表广播到各Executor
SELECT /*+ BROADCAST(t2) */ * FROM t1 JOIN t2 ON t1.id = t2.id;
- 条件:小表 ≤ spark.sql.autoBroadcastJoinThreshold(默认10MB)
- 优点:无Shuffle,最快
- 缺点:小表必须能装入内存
Sort Merge Join(SMJ)
-- 两表按Join Key排序后合并
SELECT /*+ MERGE(t1) */ * FROM t1 JOIN t2 ON t1.id = t2.id;
- 条件:大表对大表
- 优点:稳定,支持大数据量
- 缺点:需要Sort + Shuffle
Shuffled Hash Join(SHJ)
SELECT /*+ SHUFFLE_HASH(t1) */ * FROM t1 JOIN t2 ON t1.id = t2.id;
- 条件:一张表可装入内存
- 优点:比SMJ少一次Sort
- 缺点:内存消耗大
Broadcast Nested Loop Join(BNLJ)
- 条件:Join条件不含等值条件
- 优点:支持不等值Join
- 缺点:极慢(O(n²))
2. 策略选择逻辑
// Spark优化器逻辑
if (smallTable.size < broadcastThreshold)
使用 BHJ
else if (joinType == 等值Join)
使用 SMJ 或 SHJ(看资源情况)
else
使用 BNLJ
3. AQE动态调整
# AQE在运行时根据统计信息切换策略
spark.sql.adaptive.enabled=true
spark.sql.adaptive.localShuffleReader.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
# 例如:
# 优化器选择了SMJ
# AQE运行时发现左表远小于预期
# 动态切换为BHJ
4. Hint使用
-- Spark 3.x支持
SELECT /*+ BROADCAST(t1) */ * FROM t1 JOIN t2;
SELECT /*+ MERGE(t1, t2) */ * FROM t1 JOIN t2;
SELECT /*+ SHUFFLE_HASH(t1) */ * FROM t1 JOIN t2;
5. 优化配置
spark.sql.autoBroadcastJoinThreshold = 50m # 调整广播阈值
spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold=512mb
spark.sql.join.preferSortMergeJoin=false # 优先SHJ
6. 调优建议
- 小表手动调大广播阈值
- 大Join关闭AQE重分区(可能增加Shuffle)
- 使用Bucket表避免重复Sort