在 Spark 中,提供了两种类型的共享变量:累加器 (accumulator) 与广播变量 (broadcast variable):html
累加器:用来对信息进行聚合,主要用于累计计数等场景;apache
广播变量:主要用于在节点间高效分发大对象。数组
这里先看一个具体的场景,对于正常的累计求和,若是在集群模式中使用下面的代码进行计算,会发现执行结果并不是预期:网络
var counter = 0 val data = Array(1, 2, 3, 4, 5) sc.parallelize(data).foreach(x => counter += x) println(counter)
counter 最后的结果是 0,致使这个问题的主要缘由是闭包。闭包
1. Scala 中闭包的概念并发
这里先介绍一下 Scala 中关于闭包的概念:ide
var more = 10 val addMore = (x: Int) => x + more
如上函数 addMore
中有两个变量 x 和 more:函数
x : 是一个绑定变量 (bound variable),由于其是该函数的入参,在函数的上下文中有明确的定义;大数据
more : 是一个自由变量 (free variable),由于函数字面量本生并无给 more 赋予任何含义。ui
按照定义:在建立函数时,若是须要捕获自由变量,那么包含指向被捕获变量的引用的函数就被称为闭包函数。
2. Spark 中的闭包
也能够参考:https://blog.csdn.net/hu_lichao/article/details/112451982
在实际计算时,Spark 会将对 RDD 操做分解为 Task,Task 运行在 Worker Node 上。在执行以前,Spark 会对任务进行闭包,若是闭包内涉及到自由变量,则程序会进行拷贝,并将副本变量放在闭包中,以后闭包被序列化并发送给每一个执行者。所以,当在 foreach 函数中引用 counter
时,它将再也不是 Driver 节点上的 counter
,而是闭包中的副本 counter
,默认状况下,副本 counter
更新后的值不会回传到 Driver,因此 counter
的最终值仍然为零。
须要注意的是:在 Local 模式下,有可能执行 foreach
的 Worker Node 与 Diver 处在相同的 JVM,并引用相同的原始 counter
,这时候更新多是正确的,可是在集群模式下必定不正确。因此在遇到此类问题时应优先使用累加器。
累加器的原理实际上很简单:就是将每一个副本变量的最终值传回 Driver,由 Driver 聚合后获得最终值,并更新原始变量。
SparkContext
中定义了全部建立累加器的方法,须要注意的是:被中横线划掉的累加器方法在 Spark 2.0.0 以后被标识为废弃。
使用示例和执行结果分别以下:
val data = Array(1, 2, 3, 4, 5) // 定义累加器 val accum = sc.longAccumulator("My Accumulator") sc.parallelize(data).foreach(x => accum.add(x)) // 获取累加器的值 accum.value
在上面介绍中闭包的过程当中咱们说道每一个 Task 任务的闭包都会持有自由变量的副本,若是变量很大且 Task 任务不少的状况下,这必然会对网络 IO 形成压力,为了解决这个状况,Spark 提供了广播变量。
广播变量的作法很简单:就是不把副本变量分发到每一个 Task 中,而是将其分发到每一个 Executor,Executor 中的全部 Task 共享一个副本变量。
// 把一个数组定义为一个广播变量 val broadcastVar = sc.broadcast(Array(1, 2, 3, 4, 5)) // 以后用到该数组时应优先使用广播变量,而不是原值 sc.parallelize(broadcastVar.value).map(_ * 10).collect()
建立的Accumulator变量的值可以在Spark Web UI上看到,在建立时应该尽可能为其命名,下面探讨如何在Spark Web UI上查看累加器的值
http://www.javashuo.com/article/p-puyjsavb-ge.html
https://www.cnblogs.com/zz-ksw/p/12448650.html
吴邪,小三爷,混迹于后台,大数据,人工智能领域的小菜鸟。
更多请关注