调度系统,是贯穿整个Spark应用的主心骨,从调度系统开始入手了解Spark Core,比较容易理清头绪。html
Spark的资源调度采用的是常见的两层调度,底层资源的管理和分配是第一层调度,交给YARN、Mesos或者Spark的Standalone集群处理,Application从第一层调度拿到资源后,还要进行内部的任务和资源调度,将任务和资源进行匹配,这是第二层调度,本文讲的就是这第二层调度。算法
Spark的调度体系涉及的任务包括3个粒度,分别是Job、Stage、Task。
Job表明用户提交的一系列操做的整体,一个具体的计算任务,有明确的输入输出,一个Job由多个Stage组成;
一个Stage表明Job计算流程的一个组成部分,一个阶段,包含多个Task;
一个Task表明对一个分区的数据进行计算的具体任务。shell
层级关系:Job > Stage > Taskapache
在Spark Core 解析:RDD 弹性分布式数据集中,已经解释了RDD之间的依赖,以及如何组成RDD血缘图。segmentfault
因此本文主要目的就是解释清楚:Scheduler将RDD血缘图转变成Stage DAG,而后生成Task,最后提交给Executor去执行的过程。后端
Job的不一样分区的计算一般能够并行,可是有些计算须要将数据进行从新分区,这个过程称做shuffle(混洗)。Shuffle的过程是无法彻底并行的,这时候就会出现task之间的等待,task的数量也可能发生变化,因此Spark中以shuffle为边界,对task进行划分,划分出来的每段称为Stage。app
Stage表明一组能够并行的执行相同计算的task,每一个任务必须有相同的分区规则,这样一个stage中是没有shuffle的。异步
在一个Spark App中,stage有一个全局惟一ID,stage id是自增的。async
Stage分为两种:分布式
stage建立流程:
一旦发现某个RDD的dependency是ShuffleDependency,就建立一个ShuffleMapStage。
val rg=sc.parallelize(List((1,10),(2,20))) rg.reduceByKey(_+_).collect
这里reduceByKey操做引发了一次shuffle,因此job被切分红了2个stage。
val rddA=sc.parallelize(List((1,"a"),(2,"b"),(3,"c"))) val rddB=sc.parallelize(List((1,"A"),(2,"B"),(3,"C"))) rddA.join(rddB).collect
join操做致使rddA和rddB都进行了一次shuffle,因此有3个stage。
import org.apache.spark.HashPartitioner val rddA=sc.parallelize(List((1,"a"),(2,"b"),(3,"c"))).partitionBy(new HashPartitioner(3)) val rddB=sc.parallelize(List((1,"A"),(2,"B"),(3,"C"))) rddA.join(rddB).collect
WHAT ?
由于rddA已经定义了Partitioner,这里join操做会保留rddA的分区方式,因此对rddA的依赖是OneToOneDepenency,而对于rddB则是ShuffleDependency。
val rddA=sc.parallelize(List((1,"a"),(2,"b"),(3,"c"))) rddA join rddA collect
一个RDD被两个stage使用了。
综上,stage的划分必定是依据shuffle即ShuffleDependency,跟算子和RDD变量的定义没有很强的关系,example2和3中的join操做rddA.join(rddB).collect
看起来如出一辙,但实际产生的stage划分却差异很大。
与stage对应,task也分为两种:
一个stage有多少个partition就会建立多少个task,好比一个ShuffleMapStage有10个partition,那么就会建立10个ShuffleMapTask。
一个Stage中的全部task组成一个TaskSet。
graph TB R(RDD.action)-->S(SparkContext.runJob)-- RDD -->D(DAGScheduler.runJob) -- TaskSet -->T(TaskScheduler.submitTasks)-- TaskDescription -->E(Executor.launchTask)
RDD在action操做中经过SparkContext.runJob方法触发Job执行流程,该方法将调用DagScheduler.runJob方法,将RDD传入DagScheduler。而后,DAGScheduler建立TaskSet提交给TaskScheduler,TaskScheduler再将TaskSet封装成TaskDescription发送给Executor,最后Executor会将TaskDescription提交给线程池来运行。
Stage级别的调度是DagScheduler负责的,也是Spark调度体系的核心。
sequenceDiagram participant M as main thread participant L as eventProcessLoop participant E as event thread M-->>L: post event E-->>L: handle event
DagScheduler内部维护了一个事件消息总线eventProcessLoop(类型为DAGSchedulerEventProcessLoop),其实就是一个用来存储DAGSchedulerEvent类型数据的队列。
当DagScheduler的一些方法被调用的时候(如submitJob方法),并不会在主线程中处理该任务,而是post一个event(如JobSubmitted)到eventProcessLoop。eventProcessLoop中有一个守护线程,会不断的依次从队列中取出event,而后调用对应的handle(如handleJobSubmitted)方法来执行具体的任务。
DagScheduler.runJob方法会调用submitJob方法,向eventProcessLoop发送一个JobSubmitted类型的消息,其中包含了RDD等信息。当eventProcessLoop接收到JobSubmitted类型的消息,会调用DagScheduler.handleJobSubmitted方法来处理消息。
sequenceDiagram participant M as main thread(runJob) participant L as eventProcessLoop participant E as event thread(handleJobSubmitted) M-->>L: post JobSubmitted event E-->>L: handle JobSubmitted event
2.create stage
建立一个ShuffleMapStage的过程同理会须要建立它的parent stage,也是若干ShuffleMapStage。如此递归下去,直到建立完全部的ShuffleMapStage,最后才完成ResultStage的建立。最后建立出来的这些Stage(若干ShuffleMapStage加一个ResultStage),经过parent属性串起来,就像这样
graph TD A[ResultStage]-- parent -->B[ShuffleMapStage 1] A-- parent -->C[ShuffleMapStage 2] B-- parent -->D[ShuffleMapStage 3]
这就生成了所谓的DAG图,可是这个图的指向跟执行顺序是反过来的,若是按执行顺序来画DAG图,就是常见的形式了:
graph TD D[ShuffleMapStage 3]-->C[ShuffleMapStage 2] C[ShuffleMapStage 2]-->A[ResultStage] B[ShuffleMapStage 1]-->A[ResultStage]
DagScheduler.handleJobSubmitted方法建立好ResultStage后会提交这个stage(submitStage方法),在提交一个stage的时候,会要先提交它的parent stage,也是经过递归的形式,直到一个stage的全部parent stage都被提交了,它本身才能被提交,若是一个stage的parent尚未完成,则会把这个stage加入waitingStages。也就是说,DAG图中前面的stage会被先提交。当一个stage的parent都准备好了,也就是执行完了,它才会进入submitMissingTasks的环节。
Task是在DagScheduler(不是TaskScheduler)的submitMissingTasks方法中建立的,包括ShuffleMapTask和ResultTask,与Stage对应。归属于同一个stage的这批Task组成一个TaskSet集合,最后提交给TaskScheduler的就是这个TaskSet集合。
Task的调度工做是由TaskScheduler与SchedulerBackend紧密合做,共同完成的。
TaskScheduler是task级别的调度器,主要做用是管理task的调度和提交,是Spark底层的调度器。
SchedulerBackend是TaskScheduler的后端服务,有独立的线程,全部的Executor都会注册到SchedulerBackend,主要做用是进行资源分配、将task分配给executor等。
第一个线程是DAGScheduler的事件处理线程,在其中,Task先通过DAGScheduler(蓝色箭头表示)封装成TaskSet,再由TaskScheduler(绿色箭头)封装成TaskSetManager,并加入调度队列中。
SchedulerBackend在收到ReviveOffers消息时,会从线程池取一个线程进行makeOffers操做,WorkerOffer建立后传递给TaskScheduler进行分配。
图中第二个线程就是SchedulerBackend的一个事件分发线程,从Pool中取出最优先的TaskSetManager,而后将WorkerOffer与其中的Task进行配对,生成TaskDescription,发送给WorkerOffer指定的Executor去执行。
Stage,TaskSet,TaskSetManager是一一对应的,数量相等,都是只存在driver上的。
Parition,Task,TaskDescription是一一对应,数量相同,Task和TaskDescription是会被发到executor上的。
与DAGScheduler不一样的是TaskScheduler有调度池,有两种调度实体,Pool和TaskSetManager。
与YARN的调度队列相似,采用了层级队列的方式,Pool是TaskSetManager的容器,起到将TaskSetManager分组的做用。
Schedulable是调度实体的基类,有两个子类Pool和TaskSetManager。
要理解调度规则,必须知道下面几个属性:
Pool和TaskSetManager对于这些属性的取值有所不一样,从而致使了他们的调度行为也不同。
properties | Pool | TaskSetManager |
---|---|---|
weight | config | 1 |
minShare | config | 0 |
priority | 0 | jobId |
stageId | -1 | stageId |
name | config | TaskSet_{taskSet.id} |
runningTasks | Pool所含TaskSetManager的runningTasks和 | TaskSetManager运行中task数 |
TaskScheduler有个属性schedulingMode,值取决于配置项spark.scheduler.mode
,默认为FIFO。这个属性会致使TaskScheduler使用不一样的SchedulableBuilder,即FIFOSchedulableBuilder和FairSchedulableBuilder。
TaskScheduler在初始化的时候,就会建立root pool,根调度池,是全部pool的祖先。
它的属性取值为:
name: "" (空字符串) schedulingMode: 同TaskScheduler的schedulingMode属性 weight: 0 minShare: 0
注意root pool的调度模式肯定了。
接下来会执行schedulableBuilder.buildPools()
方法,
如果FairSchedulableBuilder
这时default pool它的属性取值是固定的:
name: "default" schedulingMode: FIFO weight: 1 minShare: 0
当TaskScheduler提交task的时候,会先建立TaskSetManager,而后经过schedulableBuilder添加到pool中。
如果FairSchedulableBuilder
spark.scheduler.pool
配置获取pool name,没有定义则用'default';通过上面两部分,最终获得的调度池结构以下:
spark.scheduler.mode=FIFO
spark.scheduler.mode=FAIR
Fair Scheduler Pool的划分依赖于配置文件,默认的配置文件为'fairscheduler.xml',也能够经过配置项"spark.scheduler.allocation.file"指定配置文件。
煮个栗子,文件内容以下:
<?xml version="1.0"?> <allocations> <pool name="prod"> <schedulingMode>FAIR</schedulingMode> <weight>1</weight> <minShare>2</minShare> </pool> <pool name="test"> <schedulingMode>FIFO</schedulingMode> <weight>2</weight> <minShare>3</minShare> </pool> </allocations>
这里配置了两个pool,prod和test,而且配置了相关属性,这两个pool都会添加到root pool中。
以SchedulingAlgorithm为基类,内置实现的调度算法有两种FIFOSchedulingAlgorithm和FairSchedulingAlgorithm,其逻辑以下:
FIFO: 先进先出,优先级比较算法以下,
FAIR:公平调度,优先级比较算法以下,
TaskSetManager之间的比较,其实就是先比较jobId再比较stageId,谁小谁优先,意味着就是谁先提交谁优先。
Pool之间的比较,不存在!FIFO的pool队列中是不会有pool的。
TaskSetManager之间的比较,由于minShare=0,weight=1,FAIR算法变成了:
Pool之间的比较,就是标准的FAIR算法。
当root pool为FAIR模式,先取最优先的pool,再从pool中,按pool的调度模式取优先的TaskSetManager。
启用FAIR模式:
fairscheduler.xml
文件--conf spark.scheduler.mode=FAIR
spark-shell --master yarn --deploy-mode client --conf spark.scheode=FAIR
启动后若是直接运行Job会自动提交到default pool,那么如何提交Job到指定pool?
SparkContext.setLocalProperty("spark.scheduler.pool","poolName")
若是每次只运行一个Job,开启FAIR模式的意义不大,那么如何同时运行多个Job?
要异步提交Job,须要用到RDD的async action,目前有以下几个:
countAsync collectAsync takeAsync foreachAsync foreachPartitionAsync
举个例子:
sc.setLocalProperty("spark.scheduler.pool","test") b.foreachAsync(_=>Thread.sleep(100)) sc.setLocalProperty("spark.scheduler.pool","production") b.foreachAsync(_=>Thread.sleep(100))
这样就会有两个任务在不一样的pool同时运行:
场景1:Spark SQL thrift server
做用:让离线任务和交互式查询任务分配到不一样的pool,给交互式查询任务更高的优先级,这样长时间运行的离线任务就不会一直占用全部资源,阻塞交互式查询任务。
场景2:Streaming job与Batch job同时运行
做用:好比用Streaming接数据写入HDFS,可能产生不少小文件,能够在低优先级的pool定时运行batch job合并小文件。
另外能够参考Spark Summit 2017的分享:Continuous Application with FAIR Scheduler
转载请注明原文地址:
https://liam-blog.ml/2019/11/07/spark-core-scheduler/