Spark:foreach和foreachpartition的区别

1、RDD基础

1.RDD分布式数据集的五大特性

(1)A list of partitions数据库

(2)A function for computing each split缓存

(3)A list of dependencies on other RDDs微信

(4)Optionally,a Partitioner for key-value RDDs网络

(5)Optionally,a list of preferred locations to compute each split闭包

2.RDD的操做类型

 Transformations:转换,lazy型,不会触发计算分布式

 Action:触发jobide

 Persist:缓存也不会触发job,在第一次触发job以后才会真正进行缓存函数

3.RDD的计算

RDD的计算实际上咱们能够分为两大部分。性能

1)Driver端的计算this

主要是stage划分,task的封装,task调度执行

2)Executor端的计算

真正的计算开始,默认状况下每一个cpu运行一个task。一个task实际上就是一个分区,咱们的方法不管是转换算子里封装的,仍是action算子里封装的都是此时在一个task里面计算一个分区的数据。

2、源码相关

1.第一次封装

/**
   * Applies a function f to all elements of this RDD.
   */
  def foreach(f: T => Unit): Unit = withScope {
    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
  }

  /**
   * Applies a function f to each partition of this RDD.
   */
  def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
    val cleanF = sc.clean(f)
    sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
  }

能够看到方法经过clean操做(清理闭包,为序列化和网络传输作准备),进行了一次匿名函数的封装, 针对foreach方法,是咱们的方法被传入了迭代器foreach(每一个元素遍历执行一次函数), 而对于foreachpartition方法是迭代器被传入了咱们的方法(每一个分区执行一次函数,咱们获取迭代器后须要自行进行迭代处理)

2.第二次封装

def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: Iterator[T] => U,
      partitions: Seq[Int]): Array[U] = {
    val cleanedFunc = clean(func)
    runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
  }

就是讲上述封装的方法进一步按照匿名函数封装

(ctx:TaskContext,it:Iterator[T] => cleanFunc(it))

3.执行的时候

Spark的Task类型咱们用到的也就两个

1)shuffleMapTask

2)ResultTask

Action算子的方法是在ResultTask中执行的,也即ResultTask的runTask方法。

首先反序列化获得咱们的方法和RDD,而后执行。传入的是迭代器

override def runTask(context: TaskContext): U = {
    // Deserialize the RDD and the func using the broadcast variables.
    val threadMXBean = ManagementFactory.getThreadMXBean
    val deserializeStartTime = System.currentTimeMillis()
    val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime
    } else 0L
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
    _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
      threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
    } else 0L

    func(context, rdd.iterator(partition, context))
  }

3、总结

RDD.foreach(foreachFunction)

RDD.foreachPatition(foreachPartitionFunction)

通过第二部分析咱们能够理解,展开以后实际上就是

RDD的每一个分区的iterator:

iterator.foreach(foreachFunction)

foreachPartitionFunction(iterator)

这就很明显了,假如咱们的Function中有数据库,网络TCP等IO链接,文件流等等的建立关闭操做,采用foreachPartition方法,针对每一个分区集合进行计算,更能提升咱们的性能。

张泽立微信

相关文章
相关标签/搜索