DAGScheduler概述:是一个面向Stage层面的调度器;数组
主要入参有:缓存
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,resultHandler, localProperties.get)app
rdd: final RDD;函数
cleanedFunc: 计算每一个分区的函数;spa
resultHander: 结果侦听器;code
主要功能以下:blog
一、接收用户提交的job;递归
二、将job根据类型划分为不一样的stage,记录哪些RDD、Stage被物化,并在每个stage内产生一系列的task,并封装成TaskSet;ci
三、决定每一个Task的最佳位置(任务在数据所在的节点上运行),并结合当前的缓存状况;将TaskSet提交给TaskScheduler;rem
四、从新提交Shuffle输出丢失的Stage给TaskScheduler;
注:一个Stage内部的错误不是由shuffle输出丢失形成的,DAGScheduler是无论的,由TaskScheduler负责尝试从新提交task执行;
以以下示例描述Job提交过程:
val sc = new SparkContext("local[2]", "WordCount", System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR"))) val textFile = sc.textFile("xxx") val result = textFile.flatMap(line => line.split("\t")).map(word => (word, 1)).reduceByKey(_ + _) result.collect
RDD.collect
==>sc.runJob #####至此完成了将RDD提交DAGScheduler#####
val results = new Array[U](partitions.size) //result存放的是返回值,数组大小为最后一个RDD的partition的个数
==>dagScheduler.runJob(rdd, func, partitions, resultHandler......) //DAGScheduler的输入:RDD and partitions to compute
==>dagScheduler.submitJob
==>eventProcessActor ! JobSubmitted
def receive = { case JobSubmitted(jobId, rdd, func, partitions, allowLocal...) => dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal...) } //完成job到stage的转换,生成finalStage并提交 private[scheduler] def handleJobSubmitted(jobId: Int, finalRDD: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], allowLocal: Boolean...){ //注意:该RDD是final RDD,而不是一系列的RDD,用finalRDD来建立finalStage //newStage操做对应会生成新的result stage或者shuffle stage:内部有一个isShuffleMap变量来标识该stage是shuffle or result var finalStage: Stage = newStage(rdd, partitions.size, None, jobId, Some(callSite)) //使用finalStage来构建job val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties) //对于简单的job,没有依赖关系而且只有一个partition,该类job会使用local thread处理而并不是提交到TaskScheduler上处理 if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) { runLocally(job) } else { submitStage(finalStage) //提交finalStage } }
handleJobSubmitted方法完成了job到stage的转换,生成finalStage;每一个job都有一个finalStage。
newStage()方法分析:根据finalRDD生成finalStage
private def newStage( rdd: RDD[_], numTasks: Int, //task个数就是partitions个数 shuffleDep: Option[ShuffleDependency[_,_]], jobId: Int, callSite: Option[String] = None) : Stage = { val id = nextStageId.getAndIncrement() val stage = new Stage(id, rdd, numTasks, shuffleDep, getParentStages(rdd, jobId), jobId, callSite) ...... } private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = { val parents = new HashSet[Stage] val visited = new HashSet[RDD[_]] def visit(r: RDD[_]) { if (!visited(r)) { visited += r for (dep <- r.dependencies) { dep match { case shufDep: ShuffleDependency[_,_] => parents += getShuffleMapStage(shufDep, jobId) case _ => visit(dep.rdd) } } } } visit(rdd) parents.toList } private def getShuffleMapStage(shuffleDep: ShuffleDependency[_,_], jobId: Int): Stage = { shuffleToMapStage.get(shuffleDep.shuffleId) match { case Some(stage) => stage case None => val stage = newOrUsedStage(shuffleDep.rdd, shuffleDep.rdd.partitions.size, shuffleDep, jobId) shuffleToMapStage(shuffleDep.shuffleId) = stage stage }
newStage()后产生的finalStage中已经包含了该stage的全部依赖的父Stage;
经过getParentStages()方法构建该stage的依赖关系;反向visit RDD DAG图,遇到窄依赖就将依赖的RDD加入到stage,遇到宽依赖就切开并递归宽依赖的stage;
生成stage实例,stage的id经过nextStageId的值加一获得,task的个数就是partitions的个数;
有两种类型的Stage:ShuffleStage和ResultStage;
Stage内部有一个isShuffleMap变量标识该Stage是shuffle仍是result类型;
Spark对stage的划分是按照宽依赖来进行区分的:根据RDD的依赖关系,若是遇到宽依赖则建立ShuffleStage;
submitStage()方法分析:计算stage之间的依赖关系(Stage DAG)并对依赖关系进行处理
private def submitStage(stage: Stage) { if (!waiting(stage) && !running(stage) && !failed(stage)) { val missing = getMissingParentStages(stage).sortBy(_.id) //根据final stage发现是否有parent stage if (missing == Nil) { // 若是计算中发现当前的stage没有任何依赖或者全部的依赖都已经准备完毕,则提交task submitMissingTasks(stage, jobId.get) running += stage //设置当前的stage为running,由于当前的stage没有未处理完的依赖的stage } else { //若是有parent stage,须要先submit parent, 由于stage之间须要顺序执行 for (parent <- missing) { submitStage(parent) } waiting += stage //当前stage放入到waiting列表中,表示该stage须要等待parent先执行完成 } } } //根据final stage的parents找出全部的parent stage private def getMissingParentStages(stage: Stage): List[Stage] = { ...... dep match { //若是是ShuffleDependency,则新建一个shuffle map stage,且该stage是可用的话则加入missing中 case shufDep: ShuffleDependency[_,_] => //ShuffleDependecy val mapStage = getShuffleMapStage(shufDep, stage.jobId) if (!mapStage.isAvailable) { missing += mapStage } case narrowDep: NarrowDependency[_] => //NarrowDependecy visit(narrowDep.rdd) } }
getMissParentStages(stage)处理步骤:
一、根据该stage获得该stage的parent,也就是RDD的依赖关系,生成parentStage是经过RDD的dependencies;
二、若是依赖关系是宽依赖,则生成一个mapStage来做为finalStage的parent;也就是说对于须要shuffle操做的job,会生成mapStage和finalStage进行处理
三、若是依赖关系是窄依赖,不会生成新的stage。也就是说对于不须要shuffle的job只须要一个finalStage;
注意:getMissParentStages(stage)获得的结果集是按照stageid的降序排列的
submitStage()处理步骤:
一、计算该stage的getMissParentStages(),若是当前stage没有任何依赖或者全部的依赖都已执行完,则提交该stage;
二、若是发现该stage有依赖的stage未执行,则先执行完全部依赖的父stage(根据getMissParentStages()方法获得的结果集降序来执行stage);
submitMissingTasks()方法分析:把stage根据parition拆分红task(决定每一个Task的最佳位置)生成TaskSet,并提交到TaskScheduler
private def submitMissingTasks(stage: Stage, jobId: Int) { //首先根据stage所依赖的RDD的partition的分布,会产生出与partition数量相等的task var tasks = ArrayBuffer[Task[_]]() //对于finalStage或是mapStage会产生不一样的task。 //检查该stage时是否ShuffleMap,若是是则生成ShuffleMapTask if (stage.isShuffleMap) { //mapStage:表示还有其余stage依赖此stage for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) { //task根据partition的locality进行分布 val locs = getPreferredLocs(stage.rdd, p) tasks += new ShuffleMapTask(stage.id, stage.rdd, stage.shuffleDep.get, p, locs) } } else { //finalStage:该类型stage直接输出结果生成ResultTask val job = resultStageToJob(stage) for (id <- 0 until job.numPartitions if !job.finished(id)) { val partition = job.partitions(id) val locs = getPreferredLocs(stage.rdd, partition) //因为是ResultTask,所以须要传入定义的func,也就是若是处理结果返回 tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id) } } //向TaskSchuduler提交任务,以stage为单位,一个stage对应一个TaskSet taskSched.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties)) }
submitMissingTask()方法的处理步骤:
一、经过stage.isShuffleMap来决定生成的是ShuffleMapTask仍是ResultTask;
二、若是是ShuffleMapTask则根据stage所依赖的RDD的partition分布,产生和partition数量相同的task,这些task根据partition的locality进行分布’
三、把stage对应生成全部的task封装到一个TaskSet中,提交给TaskScheduler的submitTasks()方法进行调度;
从新提交shuffle输出丢失的stage
case ResubmitFailedStages => dagScheduler.resubmitFailedStages() private[scheduler] def resubmitFailedStages() { if (failedStages.size > 0) { logInfo("Resubmitting failed stages") clearCacheLocs() val failedStagesCopy = failedStages.toArray failedStages.clear() for (stage <- failedStagesCopy.sortBy(_.jobId)) { submitStage(stage) } } submitWaitingStages() }
####至此完成了DAGScheduler提交TaskSet到TaskSchuduler#####
Job的生成:
一旦driver程序中出现action,就会生成一个job,好比:count等,向DAGScheduler提交job;若是driver程序后面还有action,那么其余action也会对应生成相应的job;
因此:driver有多少个action就会生成多少个job。为何spark将driver程序称为application而不是job的缘由,估计就是这吧。
每个job可能会包含多个stage,最后一个stage产生result。在提交job过程当中,DAGScheduler会首先划分stage,而后先提交无parent stage的stages,并在提交过程当中计算该stage的task数目和类型,并提交具体的task;无parent stage的stage提交完后,依赖该stage的stage才能提交。