Spark共享变量

共享变量python

一般状况下,当向Spark操做(如map,reduce)传递一个函数时,它会在一个远程集群节点上执行,它会使用函数中全部变量的副本。这些变量被复制到全部的机器上,远程机器上并无被更新的变量会向驱动程序回传。在任务之间使用通用的,支持读写的共享变量是低效的。尽管如此,Spark提供了两种有限类型的共享变量,广播变量和累加器。程序员

 

广播变量算法

广播变量容许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间传递变量。广播变量可被用于有效地给每一个节点一个大输入数据集的副本。Spark还尝试使用高效地广播算法来分发变量,进而减小通讯的开销。apache

Spark的动做经过一系列的步骤执行,这些步骤由分布式的洗牌操做分开。Spark自动地广播每一个步骤每一个任务须要的通用数据。这些广播数据被序列化地缓存,在运行任务以前被反序列化出来。这意味着当咱们须要在多个阶段的任务之间使用相同的数据,或者以反序列化形式缓存数据是十分重要的时候,显式地建立广播变量才有用。编程


经过在一个变量v上调用SparkContext.broadcast(v)能够建立广播变量。广播变量是围绕着v的封装,能够经过value方法访问这个变量。举例以下:数组

 

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0) scala> broadcastVar.value res0: Array[Int] = Array(1, 2, 3)

 

在建立了广播变量以后,在集群上的全部函数中应该使用它来替代使用v.这样v就不会不止一次地在节点之间传输了。另外,为了确保全部的节点得到相同的变量,对象v在被广播以后就不该该再修改。缓存

 

累加器分布式

累加器是仅仅被相关操做累加的变量,所以能够在并行中被有效地支持。它能够被用来实现计数器和总和。Spark原生地只支持数字类型的累加器,编程者能够添加新类型的支持。若是建立累加器时指定了名字,能够在Spark的UI界面看到。这有利于理解每一个执行阶段的进程。(对于python还不支持)函数

累加器经过对一个初始化了的变量v调用SparkContext.accumulator(v)来建立。在集群上运行的任务能够经过add或者"+="方法在累加器上进行累加操做。可是,它们不能读取它的值。只有驱动程序可以读取它的值,经过累加器的value方法。this

下面的代码展现了如何把一个数组中的全部元素累加到累加器上:

 

scala> val accum = sc.accumulator(0, "My Accumulator") accum: spark.Accumulator[Int] = 0 scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) ... 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s scala> accum.value res2: Int = 10

 

尽管上面的例子使用了内置支持的累加器类型Int,可是开发人员也能够经过继承AccumulatorParam类来建立它们本身的累加器类型。AccumulatorParam接口有两个方法:

zero方法为你的类型提供一个0值。

addInPlace方法将两个值相加。

假设咱们有一个表明数学vector的Vector类。咱们能够向下面这样实现:

 

object VectorAccumulatorParam extends AccumulatorParam[Vector] { def zero(initialValue: Vector): Vector = { Vector.zeros(initialValue.size) } def addInPlace(v1: Vector, v2: Vector): Vector = { v1 += v2 } } // Then, create an Accumulator of this type: val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)

在Scala里,Spark提供更通用的累加接口来累加数据,尽管结果的类型和累加的数据类型可能不一致(例如,经过收集在一块儿的元素来建立一个列表)。同时,SparkContext..accumulableCollection方法来累加通用的Scala的集合类型。

 

累加器仅仅在动做操做内部被更新,Spark保证每一个任务在累加器上的更新操做只被执行一次,也就是说,重启任务也不会更新。在转换操做中,用户必须意识到每一个任务对累加器的更新操做可能被不仅一次执行,若是从新执行了任务和做业的阶段。

累加器并无改变Spark的惰性求值模型。若是它们被RDD上的操做更新,它们的值只有当RDD由于动做操做被计算时才被更新。所以,当执行一个惰性的转换操做,好比map时,不能保证对累加器值的更新被实际执行了。下面的代码片断演示了此特性:

 

val accum = sc.accumulator(0) data.map { x => accum += x; f(x) } //在这里,accum的值仍然是0,由于没有动做操做引发map被实际的计算.
相关文章
相关标签/搜索