先来回顾一下Spark的程序运行架构:
node
对于任何一个Spark程序,有且仅有一个SparkContext
,其实一个SparkContext
就对应了一个Driver
;git
一个Driver
就是一个进城,运行在一个节点上,程序的main函数就运行在Driver
上;github
main函数经过分析程序,将程序转化成一些列Task
,而后分发到各个节点的Executor
上去执行;一个节点能够运行一个或多个Executor
;而后一个Executor
能够同时跑若干个Task
;算法
每一个节点有多少个Executor,每一个Executor上有多少个Task,都是能够由用户指定的计算资源)架构
(分布式计算:主要就是须要分布式地调度计算资源和计算任务)并发
Job的实际执行流程比用户头脑中的要复杂,须要先创建逻辑执行图(或者叫数据依赖图),而后划分逻辑执行图生成DAG型的物理执行图,而后生成具体Task
执行。app
一些典型的transformation()及其建立的RDD:框架
iterator(split) 的意思是 foreach record in the partition分布式
实际上,从parent RDD 通过转换操做,生成RDD x的过过程当中,须要考虑的问题是主要会分为三种:函数
max(numPartitions[parent RDD 1], ..., numPartitions[parent RDD n])
numPartitions[parent RDD n])`)
partition
之间是什么依赖关系?是依赖 parent RDD 中一个仍是多个 partition?主要须要考虑的是最后一个问题。这要结合不一样 transformation() 的语义,不一样的 transformation() 的依赖关系不一样。
RDD x 中每一个 partition 能够依赖于 parent RDD 中一个或者多个 partition。并且这个依赖能够是NarrowDependency 彻底依赖(窄依赖)
或者ShuffleDependency 部分依赖(宽依赖)
。
下图展现了彻底依赖和部分依赖
前三个是彻底依赖,RDD x 中的 partition 与 parent RDD 中的 partition/partitions 彻底相关。也就是说parent RDD中的每一个partition只会被RDD x中的一个partiton使用!
最后一个是部分依赖,RDD x 中的 partition 只与 parent RDD 中的 partition 一部分数据相关,另外一部分数据与 RDD x 中的其余 partition 相关。也就是说父RDD的每一个partiton都有可能被多个子RDD的partition使用
在 Spark 中,彻底依赖被称为 NarrowDependency,部分依赖被称为 ShuffleDependency。其实 ShuffleDependency 跟 MapReduce 中 shuffle 的数据依赖相同(mapper 将其 output 进行 partition,而后每一个 reducer 会将全部 mapper 输出中属 于本身的 partition 经过 HTTP fetch 获得)
主要的问题是:给定job的逻辑执行图,如何生成物理执行图(也就是 stages 和 tasks)?
逻辑执行计划到物理执行计划的转化须要执行:
Stage
Task
Spark Task的类型只有两种:ShuffleMapTask
和ResultTask
问:每一个Stage的Task数目?
- First Stage: 由hdfs block或hbase regioin 数目决定
- Other Stages: 由用户设置,默认与第一个阶段相等
根据RDD的依赖关系,划分Stage:
从后往前推算,遇到ShuffleDependency就断开,遇到NarrowDependency就将其加入该stage。 每一个stage里面task的数目由该stage最后一个 RDD 中的partition个数决定
生成了Stage和Task后,接下来的问题就是怎么去执行物理图:
回想 pipeline
的思想是 数据用的时候再算,并且数据是流到要计算的位置的
Result 产生的地方的就是要计算的位置,要肯定 “须要计算的数据”,咱们能够从后往前推,须要哪一个 partition 就计算哪一个 partition,若是 partition 里面没有数据,就继续向前推,造成 computing chain。这样推下去,结果就是:须要首先计算出每一个 stage 最左边的 RDD 中的某些 partition
整个 computing chain 根据数据依赖关系自后向前创建,遇到 ShuffleDependency 后造成 stage。在每一个 stage 中,每一个 RDD 中的 compute() 调用 parentRDD.iter() 来将 parent RDDs 中的 records 一个个 fetch 过来。
用户的 driver 程序中一旦出现 action(),就会生成一个 job
每个 job 包含 n 个 stage,最后一个 stage 产生 result。好比,第一章的 GroupByTest 例子中存在两个 job,一共产生了两组 result。在提交 job 过程当中,DAGScheduler 会首先划分 stage,而后先提交无 parent stage 的 stages,并在提交过程当中肯定该 stage 的 task 个数及类型,并提交具体的 task。无 parent stage 的 stage 提交完后,依赖该 stage 的 stage 才可以提交。从 stage 和 task 的执行角度来说,一个 stage 的 parent stages 执行完后,该 stage 才能执行
TaskScheduler的主要做用就是得到须要处理的任务集合,并将其发送到集群进行处理。而且还有汇报任务运行状态的做用。 因此其是在Master端。具体有如下4个做用:
接收来自Executor的心跳信息,使Master知道该Executer的BlockManager还“活着”
对于失败的任务进行重试
对于stragglers(拖后腿的任务)放到其余的节点执行
向集群提交任务集,交给集群运行
FIFO或Fair
优化机制:数据本地性和推测执行
Task被序列化后,发送到executor上执行
为何要写在本地?
后面的 RDD 多个分区都要去读这个信息,若是放到内存,若是出现数据丢失,后面的全部步骤所有不能进行,违背了以前所说的须要父 RDD 分区数据所有 ready 的原则
同一个 stage 里面的 task 是能够并发执行的,下一个 stage 要等前一个 stage ready
数据用的时候再算,并且数据是流到要计算的位置的
Shuffle是分布式计算框架的核心s数据交换方式,它的实现方式直接决定了计算框架的性能和可扩展性
Shuffle阶段主要解决的问题是:数据是怎么经过 ShuffleDependency 流向下一个 stage 的
产生shuffle的算子:join, cogroup, 和*ByKey(reduceByKey, groupByKey, sortByKey,…
Shuffle中map端输出的数据要先写到磁盘,而后由reduce进行拉取
上图有 4 个 ShuffleMapTask 要在同一个 worker node 上运行,CPU core 数为 2,能够同时运行两个 task。每一个 task 的执行结果(该 stage 的 finalRDD 中某个 partition 包含的 records)被逐一写到本地磁盘上。每一个 task 包含 R 个缓冲区,R = reducer 个数(也就是下一个 stage 中 task 的个数),缓冲区被称为 bucket
ShuffleMapTask 的执行过程很简单:先利用 pipeline 计算获得 finalRDD 中对应 partition 的 records。每获得一个 record 就将其送到对应的 bucket 里,具体是哪一个 bucket 由partitioner.partition(record.getKey()))决定。每一个 bucket 里面的数据会不断被写到本地磁盘上,造成一个 ShuffleBlockFile,或者简称 FileSegment。以后的 reducer 会去 fetch 属于本身的FileSegment
,而后进入 shuffle read 阶段。
这样的实现很简单,但有几个问题:
产生的 FileSegment 过多
缓冲区占用内存空间大
第一个问题有一些方法去解决,下面介绍已经在 Spark 里面实现的 FileConsolidation 方法: 一个MapTask只产生一个FileSegment
;前面每一个MapTask有几个Reduce就产生几个FileSegment
;以下图:
在一个 core 上连续执行的 ShuffleMapTasks 能够共用一个输出文件 ShuffleFile。先执行完的 ShuffleMapTask 造成 ShuffleBlock i,后执行的 ShuffleMapTask 能够将输出数据直接追加到 ShuffleBlock i 后面,造成 ShuffleBlock i;数据块在数据文件中的偏移量存储在不一样的索引文件中
Shuffle read就是将以前Shuffle write写的数据拉过来再作处理;
fetch 来的 records 被逐个 aggreagte 到 HashMap 中,等到全部 records 都进入 HashMap,就获得最后的处理结果