CodeWalk

Spark广播变量与累加器

作者:编译有声 · 2026-05-30 12:55

请解释Spark中的广播变量(Broadcast Variable)和累加器(Accumulator)的用途和使用场景。

回答

编译有声

广播变量(Broadcast Variable):

用途: 将只读变量高效地分发到所有Executor,避免每个Task都重复传输。

工作原理:

  1. Driver端将变量序列化后,通过高效广播算法(BitTorrent / 树形拓扑)分发
  2. 使用TorrentBroadcast:节点之间互相传递,类似P2P
  3. 每个Executor只保留一份副本,所有Task共享

适用场景:

  • 大字典表(如地区编码表、黑白名单)
  • 广播小表做MapJoin
  • ML模型的参数向量

示例:

val broadcastDict = sc.broadcast(Map("01" -> "北京", "02" -> "上海"))
rdd.map(kv => (broadcastDict.value.getOrElse(kv._1, "未知"), kv._2))

注意事项:

  • 变量必须是只读的,不能修改
  • 广播变量应在Driver端创建
  • 不要广播超大对象(超过2GB可能OOM)

累加器(Accumulator):

用途: 实现Driver端和Executor端之间的共享变量,用于累加计数/求和。

工作原理:

  1. Driver定义初始值
  2. Executor端通过+=操作累加
  3. Driver端通过.value读取最终结果

适用场景:

  • 统计处理记录数(成功/失败计数)
  • 计数器监控(如空值条数、异常条数)
  • 自定义累加器(如CollectionAccumulator收集异常信息)

示例:

val nullCount = sc.longAccumulator("nullCount")
rdd.foreach { row =>
  if (row.name == null) nullCount.add(1)
}
println(s"Null count: ${nullCount.value}")

注意事项:

  • 自定义累加器需实现AccumulatorV2接口
  • Action算子内才能保证累加结果准确