上篇 spark 源码分析之十九 -- DAG的生成和Stage的划分 中,主要介绍了下图中的前两个阶段DAG的构建和Stage的划分。html
本篇文章主要剖析,Stage是如何提交的。apache
rdd的依赖关系构成了DAG,DAGScheduler根据shuffle依赖关系将DAG图划分为一个一个小的stage。具体能够看 spark 源码分析之十九 -- DAG的生成和Stage的划分 作进一步了解。缓存
上篇文章中,DAGScheduler的handleJobSubmitted方法咱们只剖析了stage的生成部分,下面咱们看一下stage的提交部分源码。安全
首先构造ActiveJob对象,其次清除缓存的block location信息,而后记录jobId和job对象的映射关系到jobIdToActiveJob map集合中,而且将该jobId记录到活动的job集合中。app
获取到Job全部的stage的惟一标识,而且根据惟一标识来获取stage对象,而且调用其lastestInfo方法获取其StageInfo对象。ide
而后进一步封装成 SparkListenerJobStart 事件对象,并post到 listenerBus中,listenerBus 是一个 LiveListenerBus 对象,其内部封装了四个消息队列组成的集合,具体能够看 spark 源码分析之三 -- LiveListenerBus介绍 文章作进一步了解。函数
最后调用submitStage 方法执行Stage的提交。源码分析
先来看一下ActiveJob的说明。post
A running job in the DAGScheduler. Jobs can be of two types: a result job, which computes a ResultStage to execute an action, or a map-stage job, which computes the map outputs for a ShuffleMapStage before any downstream stages are submitted. The latter is used for adaptive query planning, to look at map output statistics before submitting later stages. We distinguish between these two types of jobs using the finalStage field of this class. Jobs are only tracked for "leaf" stages that clients directly submitted, through DAGScheduler's submitJob or submitMapStage methods. However, either type of job may cause the execution of other earlier stages (for RDDs in the DAG it depends on), and multiple jobs may share some of these previous stages. These dependencies are managed inside DAGScheduler.ui
它表明了正运行在DAGScheduler中的一个job,job有两种类型:result job,其经过计算一个ResultStage来执行一个action操做;map-stage job,它在下游的stage提交以前,为ShuffleMapStage计算map的输出。
finalStages是这个job的最后一个stage。
直接先来看submitStage方法,以下:
思路: 首先先获取可能丢失的父stage信息,若是该stage的父stage被遗漏了,则递归调用查看其爷爷stage是否被遗漏。
getMissingParentStages方法以下:
思路:不断建立父stage,能够看上篇文章 spark 源码分析之十九 -- DAG的生成和Stage的划分 作进一步了解。
submitMissingTasks方法过于长,为方便分析,按功能大体分为以下部分:
org.apache.spark.scheduler.ResultStage#findMissingPartitions 方法以下:
org.apache.spark.scheduler.ShuffleMapStage#findMissingPartitions 方法以下:
org.apache.spark.MapOutputTrackerMaster#findMissingPartitions 方法以下:
OutputCommitCoordinator 的 stageStart实现以下:
本质上就是把它放入到一个map中了。
思路:根据stage的RDD和分区id获取到其rdd中的分区的优先位置。
下面看一下 getPreferredLocs 方法:
注释中说到,它是线程安全的,下面看一下,它是如何实现的,即 getPrefferredLocsInternal 方法。
这个方法中提到四种状况:
1. 若是以前获取到过,那么直接返回Nil便可。
2. 若是以前已经缓存在内存中,直接从缓存的内存句柄中取出返回便可。
3. 若是RDD对应的是HDFS输入的文件等,则使用RDD记录的优先位置。
4. 若是上述三种状况都不知足,且是narrowDependency,则调用该方法,获取子RDDpartition对应的父RDD的partition的优先位置。
下面仔细说一下中间两种状况。
getCacheLocs 方法以下:
思路:先查看rdd的存储级别,若是没有存储级别,则直接返回Nil,不然根据RDD和分区id组成BlockId集合,请求存储系统中的BlockManager来获取block的位置,而后转换为TaskLocation信息返回。
RDD的 preferredLocations 方法以下:
思路:先从checkpoint中找,若是checkpoint中没有,则返回默认的为Nil。
返回对象是TaskLocation对象,作一下简单的说明。
类说明
A location where a task should run. This can either be a host or a (host, executorID) pair. In the latter case, we will prefer to launch the task on that executorID, but our next level of preference will be executors on the same host if this is not possible.
它有三个子类,以下:
这三个类定义以下:
很简单,不作过多说明。
TaskLocation伴随对象以下,如今用的方法是第二种 apply 方法:
对应方法以下:
org.apache.spark.scheduler.Stage#makeNewStageAttempt 方法以下:
很简单,主要是调用了StageInfo的fromStage方法。
先来看Stage类。
StageInfo封装了关于Stage的一些信息,用于调度和SparkListener传递stage信息。
其伴生对象以下:
对应源码以下:
经过broadcast机制,将数据广播到spark集群中的driver和各个executor中。关于broadcast的实现细节,能够查看 spark 源码分析之十四 -- broadcast 是如何实现的?作进一步了解。
根据stage的类型生成不一样的类型Task。关于过多Task 的内容,在阶段四进行剖析。
对应代码以下:
其中taskScheduler是 TaskSchedulerImpl,它是TaskScheduler的惟一子类实现。它负责task的调度。
org.apache.spark.scheduler.TaskSchedulerImpl#submitTasks方法实现以下:
其中 createTaskSetManager 方法以下:
SchedulableBuilder类是构建Schedulable树的接口。
schedulableBuilder 定义以下:
其中schedulingMode 能够经过参数 spark.scheduler.mode 来调整,默认为FIFO。
schedulableBuilder 初始化以下:
schedulableBuilder的 addTaskSetManager (FIFO)方法以下:
即调用了内部Pool对象的addSchedulable 方法:
关于更多TaskSetManager的内容,将在阶段四进行剖析。
backend是一个 SchedulerBackend 实例。在SparkContetx的初始化过程当中调用 createTaskScheduler 初始化 backend,具体能够看 spark 源码分析之四 -- TaskScheduler的建立和启动过程 作深刻了解。
在yarn 模式下,它有两个实现yarn-client 模式下的 org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend实现 和 yarn-cluster 模式下的 org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend 实现。
这两个类在spark 项目的 resource-managers 目录下的 yarn 目录下定义实现,固然它也支持 kubernetes 和 mesos,不作过多说明。
这两个类的继承关系以下:
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#reviveOffers 实现以下:
发送ReviveOffers 请求给driver。
driver端的 CoarseGrainedSchedulerBackend 的 receive 方法有以下事件处理分支:
其内部通过一系列RPC过程,关于 RPC 能够看 spark 源码分析之十二--Spark RPC剖析之Spark RPC总结 作进一步了解。
即会调用driver端的makeOffsers方法,以下:
本篇文章剖析了从DAGScheduler生成的Stage是如何被提交给TaskScheduler,以及TaskScheduler是如何把TaskSet提交给ResourceManager的。
下面就是task的运行部分了,下篇文章对其作详细介绍。跟task执行关系很密切的TaskSchedulerBackend、Task等内容,也将在下篇文章作更详细的说明。