Accumulator操做

Accumulator简介

Accumulator是spark提供的累加器,顾名思义,该变量只可以增长。 
只有driver能获取到Accumulator的值(使用value方法),Task只能对其作增长操做(使用 +=)。你也能够在为Accumulator命名(不支持Python),这样就会在spark web ui中显示,能够帮助你了解程序运行的状况。web

Accumulator使用

使用示例

举个最简单的accumulator的使用例子:缓存

//在driver中定义
val accum = sc.accumulator(0, "Example Accumulator")
//在task中进行累加
sc.parallelize(1 to 10).foreach(x=> accum += 1)

//在driver中输出
accum.value
//结果将返回10
res: 10

累加器的错误用法

val accum= sc.accumulator(0, "Error Accumulator")
val data = sc.parallelize(1 to 10)
//用accumulator统计偶数出现的次数,同时偶数返回0,奇数返回1
val newData = data.map{x => {
  if(x%2 == 0){
    accum += 1
      0
    }else 1
}}
//使用action操做触发执行
newData.count
//此时accum的值为5,是咱们要的结果
accum.value

//继续操做,查看刚才变更的数据,foreach也是action操做
newData.foreach(println)
//上个步骤没有进行累计器操做,但是累加器此时的结果已是10了
//这并非咱们想要的结果
accum.value

缘由分析

官方对这个问题的解释以下描述:app

For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware of that each task’s update may be applied more than once if tasks or job stages are re-executed.ide

咱们都知道,spark中的一系列transform操做会构成一串长的任务链,此时须要经过一个action操做来触发,accumulator也是同样。所以在一个action操做以前,你调用value方法查看其数值,确定是没有任何变化的。ui

因此在第一次count(action操做)以后,咱们发现累加器的数值变成了5,是咱们要的答案。spa

以后又对新产生的的newData进行了一次foreach(action操做),其实这个时候又执行了一次map(transform)操做,因此累加器又增长了5。最终得到的结果变成了10。rest

这里写图片描述

解决办法

看了上面的分析,你们都有这种印象了,那就是使用累加器的过程当中只能使用一次action的操做才能保证结果的准确性。code

事实上,仍是有解决方案的,只要将任务之间的依赖关系切断就能够了。什么方法有这种功能呢?大家确定都想到了,cache,persist。调用这个方法的时候会将以前的依赖切除,后续的累加器就不会再被以前的transfrom操做影响到了。orm

这里写图片描述

//
val accum= sc.accumulator(0, "Error Accumulator")
val data = sc.parallelize(1 to 10)

//代码和上方相同
val newData = data.map{x => {...}}
//使用cache缓存数据,切断依赖。
newData.cache.count
//此时accum的值为5
accum.value

newData.foreach(println)
//此时的accum依旧是5
accum.value

总结

使用Accumulator时,为了保证准确性,只使用一次action操做。若是须要使用屡次则使用cache或persist操做切断依赖。

相关文章
相关标签/搜索