Spark广播变量与累加器
请解释Spark中的广播变量(Broadcast Variable)和累加器(Accumulator)的用途和使用场景。
回答
编译有声
广播变量(Broadcast Variable):
用途: 将只读变量高效地分发到所有Executor,避免每个Task都重复传输。
工作原理:
- Driver端将变量序列化后,通过高效广播算法(BitTorrent / 树形拓扑)分发
- 使用TorrentBroadcast:节点之间互相传递,类似P2P
- 每个Executor只保留一份副本,所有Task共享
适用场景:
- 大字典表(如地区编码表、黑白名单)
- 广播小表做MapJoin
- ML模型的参数向量
示例:
val broadcastDict = sc.broadcast(Map("01" -> "北京", "02" -> "上海"))
rdd.map(kv => (broadcastDict.value.getOrElse(kv._1, "未知"), kv._2))
注意事项:
- 变量必须是只读的,不能修改
- 广播变量应在Driver端创建
- 不要广播超大对象(超过2GB可能OOM)
累加器(Accumulator):
用途: 实现Driver端和Executor端之间的共享变量,用于累加计数/求和。
工作原理:
- Driver定义初始值
- Executor端通过
+=操作累加 - Driver端通过
.value读取最终结果
适用场景:
- 统计处理记录数(成功/失败计数)
- 计数器监控(如空值条数、异常条数)
- 自定义累加器(如CollectionAccumulator收集异常信息)
示例:
val nullCount = sc.longAccumulator("nullCount")
rdd.foreach { row =>
if (row.name == null) nullCount.add(1)
}
println(s"Null count: ${nullCount.value}")
注意事项:
- 自定义累加器需实现
AccumulatorV2接口 - Action算子内才能保证累加结果准确