This blog series summarizes and shares cases drawn from real production environments, together with Spark source-code analysis and hands-on guidance; please keep following the series. Copyright notice: this Spark source-code analysis and practice series belongs to the author (Qin Kaixin). Reproduction is prohibited; learning from it is welcome.
First-level scheduling: the Cluster Manager (ResourceManager in YARN mode, Master in Standalone mode) is responsible for allocating resources to Applications. Resources here include CPU cores, memory, disk space, and so on.
Second-level scheduling: the Application further allocates its resources to its individual Tasks, e.g. DAG task scheduling, delay scheduling, and so on.
Task: Tasks come in two kinds, ResultTask and ShuffleMapTask. Each Stage creates zero or more Tasks depending on how many of its partitions are still incomplete; DAGScheduler finally submits the Tasks of each Stage to the TaskScheduler as a TaskSet for further processing (see the small example below).
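To make the two Task types concrete, here is a minimal job a reader can run locally (the object name TwoStageExample and the local[2] master are only illustrative): the reduceByKey below introduces one shuffle, so the upstream stage runs ShuffleMapTasks, one per partition, and the final stage feeding collect() runs ResultTasks.
import org.apache.spark.{SparkConf, SparkContext}

object TwoStageExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("two-stages").setMaster("local[2]"))
    val counts = sc.parallelize(Seq("a", "b", "a", "c"), 2)
      .map(word => (word, 1))
      .reduceByKey(_ + _)   // shuffle dependency => stage boundary: ShuffleMapTasks upstream
      .collect()            // final ResultStage: ResultTasks deliver the results to the driver
    println(counts.mkString(", "))
    sc.stop()
  }
}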
TaskSchedulerImpl: holds the two key communication endpoints, StandaloneSchedulerBackend (StandaloneAppClient) and CoarseGrainedSchedulerBackend (DriverEndpoint), and through them drives cluster-wide resource reporting and resource scheduling.
CoarseGrainedSchedulerBackend: maintains an executorDataMap, which provides an up-to-date view of the live executors available for resource allocation (a simplified sketch follows).
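A minimal sketch of that bookkeeping, assuming a simplified ExecutorInfo case class in place of Spark's internal ExecutorData (so this is an illustration of the idea, not the actual source):
import scala.collection.mutable.HashMap

// ExecutorInfo is a simplified stand-in for Spark's internal ExecutorData.
case class ExecutorInfo(host: String, totalCores: Int, var freeCores: Int)

object ExecutorMapSketch {
  // executorId -> live executor resources: entries are added when RegisterExecutor
  // arrives and removed on RemoveExecutor, so the map always reflects alive executors.
  val executorDataMap = new HashMap[String, ExecutorInfo]

  def main(args: Array[String]): Unit = {
    executorDataMap("exec-1") = ExecutorInfo("node-1", totalCores = 4, freeCores = 4)
    executorDataMap("exec-2") = ExecutorInfo("node-2", totalCores = 8, freeCores = 6)
    // makeOffers() would turn these entries into offers of (execId, host, freeCores).
    executorDataMap.foreach { case (id, e) => println(s"$id on ${e.host}: ${e.freeCores} free cores") }
  }
}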
StandaloneSchedulerBackend: holds a reference to TaskSchedulerImpl. As far as this flow is concerned, its job is to initialize and start the StandaloneAppClient and DriverEndpoint endpoints; it receives the messages, but the actual work is still done by TaskSchedulerImpl.
StandaloneAppClient: in Standalone mode, StandaloneSchedulerBackend constructs an AppClient instance when it starts, and starting that instance launches the ClientEndpoint message loop. On startup, ClientEndpoint registers the current application with the Master. (Interface allowing applications to speak with a Spark standalone cluster manager.)
SparkContext -> createTaskScheduler -> new TaskSchedulerImpl(sc) ->
new StandaloneSchedulerBackend(scheduler, sc, masterUrls)-> scheduler.initialize(backend)
_taskScheduler.start()-> backend.start()
StandaloneSchedulerBackend->start()
-> super.start() [CoarseGrainedSchedulerBackend]->
createDriverEndpointRef(properties)-> createDriverEndpoint(properties) ->
-> new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf).start()
onStart -> registerWithMaster -> tryRegisterAllMasters -> rpcEnv.setupEndpointRef(masterAddress,
Master.ENDPOINT_NAME) -> masterRef.send(RegisterApplication(appDescription, self))
ClientEndpoint receives messages from the Master such as the following (a sketch of the handling style follows the list):
RegisteredApplication (the Master's reply to the registration sent at startup)
ApplicationRemoved
ExecutorAdded
ExecutorUpdated
WorkerRemoved
MasterChanged
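The handling of these messages is essentially one pattern match. The sketch below shows that style only; the case classes are simplified stand-ins for the real ones in org.apache.spark.deploy.DeployMessages, and the bodies merely print what the real handlers react to.
sealed trait ClusterMessage
case class RegisteredApplication(appId: String) extends ClusterMessage
case class ExecutorAdded(execId: String, workerId: String, cores: Int) extends ClusterMessage
case class ApplicationRemoved(reason: String) extends ClusterMessage

object ClientEndpointSketch {
  def receive(msg: ClusterMessage): Unit = msg match {
    case RegisteredApplication(appId) =>
      println(s"Master accepted our registration, appId = $appId")
    case ExecutorAdded(execId, workerId, cores) =>
      println(s"executor $execId with $cores cores granted on worker $workerId")
    case ApplicationRemoved(reason) =>
      println(s"application removed by the Master: $reason")
  }

  def main(args: Array[String]): Unit =
    receive(RegisteredApplication("app-20181123-0001"))
}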
DriverEndpoint: StandaloneSchedulerBackend's parent class, CoarseGrainedSchedulerBackend, instantiates a message loop of type DriverEndpoint when it starts (this is the classic Driver object of a running program). StandaloneSchedulerBackend is dedicated to collecting resource information from the Workers; when an ExecutorBackend on a Worker starts, it sends a RegisterExecutor message to register itself with DriverEndpoint. The messages DriverEndpoint handles include the following (a simplified sketch of the resource-accounting handlers follows the list):
StatusUpdate
ReviveOffers
KillTask
KillExecutorsOnHost
RemoveExecutor
RegisterExecutor()
StopDriver
StopExecutors
RemoveWorker
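For the resource-accounting side, the sketch below condenses the two handlers that matter most: RegisterExecutor adds an entry to the executor map, and StatusUpdate for a finished task returns its cores before offering them again. ExecData, the method names and the core accounting are simplified assumptions, not the real Spark code.
import scala.collection.mutable.HashMap

case class ExecData(host: String, var freeCores: Int)

class DriverEndpointSketch(coresPerTask: Int = 1) {
  // plays the role of executorDataMap
  private val executors = new HashMap[String, ExecData]

  def registerExecutor(execId: String, host: String, cores: Int): Unit = {
    executors(execId) = ExecData(host, cores)
    makeOffers()                       // new resources: offer them for scheduling
  }

  def launchTask(execId: String): Unit =
    executors.get(execId).foreach(_.freeCores -= coresPerTask)   // cores taken when a task is sent out

  def statusUpdate(execId: String, taskFinished: Boolean): Unit =
    if (taskFinished) executors.get(execId).foreach { e =>
      e.freeCores += coresPerTask      // give the finished task's cores back
      makeOffers()                     // and offer the freed cores again
    }

  private def makeOffers(): Unit =
    executors.foreach { case (id, e) => println(s"offer: $id on ${e.host}, ${e.freeCores} free cores") }
}

object DriverEndpointSketch {
  def main(args: Array[String]): Unit = {
    val driver = new DriverEndpointSketch()
    driver.registerExecutor("exec-1", "node-1", cores = 4)
    driver.launchTask("exec-1")
    driver.statusUpdate("exec-1", taskFinished = true)
  }
}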
Overall pattern of the Spark scheduling system: in Standalone mode, StandaloneSchedulerBackend constructs an AppClient instance when it starts, and starting that instance launches the ClientEndpoint message loop; on startup, ClientEndpoint registers the current application with the Master. StandaloneSchedulerBackend's parent class, CoarseGrainedSchedulerBackend, instantiates a DriverEndpoint message loop when it starts (the classic Driver object of a running program). StandaloneSchedulerBackend is dedicated to collecting resource information from the Workers; when an ExecutorBackend starts, it sends RegisterExecutor to register with DriverEndpoint. At that point StandaloneSchedulerBackend knows the computing resources the current application owns, and TaskScheduler uses the computing resources held by StandaloneSchedulerBackend to actually run Tasks.
DAGScheduler holds references to the following components:
TaskSchedulerImpl
LiveListenerBus
MapOutputTrackerMaster
BlockManagerMaster
SparkEnv
cacheLocs: caches the location information of every partition of each RDD, ultimately building, per RDD, a mapping from partition number to a sequence of locations. Why a sequence of locations? Because of replication: each partition's block may exist in the BlockManagers of several nodes, so each partition maps to a sequence of locations (see the toy example below).
new HashMap[Int, IndexedSeq[Seq[TaskLocation]]]
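A toy illustration of why the value type nests a sequence inside an indexed sequence (Loc is a stand-in for Spark's TaskLocation; the hosts and ids are made up): the outer IndexedSeq has one slot per partition, and each slot holds all the replicas of that partition's cached block.
import scala.collection.mutable.HashMap

case class Loc(host: String, executorId: String)   // stand-in for TaskLocation

object CacheLocsSketch {
  val cacheLocs = new HashMap[Int, IndexedSeq[Seq[Loc]]]

  def main(args: Array[String]): Unit = {
    // RDD 3 has two partitions; partition 0 is replicated on two executors.
    cacheLocs(3) = IndexedSeq(
      Seq(Loc("node-1", "exec-1"), Loc("node-2", "exec-2")),   // partition 0: two replicas
      Seq(Loc("node-3", "exec-3"))                             // partition 1: one replica
    )
    println(cacheLocs(3)(0))   // preferred locations for partition 0 of RDD 3
  }
}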
messageScheduler: its responsibility is to retry failed Stages, as in the scheduling-thread code fragment below:
private val messageScheduler =
  ThreadUtils.newDaemonSingleThreadScheduledExecutor("dag-scheduler-message")

// inside handleTaskCompletion, when a task fails with FetchFailed the stage
// resubmission is delayed rather than done immediately:
case FetchFailed =>
  messageScheduler.schedule(new Runnable {
    override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
  }, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
getPreferredLocs: a heavyweight method that obtains the preferred locations of a partition. The best preferred-location sequence of each partition ends up in cacheLocs; when it is not there yet, rdd.preferredLocations is called to obtain it.
getPreferredLocs
-> getPreferredLocsInternal
-> getCacheLocs(rdd)(partition) -> cacheLocs(rdd.id) = locs -> return cached
(if nothing is cached, the locations are computed and put into the map first, and the preferred sequence is then returned)
-> val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
Code fragment of the getCacheLocs method:
def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = cacheLocs.synchronized {
  if (!cacheLocs.contains(rdd.id)) {
    val locs: IndexedSeq[Seq[TaskLocation]] = if (rdd.getStorageLevel == StorageLevel.NONE) {
      // not cached at all (StorageLevel.NONE): every partition maps to an empty location list
      IndexedSeq.fill(rdd.partitions.length)(Nil)
    } else {
      // ask BlockManagerMaster where each partition's block lives (possibly several replicas)
      val blockIds =
        rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]
      blockManagerMaster.getLocations(blockIds).map { bms =>
        bms.map(bm => TaskLocation(bm.host, bm.executorId))
      }
    }
    cacheLocs(rdd.id) = locs
  }
  cacheLocs(rdd.id)
}
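For completeness, getPreferredLocsInternal (not quoted here) applies a simple lookup order: cached locations first, then the RDD's own preferredLocations, then a walk into narrow dependencies. The toy model below is only meant to capture that order; SimpleRdd, the host names and the flat (rddId, partition) cache key are invented, and the real code additionally remaps partition numbers through each dependency.
import scala.collection.mutable

// Toy stand-in for an RDD: its own preferred hosts per partition plus narrow parents.
case class SimpleRdd(
    id: Int,
    preferred: Map[Int, Seq[String]] = Map.empty,
    narrowParents: Seq[SimpleRdd] = Nil)

object PreferredLocsSketch {
  // (rddId, partition) -> cached block locations, playing the role of cacheLocs
  val cacheLocs = mutable.HashMap[(Int, Int), Seq[String]]()

  def getPreferredLocs(rdd: SimpleRdd, partition: Int): Seq[String] = {
    val cached = cacheLocs.getOrElse((rdd.id, partition), Nil)
    if (cached.nonEmpty) cached
    else {
      val own = rdd.preferred.getOrElse(partition, Nil)
      if (own.nonEmpty) own
      else
        // fall back to the first narrow parent that knows a location
        rdd.narrowParents.iterator
          .map(p => getPreferredLocs(p, partition))
          .find(_.nonEmpty)
          .getOrElse(Nil)
    }
  }

  def main(args: Array[String]): Unit = {
    val parent = SimpleRdd(1, preferred = Map(0 -> Seq("node-7")))
    val child  = SimpleRdd(2, narrowParents = Seq(parent))
    println(getPreferredLocs(child, 0))   // List(node-7), inherited from the parent
  }
}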
eventProcessLoop: the well-known DAGSchedulerEventProcessLoop, the event-processing loop responsible for handling all kinds of events, for example:
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
  case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
    dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
  case StageCancelled(stageId) =>
    dagScheduler.handleStageCancellation(stageId)
  case JobCancelled(jobId) =>
    dagScheduler.handleJobCancellation(jobId)
  case JobGroupCancelled(groupId) =>
    dagScheduler.handleJobGroupCancelled(groupId)
  case AllJobsCancelled =>
    dagScheduler.doCancelAllJobs()
  case ExecutorAdded(execId, host) =>
    dagScheduler.handleExecutorAdded(execId, host)
  case ExecutorLost(execId) =>
    dagScheduler.handleExecutorLost(execId, fetchFailed = false)
  case BeginEvent(task, taskInfo) =>
    dagScheduler.handleBeginEvent(task, taskInfo)
  case GettingResultEvent(taskInfo) =>
    dagScheduler.handleGetTaskResult(taskInfo)
  case completion: CompletionEvent =>
    dagScheduler.handleTaskCompletion(completion)
  case TaskSetFailed(taskSet, reason, exception) =>
    dagScheduler.handleTaskSetFailed(taskSet, reason, exception)
  case ResubmitFailedStages =>
    dagScheduler.resubmitFailedStages()
-> DAGScheduler.runJob->submitJob(rdd, func, partitions, callSite, resultHandler,
properties)
-> DAGScheduler.eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
SerializationUtils.clone(properties)))
-> DAGSchedulerEventProcessLoop -> case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) ->
-> DAGScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
-> DAGScheduler.createResultStage(finalRDD, func, partitions, jobId, callSite) (builds the stage DAG from the final RDD)
-> DAGScheduler.submitStage(finalStage)
-> DAGScheduler.submitMissingTasks(stage, jobId.get) (submits the front-most stage, Stage 0)
-> taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
-> createTaskSetManager(taskSet, maxTaskFailures) (inside TaskSchedulerImpl.submitTasks)
-> taskScheduler -> initialize -> FIFOSchedulableBuilder -> buildPools (built when TaskSchedulerImpl is initialized)
-> schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
(inside TaskSchedulerImpl.submitTasks; the TaskSetManager goes into the pool, FIFO by default; see the note after this call chain)
-> backend.reviveOffers()(CoarseGrainedSchedulerBackend)
-> driverEndpoint.send(ReviveOffers)
-> DriverEndpoint.receive -> case ReviveOffers => makeOffers() (makeOffers wraps the common task-offering logic)
-> makeOffers (collects the live executors with their executorHost and freeCores) -> workOffers (inside CoarseGrainedSchedulerBackend)
-> TaskSchedulerImpl.resourceOffers(workOffers) (CoarseGrainedSchedulerBackend holds a reference to TaskSchedulerImpl)
-> val sortedTaskSets = TaskSchedulerImpl.rootPool.getSortedTaskSetQueue (the TaskSetManagers come into play)
-> TaskSchedulerImpl.resourceOfferSingleTaskSet(
taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
-> TaskSetManager.resourceOffer(execId, host, maxLocality)
-> TaskSchedulerImpl.getAllowedLocalityLevel(curTime) (delay scheduling; covered in detail in the next post)
-> TaskSetManager.dequeueTask(execId, host, allowedLocality)
-> sched.dagScheduler.taskStarted(task, info)
-> new TaskInfo(taskId, index, attemptNum, curTime, execId, host, taskLocality, speculative)
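One practical knob hidden in this chain: rootPool.getSortedTaskSetQueue orders the TaskSetManagers according to spark.scheduler.mode, which is FIFO by default and can be switched to FAIR for fair-scheduling pools. A minimal local example of selecting the mode (app name and master are illustrative):
import org.apache.spark.{SparkConf, SparkContext}

object SchedulerModeExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("scheduler-mode-demo")
      .setMaster("local[4]")
      .set("spark.scheduler.mode", "FIFO")   // or "FAIR" to schedule task sets via fair pools
    val sc = new SparkContext(conf)
    sc.parallelize(1 to 100, 4).map(_ * 2).count()   // goes through the submitTasks path above
    sc.stop()
  }
}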
Reverse-driven stage creation: one curious finding is that the reverse-driven algorithm that builds stages in Spark 2.0 differs from the one in Spark 2.3; the discussion here follows Spark 2.3.
Once DAGScheduler has called submitStage its mission is complete; in the end submitStage just submits the task sets in forward order:
-> DAGScheduler.handleJobSubmitted
-> finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
(wraps finalRDD and func into the final Stage)
-> submitStage(finalStage) -> submitMissingTasks
But this is where the interesting part, the reverse-driven algorithm inside createResultStage, begins:
-> createResultStage
-> getOrCreateParentStages -> getShuffleDependencies (fetch the parent shuffle dependencies)
-> getOrCreateShuffleMapStage -> getShuffleDependencies (loops back, forming a recursion)
-> createShuffleMapStage
The code fragment below obtains the parent stages: if they already exist they are returned directly, otherwise they are created anew from the final RDD's parent (shuffle) dependencies. Note that getOrCreateParentStages ends up being called recursively until every ancestor stage has been built, which is why stage creation proceeds from the topmost ancestors downwards.
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  getShuffleDependencies(rdd).map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}
getOrCreateShuffleMapStage: returns the ShuffleMapStage for a given shuffle dependency; if it already exists it is fetched directly, otherwise the missing ancestor stages are created first and then the stage for the given dependency itself.
private def getOrCreateShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
    case Some(stage) =>
      stage
    case None =>
      // Create stages for all missing ancestor shuffle dependencies.
      getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
        // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
        // that were not already in shuffleIdToMapStage, it's possible that by the time we
        // get to a particular dependency in the foreach loop, it's been added to
        // shuffleIdToMapStage by the stage creation process for an earlier dependency. See
        // SPARK-13902 for more information.
        if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
          createShuffleMapStage(dep, firstJobId)
        }
      }
      // Finally, create a stage for the given shuffle dependency.
      createShuffleMapStage(shuffleDep, firstJobId)
  }
}
createShuffleMapStage: the method where stages finally get materialized; it returns the required stage. Note that the blocking point is the call to getOrCreateParentStages inside it, which keeps recursing all the way up to the topmost ancestor.
def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
  val rdd = shuffleDep.rdd
  val numTasks = rdd.partitions.length
  val parents = getOrCreateParentStages(rdd, jobId)   // blocking point: ancestors are created first
  val id = nextStageId.getAndIncrement()
  val stage = new ShuffleMapStage(
    id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)
  stageIdToStage(id) = stage
  shuffleIdToMapStage(shuffleDep.shuffleId) = stage
  updateJobIdStageIdMaps(jobId, stage)
  if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
    // register the shuffle so reduce tasks can later locate the map output
    mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
  }
  stage
}
The overall flow: the topmost Stage is created first, and the child stages are then created as the recursion unwinds, much like a stack.
Why does the flow look like this? Take the DAG G -> F A -> H -> M L -> N. Because createShuffleMapStage does the reverse driving and its blocking point is the internal call to getOrCreateParentStages, the creation of the final Stage's parents (F, A) is pushed onto the bottom of the call stack first, and the creation calls for the stages above F and A are pushed on top one after another without any stage actually being created yet. Only when the creation call for the topmost Stage is on the stack, i.e. when the RDD's topmost ancestor has been reached, does a creation call actually run to completion: createShuffleMapStage builds the dependency relations from the blocking point, registers the shuffle, executes its method body, and the recursion then unwinds downwards level by level.
Note the caching inside getOrCreateShuffleMapStage, namely shuffleIdToMapStage: already-created stages are fetched directly instead of being rebuilt, for example when the shared parent stage of F and A is looked up.
G -> F A -> H -> M L -> N
1. Start from the parent dependencies of F and A and recursively build stages upwards.
2. This first establishes the stages above H and their dependency relations.
3. Then the relation between the F/A stages and the H stage is established (note that the H stage is fetched from shuffleIdToMapStage); finally the F and A stages are returned and the dependency between the final RDD (G) and F/A is established.
4. At last submitStage(finalStage) is called.
The resulting stage construction order is: N -> M L -> H -> F A -> G (a toy simulation follows below).
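As a sanity check on that order, here is a toy simulation of the recursion. The node names and the parents map are invented to mirror the example DAG; a LinkedHashSet stands in for shuffleIdToMapStage so that a shared ancestor such as H is created only once, and sibling order simply follows the order in which the parents are listed.
import scala.collection.mutable

object StageOrderSketch {
  // child -> its shuffle parents (the direction the recursion walks)
  val parents: Map[String, Seq[String]] = Map(
    "G" -> Seq("F", "A"),
    "F" -> Seq("H"), "A" -> Seq("H"),
    "H" -> Seq("L", "M"),
    "L" -> Seq("N"),
    "M" -> Nil, "N" -> Nil)

  // insertion order = creation order; plays the role of shuffleIdToMapStage
  val created = mutable.LinkedHashSet[String]()

  def getOrCreateStage(node: String): Unit =
    if (!created.contains(node)) {
      parents(node).foreach(getOrCreateStage)   // blocking point: create all ancestors first
      created += node                           // only then create this stage
    }

  def main(args: Array[String]): Unit = {
    getOrCreateStage("G")
    println(created.mkString(" -> "))   // prints N -> L -> M -> H -> F -> A -> G
  }
}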
This section is the result of a great deal of the author's time spent polishing it, analyzing Spark task scheduling in the plainest possible language. It is now 1:22 a.m.; to close, here is a picture of DAGScheduler.handleTaskCompletion, where tasks are submitted in forward order. Fly, Spark.