spark 计算引擎

spark 计算引擎(一)

spark 的计算是一个层层迭代的过程,迭代即上一轮的输出是下一轮计算的输入,RDD是spark计算的核心,是spark对各类计算数据的统一抽象模型,关于RDD的特性,将在从此的博客中陆续更新,今天要给你们展现的是spark的计算引擎模型。web

迭代计算

MappedRDD 的iterator 方法实际是父RDD的iterator方法,若是任务是初次执行,此时还没有缓存,会调用computeOfReadCheckpoint方法,迭代方法的容错处理过程,若是在计算的过程当中某个分区任务执行失败了,但其余分区任务执行成功了,能够利用DAG从新调度。失败的任务将从检查点恢复状态,而那些成功执行的任务的结果已经存储到存储体系中了,因此调用CacheManager的getOrCompute方法便可获取,不需再次执行。iterator方法以下:算法

final def iterator(split:Partition,Context:TaskContext):Iterator[T]={
  if(storageLevel!=StorageLevel.NONE){
    SparkEnv.get.cacheManager.getOrcompute(this,split,context,storageLevel)
  }else{
    computeOrReadCheckPoint(split,contxt)
  }
}

Mappped rdd的compute方法首先调用firstparent 找到父RDD,通过层层调用最终达到hadoop-RDD,即最初的计算入口。(spark是惰性计算的,在spark中对RDD的操做主要是两种transformation和action,只有action才会触发真正的计算,而后层层迭代求出最终的结果,transformation是不会触发计算的)。缓存

shuffle

shuflle是全部Mapreduce框架所必须通过的阶段,shuffle 是链接map任务和reduce任务的桥梁,map任务的输出结果按照key值哈希分配后给某reduce个任务spark早期版本的shuffle过程以下:网络

这里写图片描述

从上图能够知道:
* map任务会为每一个reduce任务建立一个part文件(这些part文件会存放在bucket)。若是有M个map任务和N个reduce任务最终将生成M*N个part文件;
* map任务的结果将按照partition的不一样写入不一样的bucket中;
* reduce任务从本地或者远端的map任务所在的BlockManager中获取对应的bucket做为输入;app

经过对shuffle的过程的了解发现,map任务中间结果先存入内存,而后才写入磁盘。这样容易致使内存紧张,进而发生溢出。其次,每一个map任务都有M个bucket相对应,虽然bucket的的自己并非很大,可是当bucket的数量太多时会频繁形成网络IO,成为性能的瓶颈。框架

因此在后期的spark中对以上问题进行了改进,改进主要包括如下几个方面:
一、 将map任务给每一个partition的reduce任务任务输出的bucket合并到同一个文件中,这样能够解决bucket数量多而形成频繁网络IO的问题;
二、 map任务逐条输出计算结果,而不是一次性输出到内存,并使用AppendOnlyMAp缓存机器局和算法对中间结果进行聚合,这样可大大减小中间结果所占用的内存。
三、 reduce在拉取map任务的中间输出时也是逐条读取,而不是一次性读入内存,而且在内存中进行聚合和排序操做,这样能够减少数据所占的内存。
四、 reduce任务将要拉取的Block按照blockmanager地址划分,而后将同一BlockManager地址中的Block累积为少许的网络请求,减小网络IO;svg

Map端计算结果缓存处理及持久化

Map端对计算结果的缓存有三种方式:
* map端对计算结果在缓存中进行聚合和排序;
* map不使用缓存,也不执行聚合和排序,而是直接调用spillToPartitionFiles将各个文件patition直接写到本身的存储文件中,最后由reduce进行聚合排序;
* map端进行简单缓存oop

对计算结果的聚合和排序可以大大的节省IO开销,从而提高系统性能(通常经过patitionID和key进行排序和聚合)。性能

reduce端读取中间计算结果

当map任务相关的Stage都执行完的时候,会唤起下游Stage的提交及任务执行,ResultTask的计算是由RDD的iterator方法驱动的,因此最终计算过程最终会落到ShuffledRDD的compute方法,compute方法首相调用SortShuffleManager的getReader方法建立HassShuffleReader,经过其read方法读取依赖的中间结果。
spark 能够经过MapoutputTracker的getServerStatus来获取map任务执行状态信息,shuffledBlockFetcherIterator是读取中间结果的关键,获取过程实际上是建立了一个netty服务,经过这个服务进行上传和查找中间结果所在的位置。this