闭包是一个函数,返回值依赖于声明在函数外部的一个或多个变量。闭包一般来说能够简单的认为是能够访问一个函数里面局部变量的另一个函数。json
以下面这段匿名的函数:闭包
val multiplier = (i:Int) => i * 10
函数体内有一个变量 i,它做为函数的一个参数。以下面的另外一段代码:并发
val multiplier = (i:Int) => i * factor
在 multiplier
中有两个变量:i 和 factor。其中的一个 i 是函数的形式参数,在 multiplier
函数被调用时,i 被赋予一个新的值。然而,factor不是形式参数,而是自由变量,考虑下面代码:函数
var factor = 3 val multiplier = (i:Int) => i * factor
这里咱们引入一个自由变量 factor
,这个变量定义在函数外面。大数据
这样定义的函数变量 multiplier
成为一个"闭包",由于它引用到函数外面定义的变量,定义这个函数的过程是将这个自由变量捕获而构成一个封闭的函数人工智能
完整的例子:spa
object Test { def main(args: Array[String]) { println( "muliplier(1) value = " + multiplier(1) ) println( "muliplier(2) value = " + multiplier(2) ) } var factor = 3 val multiplier = (i:Int) => i * factor }
先来看下面一段代码:scala
val data=Array(1, 2, 3, 4, 5) var counter = 0 var rdd = sc.parallelize(data) // ???? 这样作会怎么样 rdd.foreach(x => counter += x) println("Counter value: " + counter)
首先确定的是上面输出的结果是0,park将RDD操做的处理分解为tasks,每一个task由Executor
执行。在执行以前,Spark会计算task的闭包。闭包是Executor
在RDD上进行计算的时候必须可见的那些变量和方法(在这种状况下是foreach())。闭包会被序列化并发送给每一个Executor,可是发送给Executor的是副本,因此在Driver上输出的依然是counter
自己,若是想对全局的进行更新,用累加器,在spark-streaming
里面使用updateStateByKey
来更新公共的状态。code
另外在Spark中的闭包还有别的做用,orm
1.清除Driver发送到Executor上的无用的全局变量等,只复制有用的变量信息给Executor
2.保证发送到Executor上的是序列化之后的数据
好比在使用DataSet时候 case class的定义必须在类下,而不能是方法内,即便语法上没问题,若是使用过json4s来序列化,implicit val formats = DefaultFormats
的引入最好放在类下,不然要单独将这个format序列化,即便你没有使用到它别的东西。
闭包在Spark的整个生命周期中到处可见,就好比从Driver
上拷贝的全部数据都须要序列化 + 闭包的方式到Executor
上的。
吴邪,小三爷,混迹于后台,大数据,人工智能领域的小菜鸟。
更多请关注