经过map操做看RDD的Map过程

RDD中的map,flatMap等操做是怎么串在一块儿造成DAG图的呢?这是个很重要的问题,理解了这一点才能更好的理解Spark的内核实现。本文经过map过程来试图解释这一点。ide

先看看RDD的一个子类:MapPartitionsRDD,它会用在map函数场景下。函数

它的定义:this

private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false,
    isFromBarrier: Boolean = false,
    isOrderSensitive: Boolean = false)
  extends RDD[U](prev)

prev是父RDD,就是父类RDD的入参,在后面的代码里就是firstParent。spa

F表明了map函数的定义,其中第二个Int参数是分区索引号。咱们先无论这个f入参怎么传进来的,先看看MapPartitionsRDD须要作哪些事。code

前面说过,对于RDD来讲,最重要的函数就是compute,MapPartitionsRDD的compute方法定义:索引

override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))

很明确,就是用当前的solit分区来执行入参的f函数!it

那么,这个MapPartitionsRDD是怎么产生的呢?原来是在RDD类中的map函数产生的:spark

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
  }

这几行代码什么意思?这里仍是须要好好分析一下的。io

对照MapPartitionsRDD的定义,咱们知道:class

(_, _, iter) => iter.map(cleanF)

里面的_,_表明TaskContext和分区索引,由于在MapPartitionsRDD的compute方法中已经有了split入参和context入参,因此在RDD中就不须要传这两个参数了。

iter表明要处理的数据集,在MapPartitionsRDD中的compute方法中定义为:

firstParent[T].iterator(split, context)

函数就是第一个父类RDD的split分区的数据集。这里就很清楚了,对这个数据集作cleanF操做(也就是sc.clean以后的map函数,sc.clean是去掉不能序列号的字节码的意思,保证能够序列化后分发到其余节点执行)。

相关文章
相关标签/搜索