Spark中foreachPartition和mapPartitions的区别

spark的运算操做有两种类型:分别是Transformation和Action,区别以下:mysql

Transformation:表明的是转化操做就是咱们的计算流程,返回是RDD[T],能够是一个链式的转化,而且是延迟触发的。sql

Action:表明是一个具体的行为,返回的值非RDD类型,能够一个object,或者是一个数值,也能够为Unit表明无返回值,而且action会当即触发job的执行。api

Transformation的官方文档方法集合以下:bash

map
filter
flatMap
mapPartitions
mapPartitionsWithIndex
sample
union
intersection
distinct
groupByKey
reduceByKey
aggregateByKey
sortByKey
join
cogroup
cartesian
pipe
coalesce
repartition
repartitionAndSortWithinPartitions

Action的官方文档方法集合以下:微信

reduce
collect
count
first
take
takeSample
takeOrdered
saveAsTextFile
saveAsSequenceFile
saveAsObjectFile
countByKey
foreach

结合平常开发好比经常使用的count,collect,saveAsTextFile他们都是属于action类型,结果值要么是空,要么是一个数值,或者是object对象。其余的如map,filter返回值都是RDD类型的,因此简单的区分两个不一样之处,就能够用返回值是否是RDD[T]类型来辨别。
接着回到正题,咱们说下foreachPartition和mapPartitions的分别,细心的朋友可能会发现foreachPartition并无出如今上面的方法列表中,缘由多是官方文档并只是列举了经常使用的处理方法,不过这并不影响咱们的使用,首先咱们按照上面的区分原则来看下foreachPartition应该属于那种操做,官网文档的这个方法api以下:this

public void foreachPartition(scala.Function1<scala.collection.Iterator<T>,scala.runtime.BoxedUnit> f)
 
Applies a function f to each partition of this RDD.
 
Parameters:
f - (undocumented)

从上面的返回值是空能够看出foreachPartition应该属于action运算操做,而mapPartitions是在Transformation中,因此是转化操做,此外在应用场景上区别是mapPartitions能够获取返回值,继续在返回RDD上作其余的操做,而foreachPartition由于没有返回值而且是action操做,因此使用它通常都是在程序末尾好比说要落地数据到存储系统中如mysql,es,或者hbase中,能够用它。spa

固然在Transformation中也能够落地数据,可是它必须依赖action操做来触发它,由于Transformation操做是延迟执行的,若是没有任何action方法来触发,那么Transformation操做是不会被执行的,这一点须要注意scala

一个foreachPartition例子:code

def main(args: Array[String]): Unit = {
    val sp = new SparkConf();
    sp.setAppName("zhangzeli")
    sp.setMaster("local")
    val sc = new SparkContext(sp);
    val rdd =sc.parallelize(Seq(1,2,3,4,5,6),3);
    rdd.foreachPartition(p=>{
      p.foreach(line=>{
        // partiton.size 不能执行这个方法,不然下面的foreach方法里面会没有数据,
        //由于iterator只能被执行一次
        println(line)
      })
    });
    while (true){}

  }

一个mapPartitions例子:orm

val sparkConf=new SparkConf()
     val sc=new SparkContext(sparkConf)
      sparkConf.setAppName("spark demo example ")
    val rdd=sc.parallelize(Seq(1,2,3,4,5),3)
 
    rdd.mapPartitions(partiton=>{
      //只能用map,不能用foreach,由于foreach没有返回值
      partiton.map(line=>{
        //save line
      }
      )
    })
 
    rdd.count()//须要action,来触发执行
    sc.stop()

最后,须要注意一点,若是操做是iterator类型,咱们是不能在循环外打印这个iterator的size,一旦执行size方法,至关于iterato就会被执行,因此后续的foreach你会发现是空值的,切记iterator迭代器只能被执行一次。

个人微信张泽立,泽立,泽,微信

相关文章
相关标签/搜索