spark—3(Spark Scheduler)

Spark的核心是根据RDD来实现的,Spark Scheduler则为Spark核心实现的重要一环,其做用就是任务调度。Spark的任务调度就是如何组织任务去处理RDD中每一个分区的数据,根据RDD的依赖关系构建DAG,基于DAG划分Stage,将每一个Stage中的任务发到指定节点运行。基于Spark的任务调度原理,咱们能够合理规划资源利用,作到尽量用最少的资源高效地完成任务计算。html

分布式运行框架

Spark能够部署在多种资源管理平台,例如Yarn、Mesos等,Spark自己也实现了一个简易的资源管理机制,称之为Standalone模式。因为工做中接触较多的是Saprk on Yarn,不作特别说明,如下所述均表示Spark-on-Yarn。Spark部署在Yarn上有两种运行模式,分别为yarn-client和yarn-cluster模式,它们的区别仅仅在于Spark Driver是运行在Client端仍是ApplicationMater端。以下图所示为Spark部署在Yarn上,以yarn-cluster模式运行的分布式计算框架。node

其中蓝色部分是Spark里的概念,包括Client、ApplicationMaster、Driver和Executor,其中Client和ApplicationMaster主要是负责与Yarn进行交互;Driver做为Spark应用程序的总控,负责分发任务以及监控任务运行状态;Executor负责执行任务,并上报状态信息给Driver,从逻辑上来看Executor是进程,运行在其中的任务是线程,因此说Spark的任务是线程级别的。经过下面的时序图能够更清晰地理解一个Spark应用程序从提交到运行的完整流程。算法

提交一个Spark应用程序,首先经过Client向ResourceManager请求启动一个Application,同时检查是否有足够的资源知足Application的需求,若是资源条件知足,则准备ApplicationMaster的启动上下文,交给ResourceManager,并循环监控Application状态。apache

当提交的资源队列中有资源时,ResourceManager会在某个NodeManager上启动ApplicationMaster进程,ApplicationMaster会单独启动Driver后台线程,当Driver启动后,ApplicationMaster会经过本地的RPC链接Driver,并开始向ResourceManager申请Container资源运行Executor进程(一个Executor对应与一个Container),当ResourceManager返回Container资源,则在对应的Container上启动Executor。缓存

Driver线程主要是初始化SparkContext对象,准备运行所需的上下文,而后一方面保持与ApplicationMaster的RPC链接,经过ApplicationMaster申请资源,另外一方面根据用户业务逻辑开始调度任务,将任务下发到已有的空闲Executor上。并发

当ResourceManager向ApplicationMaster返回Container资源时,ApplicationMaster就尝试在对应的Container上启动Executor进程,Executor进程起来后,会向Driver注册,注册成功后保持与Driver的心跳,同时等待Driver分发任务,当分发的任务执行完毕后,将任务状态上报给Driver。框架

Driver把资源申请的逻辑给抽象出来,以适配不一样的资源管理系统,因此才间接地经过ApplicationMaster去和Yarn打交道。分布式

从上述时序图可知,Client只管提交Application并监控Application的状态。对于Spark的任务调度主要是集中在两个方面: 资源申请和任务分发,其主要是经过ApplicationMaster、Driver以及Executor之间来完成,下面详细剖析Spark任务调度每一个细节。ide

Spark任务调度总览

当Driver起来后,Driver则会根据用户程序逻辑准备任务,并根据Executor资源状况逐步分发任务。在详细阐述任务调度前,首先说明下Spark里的几个概念。一个Spark应用程序包括Job、Stage以及Task三个概念:oop

  • Job是以Action方法为界,遇到一个Action方法则触发一个Job;
  • Stage是Job的子集,以RDD宽依赖(即Shuffle)为界,遇到Shuffle作一次划分;
  • Task是Stage的子集,以并行度(分区数)来衡量,分区数是多少,则有多少个task。

Spark的任务调度整体来讲分两路进行,一路是Stage级的调度,一路是Task级的调度,整体调度流程以下图所示。

Spark RDD经过其Transactions操做,造成了RDD血缘关系图,即DAG,最后经过Action的调用,触发Job并调度执行。

DAGScheduler负责Stage级的调度,主要是将DAG切分红若干Stages,并将每一个Stage打包成TaskSet交给TaskScheduler调度。

TaskScheduler(trait)负责为建立它的SparkContext进行Task级的调度,将DAGScheduler给过来的TaskSet按照指定的调度策略分发到Executor上执行,并为执行特别慢的任务启动备份任务,当前TaskScheduler的惟一实现为org.apache.spark.scheduler.TaskSchedulerImpl,TaskSchedulerImpl会在如下几种场景下调用org.apache.spark.scheduler.SchedulerBackend#receiveOffers:

  1. 有新任务提交时。
  2. 有任务执行失败时。
  3. 计算节点(即Executor)不可用时。
  4. 某些任务执行过慢而须要为它从新分配资源时。

调度过程当中SchedulerBackend(trait)负责为Task分配可用计算资源(即Executor),并在分配的Executor上启动Task,完成计算的调度过程,使用receiveOffers完成上述调度过程。SchedulerBackend有多种实现,分别对接不一样的资源管理系统(YARN、Standalone、Mesos),但都是基于SchedulerBackend的一个实现org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend加入了自身特有的逻辑。每一个SchedulerBackend都会对应一个惟一的TaskScheduler,而它们都会被SparkContext建立和持有。

有了上述感性的认识后,下面这张图描述了Spark-On-Yarn模式下在任务调度期间,ApplicationMaster、Driver以及Executor内部模块的交互过程。

Driver初始化SparkContext过程当中,会分别初始化DAGScheduler、TaskScheduler、SchedulerBackend以及HeartbeatReceiver,并启动SchedulerBackend以及HeartbeatReceiver。SchedulerBackend经过ApplicationMaster申请资源,并不断从TaskScheduler中拿到合适的Task分发到Executor执行。HeartbeatReceiver负责接收Executor的心跳信息,监控Executor的存活情况,并通知到TaskScheduler。下面着重剖析DAGScheduler负责的Stage调度以及TaskScheduler负责的Task调度。

Stage级的调度

Spark的任务调度是从DAG切割开始,主要是由DAGScheduler来完成。当遇到一个Action操做后就会触发一个Job的计算,并交给DAGScheduler来提交,下图是涉及到Job提交的相关方法调用流程图。

Job由最终的RDD和Action方法封装而成,SparkContext将Job交给DAGScheduler提交,它会根据RDD的血缘关系构成的DAG进行切分,将一个Job划分为若干Stages,具体划分策略是,由最终的RDD不断经过依赖回溯判断父依赖是不是宽依赖,即以Shuffle为界,划分Stage,窄依赖的RDD之间被划分到同一个Stage中,能够进行pipeline式的计算,以下图紫色流程部分。划分的Stages分两类,一类叫作ResultStage,为DAG最下游的Stage,由Action方法决定,另外一类叫作ShuffleMapStage,为下游Stage准备数据,下面看一个简单的例子WordCount。

Job由saveAsTextFile触发,该Job由RDD-3和saveAsTextFile方法组成,根据RDD之间的依赖关系从RDD-3开始回溯搜索,直到没有依赖的RDD-0,在回溯搜索过程当中,RDD-3依赖RDD-2,而且是宽依赖,因此在RDD-2和RDD-3之间划分Stage,RDD-3被划到最后一个Stage,即ResultStage中,RDD-2依赖RDD-1,RDD-1依赖RDD-0,这些依赖都是窄依赖,因此将RDD-0、RDD-1和RDD-2划分到同一个Stage,即ShuffleMapStage中,实际执行的时候,数据记录会一鼓作气地执行RDD-0到RDD-2的转化。不难看出,其本质上是一个深度优先搜索算法。

一个Stage是否被提交,须要判断它的父Stage是否执行,只有在父Stage执行完毕才能提交当前Stage,若是一个Stage没有父Stage,那么从该Stage开始提交。Stage提交时会将Task信息(分区信息以及方法等)序列化并被打包成TaskSet交给TaskScheduler,一个Partition对应一个Task,另外一方面监控Stage的运行状态,只有Executor丢失或者Task因为Fetch失败才须要从新提交失败的Stage以调度运行失败的任务,其余类型的Task失败会在TaskScheduler的调度过程当中重试。

相对来讲DAGScheduler作的事情较为简单,仅仅是在Stage层面上划分DAG,提交Stage并监控相关状态信息。TaskScheduler则相对较为复杂,下面详细阐述其细节。

Task级的调度

Spark Task的调度是由TaskScheduler来完成,由前文可知,DAGScheduler将Stage打包到TaskSet交给TaskScheduler,TaskScheduler会将其封装为TaskSetManager加入到调度队列中,TaskSetManager负责监控管理同一个Stage中的Tasks,TaskScheduler就是以TaskSetManager为单元来调度任务。前面也提到,TaskScheduler初始化后会启动SchedulerBackend,它负责跟外界打交道,接收Executor的注册信息,并维护Executor的状态,因此说SchedulerBackend是管“粮食”的,同时它在启动后会按期地去“询问”TaskScheduler有没有任务要运行,也就是说,它会按期地“问”TaskScheduler“我有这么多余量,你要不要啊”,TaskScheduler在SchedulerBackend“问”它的时候,会从调度队列中按照指定的调度策略选择TaskSetManager去调度运行,大体方法调用流程以下图所示。

调度策略

前面讲到,TaskScheduler会先把DAGScheduler给过来的TaskSet封装成TaskSetManager扔到任务队列里,而后再从任务队列里按照必定的规则把它们取出来在SchedulerBackend给过来的Executor上运行。这个调度过程实际上仍是比较粗粒度的,是面向TaskSetManager的。

TaskScheduler是以树的方式来管理任务队列,树中的节点类型为Schedulable,叶子节点为TaskSetManager,非叶子节点为Pool,下图是它们之间的继承关系。

TaskScheduler支持两种调度策略,一种是FIFO,也是默认的调度策略,另外一种是FAIR。在TaskScheduler初始化过程当中会实例化rootPool,表示树的根节点,是Pool类型。若是是采用FIFO调度策略,则直接简单地将TaskSetManager按照先来先到的方式入队,出队时直接拿出最早进队的TaskSetManager,其树结构大体以下图所示,TaskSetManager保存在一个FIFO队列中。

在阐述FAIR调度策略前,先贴一段使用FAIR调度策略的应用程序代码,后面针对该代码逻辑来详细阐述FAIR调度的实现细节。

object MultiJobTest {
  // spark.scheduler.mode=FAIR
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().getOrCreate()

    val rdd = spark.sparkContext.textFile(...)
      .map(_.split("\\s+"))
      .map(x => (x(0), x(1)))

    val jobExecutor = Executors.newFixedThreadPool(2)

    jobExecutor.execute(new Runnable {
      override def run(): Unit = {
        spark.sparkContext.setLocalProperty("spark.scheduler.pool", "count-pool")
        val cnt = rdd.groupByKey().count()
        println(s"Count: $cnt")
      }
    })

    jobExecutor.execute(new Runnable {
      override def run(): Unit = {
        spark.sparkContext.setLocalProperty("spark.scheduler.pool", "take-pool")
        val data = rdd.sortByKey().take(10)
        println(s"Data Samples: ")
        data.foreach { x => println(x.mkString(", ")) }
      }
    })

    jobExecutor.shutdown()
    while (!jobExecutor.isTerminated) {}
    println("Done!")
  }
}

上述应用程序中使用两个线程分别调用了Action方法,即有两个Job会并发提交,可是无论怎样,这两个Job被切分红若干TaskSet后终究会被交到TaskScheduler这里统一管理,其调度树大体以下图所示。

在出队时,则会对全部TaskSetManager排序,具体排序过程是从根节点rootPool开始,递归地去排序子节点,最后合并到一个ArrayBuffer里,代码逻辑以下。

def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
    var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
    val sortedSchedulableQueue = schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
    for (schedulable <- sortedSchedulableQueue) {
      sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
    }
    sortedTaskSetQueue
}

使用FAIR调度策略时,上面代码中的taskSetSchedulingAlgorithm的类型为FairSchedulingAlgorithm,排序过程的比较是基于Fair-share来比较的,每一个要排序的对象包含三个属性: runningTasks值(正在运行的Task数)、minShare值、weight值,比较时会综合考量runningTasks值,minShare以及weight值。若是A对象的runningTasks大于它的minShare,B对象的runningTasks小于它的minShare,那么B排在A前面;若是A、B对象的runningTasks都小于它们的minShare,那么就比较runningTasksminShare的比值,谁小谁排前面;若是A、B对象的runningTasks都大于它们的minShare,那么就比较runningTasksweight的比值,谁小谁排前面。总体上来讲就是经过minShareweight这两个参数控制比较过程,能够作到不让资源被某些长时间Task给一直占了。

从调度队列中拿到TaskSetManager后,那么接下来的工做就是TaskSetManager按照必定的规则一个个取出Task给TaskScheduler,TaskScheduler再交给SchedulerBackend去发到Executor上执行。前面也提到,TaskSetManager封装了一个Stage的全部Task,并负责管理调度这些Task。

本地化调度

从调度队列中拿到TaskSetManager后,那么接下来的工做就是TaskSetManager按照必定的规则一个个取出Task给TaskScheduler,TaskScheduler再交给SchedulerBackend去发到Executor上执行。前面也提到,TaskSetManager封装了一个Stage的全部Task,并负责管理调度这些Task。

在TaskSetManager初始化过程当中,会对Tasks按照Locality级别进行分类,Task的Locality有五种,优先级由高到低顺序:PROCESS_LOCAL(指定的Executor),NODE_LOCAL(指定的主机节点),NO_PREF(无所谓),RACK_LOCAL(指定的机架),ANY(知足不了Task的Locality就随便调度)。这五种Locality级别存在包含关系,RACK_LOCAL包含NODE_LOCAL,NODE_LOCAL包含PROCESS_LOCAL,然而ANY包含其余全部四种。初始化阶段在对Task分类时,根据Task的preferredLocations判断它属于哪一个Locality级别,属于PROCESS_LOCAL的Task同时也会被加入到NODE_LOCAL、RACK_LOCAL类别中,好比,一个Task的preferredLocations指定了在Executor-2上执行,那么它属于Executor-2对应的PROCESS_LOCAL类别,同时也把他加入到Executor-2所在的主机对应的NODE_LOCAL类别,Executor-2所在的主机的机架对应的RACK_LOCAL类别中,以及ANY类别,这样在调度执行时,知足不了PROCESS_LOCAL,就逐步退化到NODE_LOCAL,RACK_LOCAL,ANY

TaskSetManager在决定调度哪些Task时,是经过上面流程图中的resourceOffer方法来实现,为了尽量地将Task调度到它的preferredLocations上,它采用一种延迟调度算法。resourceOffer方法原型以下,参数包括要调度任务的Executor Id、主机地址以及最大可容忍的Locality级别。

def resourceOffer(
      execId: String,
      host: String,
      maxLocality: TaskLocality.TaskLocality)
    : Option[TaskDescription]

延迟调度算法的大体流程以下图所示。


首先看是否存在execId对应的PROCESS_LOCAL类别的任务,若是存在,取出来调度,不然根据当前时间,判断是否超过了PROCESS_LOCAL类别最大容忍的延迟,若是超过,则退化到下一个级别NODE_LOCAL,不然等待不调度。退化到下一个级别NODE_LOCAL后调度流程也相似,看是否存在host对应的NODE_LOCAL类别的任务,若是存在,取出来调度,不然根据当前时间,判断是否超过了NODE_LOCAL类别最大容忍的延迟,若是超过,则退化到下一个级别RACK_LOCAL,不然等待不调度,以此类推…..。当不知足Locatity类别会选择等待,直到下一轮调度重复上述流程,若是你比较激进,能够调大每一个类别的最大容忍延迟时间,若是不知足Locatity时就会等待多个调度周期,直到知足或者超过延迟时间退化到下一个级别为止。

失败重试与黑名单机制

除了选择合适的Task调度运行外,还须要监控Task的执行状态,前面也提到,与外部打交道的是SchedulerBackend,Task被提交到Executor启动执行后,Executor会将执行状态上报给SchedulerBackend,SchedulerBackend则告诉TaskScheduler,TaskScheduler找到该Task对应的TaskSetManager,并通知到该TaskSetManager,这样TaskSetManager就知道Task的失败与成功状态,对于失败的Task,会记录它失败的次数,若是失败次数尚未超过最大重试次数,那么就把它放回待调度的Task池子中,不然整个Application失败。

在记录Task失败次数过程当中,会记录它上一次失败所在的Executor Id和Host,这样下次再调度这个Task时,会使用黑名单机制,避免它被调度到上一次失败的节点上,起到必定的容错做用。黑名单记录Task上一次失败所在的Executor Id和Host,以及其对应的“黑暗”时间,“黑暗”时间是指这段时间内不要再往这个节点上调度这个Task了。

推测式执行

TaskScheduler在启动SchedulerBackend后,还会启动一个后台线程专门负责推测任务的调度,推测任务是指对一个Task在不一样的Executor上启动多个实例,若是有Task实例运行成功,则会干掉其余Executor上运行的实例。推测调度线程会每隔固定时间检查是否有Task须要推测执行,若是有,则会调用SchedulerBackend的reviveOffers去尝试拿资源运行推测任务。

检查是否有Task须要推测执行的逻辑最后会交到TaskSetManager,TaskSetManager采用基于统计的算法,检查Task是否须要推测执行,算法流程大体以下图所示。

TaskSetManager首先会统计成功的Task数,当成功的Task数超过75%(可经过参数spark.speculation.quantile控制)时,再统计全部成功的Tasks的运行时间,获得一个中位数,用这个中位数乘以1.5(可经过参数spark.speculation.multiplier控制)获得运行时间门限,若是在运行的Tasks的运行时间超过这个门限,则对它启用推测。算法逻辑较为简单,其实就是对那些拖慢总体进度的Tasks启用推测,以加速整个TaskSet即Stage的运行。

资源申请机制

在前文已经提过,ApplicationMaster和SchedulerBackend起来后,SchedulerBackend经过ApplicationMaster申请资源,ApplicationMaster就是用来专门适配YARN申请Container资源的,当申请到Container,会在相应Container上启动Executor进程,其余事情就交给SchedulerBackend。Spark早期版本只支持静态资源申请,即一开始就指定用多少资源,在整个Spark应用程序运行过程当中资源都不能改变,后来支持动态Executor申请,用户不须要指定确切的Executor数量,Spark会动态调整Executor的数量以达到资源利用的最大化。

静态资源申请

静态资源申请是用户在提交Spark应用程序时,就要提早估计应用程序须要使用的资源,包括Executor数(num_executor)、每一个Executor上的core数(executor_cores)、每一个Executor的内存(executor_memory)以及Driver的内存(driver_memory)。

在估计资源使用时,应当首先了解这些资源是怎么用的。任务的并行度由分区数(Partitions)决定,一个Stage有多少分区,就会有多少Task。每一个Task默认占用一个Core,一个Executor上的全部core共享Executor上的内存,一次并行运行的Task数等于num_executor*executor_cores,若是分区数超过该值,则须要运行多个轮次,通常来讲建议运行3~5轮较为合适,不然考虑增长num_executor或executor_cores。因为一个Executor的全部tasks会共享内存executor_memory,因此建议executor_cores不宜过大。executor_memory的设置则须要综合每一个分区的数据量以及是否有缓存等逻辑。下图描绘了一个应用程序内部资源利用状况。

动态资源申请

动态资源申请目前只支持到Executor,便可以不用指定num_executor,经过参数spark.dynamicAllocation.enabled来控制。因为许多Spark应用程序一开始可能不须要那么多Executor或者其自己就不须要太多Executor,因此没必要一次性申请那么多Executor,根据具体的任务数动态调整Executor的数量,尽量作到资源的不浪费。因为动态Executor的调整会致使Executor动态的添加与删除,若是删除Executor,其上面的中间Shuffle结果可能会丢失,这就须要借助第三方的ShuffleService了,若是Spark是部署在Yarn上,则能够在Yarn上配置Spark的ShuffleService,具体操做仅需作两点:

  1. 首先在yarn-site.xml中加上以下配置:
  2. <property>
      <name>yarn.nodemanager.aux-services</name>
      <value>mapreduce_shuffle,spark_shuffle</value>
    </property>
    <property>
      <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
      <value>org.apache.spark.network.yarn.YarnShuffleService</value>
    </property>
    <property>
      <name>spark.shuffle.service.port</name>
      <value>7337</value>
    </property>
  3. 将Spark ShuffleService jar包$SPARK_HOME/lib/spark-*-yarn-shuffle.jar拷贝到每台NodeManager的$HADOOP_HOME/share/hadoop/yarn/lib/下,并重启全部的NodeManager。

当启用动态Executor申请时,在SparkContext初始化过程当中会实例化ExecutorAllocationManager,它是被用来专门控制动态Executor申请逻辑的,动态Executor申请是一种基于当前Task负载压力实现动态增删Executor的机制。一开始会按照参数spark.dynamicAllocation.initialExecutors设置的初始Executor数申请,而后根据当前积压的Task数量,逐步增加申请的Executor数,若是当前有积压的Task,那么取积压的Task数和spark.dynamicAllocation.maxExecutors中的最小值做为Executor数上限,每次新增长申请的Executor为2的次方,即第一次增长1,第二次增长2,第三次增长4,…。另外一方面,若是一个Executor在一段时间内都没有Task运行,则将其回收,可是在Remove Executor时,要保证最少的Executor数,该值经过参数spark.dynamicAllocation.minExecutors来控制,若是Executor上有Cache的数据,则永远不会被Remove,以保证中间数据不丢失。

但愿后续能够改下。。。

参考:Spark Scheduler内部原理剖析   spark DAGScheduler、TaskSchedule、Executor执行task源码分析   Spark核心做业调度和任务调度之DAGScheduler源码   Spark Scheduler模块详解-DAGScheduler实现   

相关文章
相关标签/搜索