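This blog series summarizes and shares cases drawn from real production environments, and offers Spark source-code walkthroughs along with practical guidance for commercial use; please stay tuned. Copyright notice: this series on Spark source-code analysis and commercial practice belongs entirely to the author (Qin Kaixin, 秦凯新). Reproduction is prohibited; you are welcome to study it.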
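Spark data locality means moving the computation to the data rather than moving the data to the computation. Delay scheduling exists so that each task can launch at the best possible locality level (Locality Levels): when resources are insufficient, the scheduler keeps retrying at the current locality level for that level's configured wait time, and if the task still cannot be launched once that time has passed, it drops to the next, less strict locality level and tries again.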
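The fallback rule described above can be sketched in a few lines of Scala. This is a simplified model and not Spark's own code: the object name `DelaySchedulingSketch`, the function `allowedLevel`, and the 3000 ms wait budgets are assumptions chosen for illustration. Only the locality level names follow Spark's terminology.

```scala
// Hypothetical, simplified model of delay scheduling; not Spark's actual implementation.
object DelaySchedulingSketch {
  // Locality levels ordered from best (strictest) to worst.
  val levels: Vector[String] = Vector("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY")

  // Assumed wait budget in milliseconds for each level (illustrative values).
  val waitMs: Map[String, Long] = Map(
    "PROCESS_LOCAL" -> 3000L,
    "NODE_LOCAL"    -> 3000L,
    "RACK_LOCAL"    -> 3000L,
    "ANY"           -> 0L
  )

  /** Returns (index of the level currently allowed, adjusted last-launch timestamp).
    * Each time a level's wait budget is used up without a launch, the scheduler
    * moves one level down and charges that budget against the elapsed time.
    */
  def allowedLevel(startIndex: Int, lastLaunchTime: Long, now: Long): (Int, Long) = {
    var idx = startIndex
    var last = lastLaunchTime
    while (idx < levels.size - 1 && now - last >= waitMs(levels(idx))) {
      last += waitMs(levels(idx))
      idx += 1
    }
    (idx, last)
  }
}
```

For instance, with nothing launched since time 0, a call at 1000 ms stays at PROCESS_LOCAL, a call at 3500 ms has dropped to NODE_LOCAL, and a call at 10000 ms has fallen all the way to ANY.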
Note: CoarseGrainedSchedulerBackend.makeOffers is invoked whenever the state of any Executor changes. It is a common entry point shared by the handlers of messages such as:
StatusUpdate
RegisterExecutor
An Executor is chosen first; the tasks that can run on that Executor are then selected and assigned at the best locality level available.
CoarseGrainedSchedulerBackend.makeOffers (shared entry point that triggers task assignment; keeps only the live Executors)
||
||
\||/
val tasks = TaskSchedulerImpl.resourceOffers(workOffers) (assignment begins)
||
||
\||/
(iterate over the locality levels of the tasks in each TaskSet, starting from the best locality)
for (currentMaxLocality <- taskSet.myLocalityLevels)
||
||
\||/
(iterate over all available Executors, offering each one in turn)
for (i <- 0 until shuffledOffers.size)
||
||
\||/
(for the chosen Executor, let the TaskSetManager carry out the actual task assignment)
for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
tasks(i) += task
val tid = task.taskId
taskIdToTaskSetManager(tid) = taskSet
taskIdToExecutorId(tid) = execId
executorIdToRunningTaskIds(execId).add(tid)
availableCpus(i) -= CPUS_PER_TASK
assert(availableCpus(i) >= 0)
launchedTask = true
}
||
||
\||/
(once the call into TaskSchedulerImpl returns the tasks, launch them)
CoarseGrainedSchedulerBackend.launchTasks(tasks)
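To make the nesting in the flow above concrete, here is a minimal self-contained sketch of the loop structure: locality levels on the outside, executor offers on the inside, and at most one task per executor per round. The types `Offer` and `Pending`, the helper `pick`, and the two-level locality list are hypothetical simplifications rather than Spark's classes. The sketch also leaves out details the flow does not show, such as shuffling the offers.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified model of the offer loop; not Spark's actual implementation.
object OfferLoopSketch {
  // Stand-ins for Spark's WorkerOffer and for a task waiting to be scheduled.
  final case class Offer(executorId: String, host: String, cores: Int)
  final case class Pending(id: Int, preferredHost: Option[String])

  val cpusPerTask = 1

  // Toy locality check: NODE_LOCAL needs a matching host, ANY accepts the first pending task.
  def pick(pending: ArrayBuffer[Pending], host: String, level: String): Option[Pending] = {
    val idx = level match {
      case "NODE_LOCAL" => pending.indexWhere(_.preferredHost.contains(host))
      case _            => if (pending.isEmpty) -1 else 0
    }
    if (idx >= 0) Some(pending.remove(idx)) else None
  }

  /** Returns, for each offer, the ids of the tasks assigned to it. */
  def resourceOffers(offers: IndexedSeq[Offer], tasks: Seq[Pending]): IndexedSeq[List[Int]] = {
    val pending = ArrayBuffer(tasks: _*)
    val availableCpus = offers.map(_.cores).toArray
    val assigned = Array.fill(offers.size)(List.empty[Int])
    // Outer loop: best locality level first.
    for (level <- Seq("NODE_LOCAL", "ANY")) {
      var launched = true
      // Keep making rounds at this level until no executor accepts another task.
      while (launched) {
        launched = false
        // Inner loop: each executor with enough free CPUs gets at most one task per round.
        for (i <- offers.indices if availableCpus(i) >= cpusPerTask) {
          pick(pending, offers(i).host, level).foreach { t =>
            assigned(i) = assigned(i) :+ t.id
            availableCpus(i) -= cpusPerTask
            launched = true
          }
        }
      }
    }
    assigned.toIndexedSeq
  }
}
```

With one single-core executor on hostA and one two-core executor on hostB, a task preferring hostA lands on the first, a task preferring hostB lands on the second, and a task with no preference fills the remaining core on hostB during the ANY pass.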
Inside CoarseGrainedSchedulerBackend, makeOffers uses its scheduler reference to call TaskSchedulerImpl.resourceOffers, which returns taskDescs (carrying each task's placement information, the task's operators, and so on). It then calls launchTasks, which sends a message to the executor: executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
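private def makeOffers() {
  // Hold the backend lock while the offers are built and matched
  val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
    // Keep only the executors that are still alive
    val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
    // Build one WorkerOffer per live executor, describing its host and free cores
    val workOffers = activeExecutors.map {
      case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
    }.toIndexedSeq
    // Ask TaskSchedulerImpl to match pending tasks against these offers
    scheduler.resourceOffers(workOffers)
  }
  // Launch only if the scheduler actually handed back tasks
  if (!taskDescs.isEmpty) {
    launchTasks(taskDescs)
  }
}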
How does resourceOffer in TaskSetManager assign tasks internally?
-> allowedLocality = getAllowedLocalityLevel(curTime) (delay scheduling)
-> dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality,
speculative)) => ... (returns the TaskDescription, ready to be sent to the Executor later)
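The dequeue step can be pictured as a lookup that tries the strictest locality first and only falls through to looser levels when the allowed locality permits it. The sketch below is an illustration under stated assumptions: `PendingTasks`, its three fields, and the three-level ordering are hypothetical, and unlike a real scheduler this version does not remove the task it returns.

```scala
// Hypothetical, simplified model of dequeueing by locality; not Spark's actual implementation.
object DequeueSketch {
  // Levels ordered best to worst; a lower index means stricter locality.
  val order: Vector[String] = Vector("PROCESS_LOCAL", "NODE_LOCAL", "ANY")

  // A level is allowed when it is at least as strict as the constraint.
  def isAllowed(constraint: String, level: String): Boolean =
    order.indexOf(level) <= order.indexOf(constraint)

  // Assumed bookkeeping: pending task indexes keyed by executor, by host, and overall.
  final case class PendingTasks(
      byExecutor: Map[String, List[Int]],
      byHost: Map[String, List[Int]],
      all: List[Int])

  /** Returns the index of a runnable task and the locality level it was matched at. */
  def dequeueTask(
      p: PendingTasks,
      execId: String,
      host: String,
      maxLocality: String): Option[(Int, String)] = {
    // PROCESS_LOCAL is always permitted because it is the strictest level.
    val processLocal =
      p.byExecutor.getOrElse(execId, Nil).headOption.map(i => (i, "PROCESS_LOCAL"))
    def nodeLocal: Option[(Int, String)] =
      if (isAllowed(maxLocality, "NODE_LOCAL"))
        p.byHost.getOrElse(host, Nil).headOption.map(i => (i, "NODE_LOCAL"))
      else None
    def any: Option[(Int, String)] =
      if (isAllowed(maxLocality, "ANY")) p.all.headOption.map(i => (i, "ANY"))
      else None
    processLocal.orElse(nodeLocal).orElse(any)
  }
}
```

The point of the design is that the allowed locality acts as a ceiling: while delay scheduling still holds the level at PROCESS_LOCAL, an executor with no matching cached data gets nothing, even though tasks are pending.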
This post still needs polishing and a deeper analysis.
Qin Kaixin (秦凯新), Shenzhen