Apache Spark是用于大规模数据处理的统一分析引擎,基于内存计算,提升了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,容许用户将Spark部署在大量硬件之上,造成集群。算法
Spark源码从1.x的40w行发展到如今的超过100w行,有1400多位大牛贡献了代码。整个Spark框架源码是一个巨大的工程。下面咱们一块儿来看下spark的底层执行原理。多线程
具体运行流程以下:架构
SparkContext 向资源管理器注册并向资源管理器申请运行Executor框架
资源管理器分配Executor,而后资源管理器启动Executoride
Executor 发送心跳至资源管理器函数
SparkContext 构建DAG有向无环图学习
将DAG分解成Stage(TaskSet)大数据
把Stage发送给TaskScheduler优化
Executor 向 SparkContext 申请 Taskspa
TaskScheduler 将 Task 发送给 Executor 运行
同时 SparkContext 将应用程序代码发放给 Executor
Task 在 Executor 上运行,运行完毕释放全部资源
Val lines1 = sc.textFile(inputPath1).map(...).map(...)
Val lines2 = sc.textFile(inputPath2).map(...)
Val lines3 = sc.textFile(inputPath3)
Val dtinone1 = lines2.union(lines3)
Val dtinone = lines1.join(dtinone1)
dtinone.saveAsTextFile(...)
dtinone.filter(...).foreach(...)
上述代码的DAG图以下所示:
Spark内核会在须要计算发生的时刻绘制一张关于计算路径的有向无环图,也就是如上图所示的DAG。
Spark 的计算发生在RDD的Action操做,而对Action以前的全部Transformation,Spark只是记录下RDD生成的轨迹,而不会触发真正的计算。
一个Application能够有多个job多个Stage:
Spark Application中能够由于不一样的Action触发众多的job,一个Application中能够有不少的job,每一个job是由一个或者多个Stage构成的,后面的Stage依赖于前面的Stage,也就是说只有前面依赖的Stage计算完毕后,后面的Stage才会运行。
划分依据:
Stage划分的依据就是宽依赖,像reduceByKey,groupByKey等算子,会致使宽依赖的产生。
回顾下宽窄依赖的划分原则:
窄依赖:父RDD的一个分区只会被子RDD的一个分区依赖。即一对一或者多对一的关系,可理解为独生子女。 常见的窄依赖有:map、filter、union、mapPartitions、mapValues、join(父RDD是hash-partitioned)等。
宽依赖:父RDD的一个分区会被子RDD的多个分区依赖(涉及到shuffle)。即一对多的关系,可理解为超生。 常见的宽依赖有groupByKey、partitionBy、reduceByKey、join(父RDD不是hash-partitioned)等。
核心算法:回溯算法
从后往前回溯/反向解析,遇到窄依赖加入本Stage,碰见宽依赖进行Stage切分。
Spark内核会从触发Action操做的那个RDD开始从后往前推,首先会为最后一个RDD建立一个Stage,而后继续倒推,若是发现对某个RDD是宽依赖,那么就会将宽依赖的那个RDD建立一个新的Stage,那个RDD就是新的Stage的最后一个RDD。
而后依次类推,继续倒推,根据窄依赖或者宽依赖进行Stage的划分,直到全部的RDD所有遍历完成为止。
一个Spark程序能够有多个DAG(有几个Action,就有几个DAG,上图最后只有一个Action(图中未表现),那么就是一个DAG)。
一个DAG能够有多个Stage(根据宽依赖/shuffle进行划分)。
同一个Stage能够有多个Task并行执行(task数=分区数,如上图,Stage1 中有三个分区P一、P二、P3,对应的也有三个 Task)。
能够看到这个DAG中只reduceByKey操做是一个宽依赖,Spark内核会以此为边界将其先后划分红不一样的Stage。
同时咱们能够注意到,在图中Stage1中,从textFile到flatMap到map都是窄依赖,这几步操做能够造成一个流水线操做,经过flatMap操做生成的partition能够不用等待整个RDD计算结束,而是继续进行map操做,这样大大提升了计算的效率。
调度阶段的提交,最终会被转换成一个任务集的提交,DAGScheduler经过TaskScheduler接口提交任务集,这个任务集最终会触发TaskScheduler构建一个TaskSetManager的实例来管理这个任务集的生命周期,对于DAGScheduler来讲,提交调度阶段的工做到此就完成了。
而TaskScheduler的具体实现则会在获得计算资源的时候,进一步经过TaskSetManager调度具体的任务到对应的Executor节点上进行运算。
要保证相互依赖的做业调度阶段可以获得顺利的调度执行,DAGScheduler须要监控当前做业调度阶段乃至任务的完成状况。
这经过对外暴露一系列的回调函数来实现的,对于TaskScheduler来讲,这些回调函数主要包括任务的开始结束失败、任务集的失败,DAGScheduler根据这些任务的生命周期信息进一步维护做业和调度阶段的状态信息。
TaskScheduler经过回调函数通知DAGScheduler具体的Executor的生命状态,若是某一个Executor崩溃了,则对应的调度阶段任务集的ShuffleMapTask的输出结果也将标志为不可用,这将致使对应任务集状态的变动,进而从新执行相关计算任务,以获取丢失的相关数据。
一个具体的任务在Executor中执行完毕后,其结果须要以某种形式返回给DAGScheduler,根据任务类型的不一样,任务结果的返回方式也不一样。
对于FinalStage所对应的任务,返回给DAGScheduler的是运算结果自己。
而对于中间调度阶段对应的任务ShuffleMapTask,返回给DAGScheduler的是一个MapStatus里的相关存储信息,而非结果自己,这些存储位置信息将做为下一个调度阶段的任务获取输入数据的依据。
根据任务结果大小的不一样,ResultTask返回的结果又分为两类:
若是结果足够小,则直接放在DirectTaskResult对象内中。
若是超过特定尺寸则在Executor端会将DirectTaskResult先序列化,再把序列化的结果做为一个数据块存放在BlockManager中,而后将BlockManager返回的BlockID放在IndirectTaskResult对象中返回给TaskScheduler,TaskScheduler进而调用TaskResultGetter将IndirectTaskResult中的BlockID取出并经过BlockManager最终取得对应的DirectTaskResult。
一张图说明任务整体调度:
每一个Application获取专属的Executor进程,该进程在Application期间一直驻留,并以多线程方式运行Tasks。
Spark Application不能跨应用程序共享数据,除非将数据写入到外部存储系统。如图所示:
Spark与资源管理器无关,只要可以获取Executor进程,并能保持相互通讯就能够了。
Spark支持资源管理器包含: Standalone、On Mesos、On YARN、Or On EC2。如图所示:
提交SparkContext的Client应该靠近Worker节点(运行Executor的节点),最好是在同一个Rack(机架)里,由于Spark Application运行过程当中SparkContext和Executor之间有大量的信息交换;
若是想在远程集群中运行,最好使用RPC将SparkContext提交给集群,不要远离Worker运行SparkContext。
如图所示:
移动程序而非移动数据的原则执行,Task采用了数据本地性和推测执行的优化机制。
关键方法:taskIdToLocations、getPreferedLocations。
如图所示: