Spark运行原理

在大数据领域,只有深挖数据科学领域,走在学术前沿,才能在底层算法和模型方面走在前面,从而占据领先地位。算法

Spark的这种学术基因,使得它从一开始就在大数据领域创建了必定优点。不管是性能,仍是方案的统一性,对比传统的Hadoop,优点都很是明显。Spark提供的基于RDD的一体化解决方案,将MapReduce、Streaming、SQL、Machine Learning、Graph Processing等模型统一到一个平台下,并以一致的API公开,并提供相同的部署方案,使得Spark的工程应用领域变得更加普遍。本文主要分如下章节:api

1、Spark专业术语定义多线程

2、Spark运行基本流程架构

3、Spark运行架构特色分布式

4、Spark核心原理透视函数

1、Spark专业术语定义oop

一、Application:Spark应用程序性能

指的是用户编写的Spark应用程序,包含了Driver功能代码和分布在集群中多个节点上运行的Executor代码。大数据

Spark应用程序,由一个或多个做业JOB组成,以下图所示。优化

Spark应用程序组成

二、Driver:驱动程序

Spark中的Driver即运行上述Application的Main()函数而且建立SparkContext,其中建立SparkContext的目的是为了准备Spark应用程序的运行环境。在Spark中由SparkContext负责和ClusterManager通讯,进行资源的申请、任务的分配和监控等;当Executor部分运行完毕后,Driver负责将SparkContext关闭。一般SparkContext表明Driver,以下图所示。

Driver驱动程序组成

三、Cluster Manager:资源管理器

指的是在集群上获取资源的外部服务,经常使用的有:Standalone,Spark原生的资源管理器,由Master负责资源的分配;Haddop Yarn,由Yarn中的ResearchManager负责资源的分配;Messos,由Messos中的Messos Master负责资源管理,以下图所示。

Cluster Manager含义

四、Executor:执行器

Application运行在Worker节点上的一个进程,该进程负责运行Task,而且负责将数据存在内存或者磁盘上,每一个Application都有各自独立的一批Executor,以下图所示。

Executor运行原理

五、Worker:计算节点

集群中任何能够运行Application代码的节点,相似于Yarn中的NodeManager节点。在Standalone模式中指的就是经过Slave文件配置的Worker节点,在Spark on Yarn模式中指的就是NodeManager节点,在Spark on Messos模式中指的就是Messos Slave节点,以下图所示。

Worker运行原理

六、RDD:弹性分布式数据集

Resillient Distributed Dataset,Spark的基本计算单元,能够经过一系列算子进行操做(主要有Transformation和Action操做),以下图所示。

RDD组成图解

七、窄依赖

父RDD每个分区最多被一个子RDD的分区所用;表现为一个父RDD的分区对应于一个子RDD的分区,或两个父RDD的分区对应于一个子RDD 的分区。如图所示。

窄依赖图解

八、宽依赖

父RDD的每一个分区均可能被多个子RDD分区所使用,子RDD分区一般对应全部的父RDD分区。如图所示。

宽依赖图解

常见的窄依赖有:map、filter、union、mapPartitions、mapValues、join(父RDD是hash-partitioned :若是JoinAPI以前被调用的RDD API是宽依赖(存在shuffle), 并且两个join的RDD的分区数量一致,join结果的rdd分区数量也同样,这个时候join api是窄依赖)。

常见的宽依赖有groupByKey、partitionBy、reduceByKey、join(父RDD不是hash-partitioned :除此以外的,rdd 的join api是宽依赖)。

九、DAG:有向无环图

Directed Acycle graph,反应RDD之间的依赖关系,如图所示。

DAG图解

十、DAGScheduler:有向无环图调度器

基于DAG划分Stage 并以TaskSet的形势提交Stage给TaskScheduler;负责将做业拆分红不一样阶段的具备依赖关系的多批任务;最重要的任务之一就是:计算做业和任务的依赖关系,制定调度逻辑。在SparkContext初始化的过程当中被实例化,一个SparkContext对应建立一个DAGScheduler。

DAGScheduler图解

十一、TaskScheduler:任务调度器

将Taskset提交给worker(集群)运行并回报结果;负责每一个具体任务的实际物理调度。如图所示。

TaskScheduler图解

十二、Job:做业

由一个或多个调度阶段所组成的一次计算做业;包含多个Task组成的并行计算,每每由Spark Action催生,一个JOB包含多个RDD及做用于相应RDD上的各类Operation。如图所示。

Job图解

1三、Stage:调度阶段

一个任务集对应的调度阶段;每一个Job会被拆分不少组Task,每组任务被称为Stage,也可称TaskSet,一个做业分为多个阶段;Stage分红两种类型ShuffleMapStage、ResultStage。如图所示。

Stage图解

1四、TaskSet:任务集

由一组关联的,但相互之间没有Shuffle依赖关系的任务所组成的任务集。如图所示。

Stage图解

提示:

1)一个Stage建立一个TaskSet;

2)为Stage的每一个Rdd分区建立一个Task,多个Task封装成TaskSet

1五、Task:任务

被送到某个Executor上的工做任务;单个分区数据集上的最小处理流程单元。如图所示。

Task图解

整体如图所示:

汇总图解

2、Spark运行基本流程

Spark运行基本流程

Spark运行基本流程

3、Spark运行架构特色

一、Executor进程专属

每一个Application获取专属的executor进程,该进程在Application期间一直驻留,并以多线程方式运行tasks。Spark Application不能跨应用程序共享数据,除非将数据写入到外部存储系统。如图所示。

Executor进程图解

二、支持多种资源管理器

Spark与资源管理器无关,只要可以获取executor进程,并能保持相互通讯就能够了,Spark支持资源管理器包含: Standalone、On Mesos、On YARN、Or On EC2。如图所示。

资源管理器

三、Job提交就近原则

提交SparkContext的Client应该靠近Worker节点(运行Executor的节点),最好是在同一个Rack(机架)里,由于Spark Application运行过程当中SparkContext和Executor之间有大量的信息交换;若是想在远程集群中运行,最好使用RPC将SparkContext提交给集群,不要远离Worker运行SparkContext。如图所示。

Job提交就近原则

四、移动程序而非移动数据的原则执行

Task采用了数据本地性和推测执行的优化机制。关键方法:taskIdToLocations、getPreferedLocations。如图所示。

执行原则

4、Spark核心原理透视

一、计算流程

Spark计算流程

二、从代码构建DAG图

Spark program

Val lines1 = sc.textFile(inputPath1). map(···)). map(···)

Val lines2 = sc.textFile(inputPath2) . map(···)

Val lines3 = sc.textFile(inputPath3)

Val dtinone1 = lines2.union(lines3)

Val dtinone = lines1.join(dtinone1)

dtinone.saveAsTextFile(···)

dtinone.filter(···).foreach(···)

Spark的计算发生在RDD的Action操做,而对Action以前的全部Transformation,Spark只是记录下RDD生成的轨迹,而不会触发真正的计算。

Spark内核会在须要计算发生的时刻绘制一张关于计算路径的有向无环图,也就是DAG。

构建DAG图

三、将DAG划分为Stage核心算法

Application多个job多个Stage:Spark Application中能够由于不一样的Action触发众多的job,一个Application中能够有不少的job,每一个job是由一个或者多个Stage构成的,后面的Stage依赖于前面的Stage,也就是说只有前面依赖的Stage计算完毕后,后面的Stage才会运行。

划分依据:Stage划分的依据就是宽依赖,什么时候产生宽依赖,reduceByKey, groupByKey等算子,会致使宽依赖的产生。

核心算法:从后往前回溯,遇到窄依赖加入本stage,碰见宽依赖进行Stage切分。Spark内核会从触发Action操做的那个RDD开始从后往前推,首先会为最后一个RDD建立一个stage,而后继续倒推,若是发现对某个RDD是宽依赖,那么就会将宽依赖的那个RDD建立一个新的stage,那个RDD就是新的stage的最后一个RDD。而后依次类推,继续继续倒推,根据窄依赖或者宽依赖进行stage的划分,直到全部的RDD所有遍历完成为止。

四、将DAG划分为Stage剖析

从HDFS中读入数据生成3个不一样的RDD,经过一系列transformation操做后再将计算结果保存回HDFS。能够看到这个DAG中只有join操做是一个宽依赖,Spark内核会以此为边界将其先后划分红不一样的Stage. 同时咱们能够注意到,在图中Stage2中,从map到union都是窄依赖,这两步操做能够造成一个流水线操做,经过map操做生成的partition能够不用等待整个RDD计算结束,而是继续进行union操做,这样大大提升了计算的效率。

DAG划分为Stage剖析

五、相关代码

SparkContext.runJob(…)

DAGScheduler.runJob(...)

DAGScheduler.submitJob(...)

DAGScheduler.handleJobSubmitted(…)

DAGScheduler.submitWaitingStages(…)

DAGScheduler.submitStage(…)

六、提交Stages

调度阶段的提交,最终会被转换成一个任务集的提交,DAGScheduler经过TaskScheduler接口提交任务集,这个任务集最终会触发TaskScheduler构建一个TaskSetManager的实例来管理这个任务集的生命周期,对于DAGScheduler来讲,提交调度阶段的工做到此就完成了。而TaskScheduler的具体实现则会在获得计算资源的时候,进一步经过TaskSetManager调度具体的任务到对应的Executor节点上进行运算。

提交Stages

七、相关代码

submitMissingTasks. submitMissingTasks

submitMissingTasks.taskIdToLocations

DAGScheduler.getPreferredLocs(…)

TaskScheduler.submitTasks(…)

TaskScheduler.createTaskSetManager(…)

TaskSetManager负责管理TaskSchedulerImpl中一个单独TaskSet,跟踪每个task,若是task失败,负责重试task直到达到task重试次数的最屡次数。

调度池构建器,SchedulableBuilder决定调度顺序

八、监控Job、Task、Executor

DAGScheduler监控Job与Task:要保证相互依赖的做业调度阶段可以获得顺利的调度执行,DAGScheduler须要监控当前做业调度阶段乃至任务的完成状况。这经过对外暴露一系列的回调函数来实现的,对于TaskScheduler来讲,这些回调函数主要包括任务的开始结束失败、任务集的失败,DAGScheduler根据这些任务的生命周期信息进一步维护做业和调度阶段的状态信息。

DAGScheduler监控Executor的生命状态:TaskScheduler经过回调函数通知DAGScheduler具体的Executor的生命状态,若是某一个Executor崩溃了,则对应的调度阶段任务集的ShuffleMapTask的输出结果也将标志为不可用,这将致使对应任务集状态的变动,进而从新执行相关计算任务,以获取丢失的相关数据。

九、获取任务执行结果

结果DAGScheduler:一个具体的任务在Executor中执行完毕后,其结果须要以某种形式返回给DAGScheduler,根据任务类型的不一样,任务结果的返回方式也不一样。

两种结果,中间结果与最终结果:对于FinalStage所对应的任务,返回给DAGScheduler的是运算结果自己,而对于中间调度阶段对应的任务ShuffleMapTask,返回给DAGScheduler的是一个MapStatus里的相关存储信息,而非结果自己,这些存储位置信息将做为下一个调度阶段的任务获取输入数据的依据。

两种类型,DirectTaskResult与IndirectTaskResult:根据任务结果大小的不一样,ResultTask返回的结果又分为两类,若是结果足够小,则直接放在DirectTaskResult对象内中,若是超过特定尺寸则在Executor端会将DirectTaskResult先序列化,再把序列化的结果做为一个数据块存放在BlockManager中,而后将BlockManager返回的BlockID放在IndirectTaskResult对象中返回给TaskScheduler,TaskScheduler进而调用TaskResultGetter将IndirectTaskResult中的BlockID取出并经过BlockManager最终取得对应的DirectTaskResult。

十、任务调度整体诠释

任务调度整体诠释

相关文章
相关标签/搜索