Spark中的RDD究竟是怎么玩的?

Spark是开源的分布式计算引擎,基于RDD来构造数据处理流程,并在集群间调度任务,经过分区数据管理机制来划分任务的并行度,并在任务之间交换分区数据,实现分布式的数据处理。java

RDD是Spark中最重要的概念,理解了RDD是什么,基本也就理解了一半Spark的内部机密了。node

 

一、RDD基类算法

RDD是Spark中表示数据集的基类,是可序列号的对象,所以RDD可在Spark节点中复制。RDD定义了数据迭代器来循环读取数据,以及在数据集上定义各种转换操做,生成新的RDD。app

RDD的各类算子会触发生成新的RDD。如:分布式

map操做生成MapPartitionsRDD。ide

filter操做也生成MapPartitionsRDD,filter操做实际上是在以前的RDD迭代器上封装了一层filter操做,其实仍是第一个迭代器,只不过这个迭代器会抛弃掉一些不知足的记录。函数

RDD的计算过程是经过compute方法来触发的。oop

1.1 RDD触发任务this

submit过程是提交spark程序到集群,这时候会触发application事件和driver事件等,并经过master节点选择对应的node来建立app和driver,同时在node上执行spark jar包里的main方法。但task的真正执行要等到RDD的compute动做来触发的。编码

RDD经过compute触发任务,提交FinalStage给Dag执行。如collect(),count()等方法都会触发compute过程,间接提交任务。

RDD.compute()=> finalStage => dag.submitJob()=> submitMissingStage() .

dag.submitJob()=> scheduleImpl.launchTask()=>scheduleBackend => executorBackend=> executor.launchTask()=> executorBackend.taskComplete msg => scheduleBackend.taskCompleted=>dag.stageFinished()=> ...

上面是RDD提交任务的大体流程。Compute函数是触发函数,这会致使最后一个RDD被执行,也是finalStage;finalStage调用DAG的submitJob函数提交stage,这里的stage就是finalStage。

Stage是从源头到finalStage串起来的,执行的时候是反向寻找的,这句话要好好体会,这个过程其实就是RDD的秘密了。

咱们先看下RDD的经典图例。图中中间的部分Transformation是RDD的计算过程,左边的HDFS示意数据源,右边的HDFS示意RDD的finalStage执行的操做(图中的操做是写入hdfs,固然也能够是print操做等等,就看你怎么写了)。

Stage1和stage2是窄依赖,map和union都是窄依赖;stage3是宽依赖,这里是join操做。窄依赖的意思就是操做只依赖一个stage的数据,宽依赖的意思是依赖于多个stage,对这多个stage的数据要作全链接操做。

1.二、RDD执行示例

RDD经过runJob调用来得到执行,以下:

def collect(): Array[T] = withScope {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }

Sc是SparkContext。

对每一个分区执行func操做,返回结果是一个长度等于分区数的Array。

Sc.runJob再调dagScheduler.runJob方法。具体能够看DagScheduler的做业执行步骤,这里先不说,看笔者的专门论述DagScheduler的文章。

1.三、迭代器

RDD实际执行是经过迭代器读取数据的。

RDD是抽象类,定义了几个接口:

分别是getPartitions、compute、getPreferredLocations。RDD数据是分区存储,每个分区可能分布在申请spark资源的任何位置。这三个接口能够描述RDD的所有信息,其中getPreferredLocations这个方法是和计算本地化有关的,这里咱们就先忽略它,不影响咱们理解RDD的原理。

override protected def getPartitions: Array[Partition] = {}
override def compute(split: Partition, context: TaskContext): Iterator[java.lang.Integer] = new NextIterator[Integer] {}

getPartitions方法咱们也不用太关注,它的做用是返回一个分区列表,表示这个RDD有几个分区,实际运行的时候RDD的每一个分区会被安排到单独的节点上运行,这样来实现分布式计算的。

咱们最关心的是compute的方法,这个方法返回一个迭代器,这个迭代器就是这个RDD的split这个分区的数据集。至于这个迭代器的数据是什么,是在compute方法体中写代码来生成的。咱们能够定义本身的RDD,只要写代码实现这几个方法就能够了!

自定义RDD有什么好处呢?最大的好处就是能够把本身的数据集归入到Spark的分布式计算体系中,帮助你实现数据分区,任务分配,和其余RDD执行全链接汇聚操做等。

言归正传,回到compute方法自己。

怎么得到Iterator[T],对ShuffleRDD来讲是从BlockManager获取迭代器Iterator[T]。这种迭代器是blockResult,是ShuffleMapTask执行结果的保存格式;另外一种就是直接得到iter,这种是ResultTask的执行结果的数据。

第一种状况,看BlockManager可否找到本RDD的partition的BlockResult。看看getOrElseUpdate方法还传递了一个函数做为最后一个入参,若是不存在指定的BlockResult,则返回入参函数来计算获得iter,方法体定义以下:

() => {
  readCachedBlock = false
  computeOrReadCheckpoint(partition, context)
}

主要就是调用computeOrReadCheckpoint方法计算分区。

def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
  if (isCheckpointedAndMaterialized) {
    firstParent[T].iterator(split, context)
  } else {
    compute(split, context)
  }
}

computeOrReadCheckpoint获得Iterator,若是是checkpoint的那么调用第一个父类的iterator方法获得Iterator,这里父类就是CheckpointRDD;不然就是调用compute方法获得Iterator。

因此,RDD的迭代器的实际获取分红两步:

首先,判断是否存在该RDD指定partition的BlockResult,若是存在则将BlockResult做为Iterator结果,此时表示该RDD是shuffleRDD之类。

而后若是上述不知足,则又分两种状况,第一种这是checkpoint的RDD,则调用父RDD的iterator方法(此时父RDD就是CheckpointRDD);不然调用compute方法来得到Iterator。

二、Stage划分

咱们知道RDD的提交Spark集群执行是分阶段划分Stage提交的。从最后一个Stage开始,依次循环递归判断是否要调用依赖RDD的Stage,Stage的划分是根据是否要Shuffle做为分界点的。

若是某个RDD的依赖(dep)是ShuffleDependency,则次RDD做为ShuffleMapTask任务提交,不然最后一个RDD做为ResultTask提交。

递归提交Stage,对ShuffleMapTask类型的RDD,会一直递归判断该RDD是否存在前置的ShuffleDependency,若是存在则递归提交前依赖RDD。

整个Spark做业是RDD串接的,若是不存在Shuffle依赖,则提交最后一个RDD,而且只有这一个RDD被提交。在计算最后一个RDD的iterator时,被调用到父RDD的iterator方法,此时父RDD通常都是MapPartitionsRDD。在MapPartitionsRDD中有进一步叙述。

三、RDD子类

RDD含有多个子类,如MapPartitionRDD,HadoopRDD、CoGroupedRDD等等。笔者这里就找几个例子简单说明一下他们的内部逻辑。 

3.1 MapPartitionsRDD

MapPartitionsRDD是RDD的子类,前面看到RDD的诸多算子都会生成新的MapPartitionRDD。

MapPartitionsRDD的构造函数须要入参f,它是一个函数抽象类或者叫作泛类。

f: (TaskContext, Int, Iterator[T]) => Iterator[U]

f的入参有三个:

(1) TaskContext:是任务上下文

(2) Int:是分区编码

(3) Iterator[T]是分区迭代器

f的输出也是一个Iterator迭代器。能够看出,f是一个抽象的从一个迭代器生成另外一个迭代器的自定义函数。对数据的处理逻辑就是体如今f上。

MapPartitionRDD中触发计算的compute方法定义以下:

override def compute(split: Partition, context: TaskContext): Iterator[U] =
    f(context, split.index, firstParent[T].iterator(split, context))

这里的f是MapPartitionRDD的构造函数中传进入的入参,是用户自定义的map函数。这样,经过RDD的map、flatmap等算子和MapPartitionRDD,能够将RDD上的一系列操做不停的串联下去。

3.2 CoalescedRDD

CoalescedRDD将M个分区的RDD从新分红N个分区,造成新的RDD。在计算过程当中,会引发Shuffle工程。

首先CoalescedRDD须要一个从新分区算法,将M个分区如何划分到N个分区,这里M>N。从新分区的结果是N的每一个分区对应了M的多个分区,用List<Int>来表示,List<Int>中每一个Int表示父RDD中M个分区之一的编号。

若是CoalescedRDD没有指定本身的从新分区算法,则用DefaultPartitionCoalescer来作从新分区计算。

CoalescedRDD的compute过程以下:

override def compute(partition: Partition, context: TaskContext): Iterator[T] = {
  partition.asInstanceOf[CoalescedRDDPartition].parents.iterator.flatMap { 
    parentPartition => firstParent[T].iterator(parentPartition, context)
  }
}

partition.parents是指CoalescedRDD的第partition分区所对应的父RDD的分区列表,对分区列表的每一个分区,执行:

firstParent[T].iterator(parentPartition, context)

而后获得最终的Iterator[T]。这段应该不难理解。

须要留意的是,这里获得的Iterator[T]最终是要写到Shuffle的,由于CoalescedRDD对应的ShuffleMapTask而不是ResultTask。

对于理解Spark计算流程来讲,理解了Shuffle的过程,也就解决了一半的疑惑了。

相关文章
相关标签/搜索