Spark Task Scheduling
TaskScheduler Scheduling Entry Points
(1) CoarseGrainedSchedulerBackend creates a DriverEndpoint when it starts. The DriverEndpoint runs a periodic task that triggers a scheduling round at a fixed interval (spark.scheduler.revive.interval, 1s by default) by sending itself a ReviveOffers message, which in turn calls makeOffers. The code is shown below:
override def onStart() {
  // Periodically revive offers to allow delay scheduling to work
  val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")

  reviveThread.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = Utils.tryLogNonFatalError {
      Option(self).foreach(_.send(ReviveOffers))
    }
  }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
}
(2) When an Executor finishes a task it was assigned, it sends a StatusUpdate message to the Driver; on receiving the message, the Driver calls makeOffers(executorId) to schedule more tasks. CoarseGrainedExecutorBackend sends the StatusUpdate message to the Driver (DriverEndpoint) whenever a task's state changes:
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
  val msg = StatusUpdate(executorId, taskId, state, data)
  driver match {
    case Some(driverRef) => driverRef.send(msg)
    case None => logWarning(s"Drop $msg because has not yet connected to driver")
  }
}
When the Driver receives the StatusUpdate message, it triggers another scheduling round (makeOffers) and assigns new tasks to the Executor that has just finished one.
override def receive: PartialFunction[Any, Unit] = {
  case StatusUpdate(executorId, taskId, state, data) =>
    scheduler.statusUpdate(taskId, state, data.value)
    if (TaskState.isFinished(state)) {
      executorDataMap.get(executorId) match {
        case Some(executorInfo) =>
          executorInfo.freeCores += scheduler.CPUS_PER_TASK
          makeOffers(executorId)
        case None =>
          // Ignoring the update since we don't know about the executor.
          logWarning(s"Ignored task status update ($taskId state $state) " +
            s"from unknown executor with ID $executorId")
      }
    }

  case ReviveOffers =>
    makeOffers()

  case KillTask(taskId, executorId, interruptThread) =>
    executorDataMap.get(executorId) match {
      case Some(executorInfo) =>
        executorInfo.executorEndpoint.send(KillTask(taskId, executorId, interruptThread))
      case None =>
        // Ignoring the task kill since the executor is not registered.
        logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
    }
}
The makeOffers method calls resourceOffers in TaskSchedulerImpl, which assigns suitable tasks to Executors according to the configured scheduling policy. The code is as follows:
a. Offer tasks on all executors
// Make fake resource offers on all executors
private def makeOffers() {
  // Filter out executors under killing
  val activeExecutors = executorDataMap.filterKeys(!executorsPendingToRemove.contains(_))
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq
  launchTasks(scheduler.resourceOffers(workOffers))
}
b. Offer tasks on a single executor
// Make fake resource offers on just one executor
private def makeOffers(executorId: String) {
  // Filter out executors under killing
  if (!executorsPendingToRemove.contains(executorId)) {
    val executorData = executorDataMap(executorId)
    val workOffers = Seq(
      new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))
    launchTasks(scheduler.resourceOffers(workOffers))
  }
}
After tasks have been assigned, the Driver sends a LaunchTask command to each Executor to start the tasks and run the user code:
// Launch tasks returned by a set of resource offers
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    val serializedTask = ser.serialize(task)
    if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
      scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
            "spark.akka.frameSize or using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
            AkkaUtils.reservedSizeBytes)
          taskSetMgr.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    } else {
      val executorData = executorDataMap(task.executorId)
      executorData.freeCores -= scheduler.CPUS_PER_TASK
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}
1. FIFO
In FIFO (first-in, first-out) mode, Jobs are scheduled in submission order, as shown in the figure below. Each Job is split into multiple Stages; the first Job gets priority on all available resources, the second Job then takes whatever remains, and so on. (Each Stage corresponds to one TaskSetManager.)
2. FAIR
Under FAIR scheduling, Spark allocates resources to tasks in a round-robin fashion across Jobs, so all Jobs have roughly equal priority and share the cluster. The FAIR scheduling model is shown in the figure below.
When scheduling is triggered, TaskSchedulerImpl.resourceOffers is called. It selects the TaskSets to run according to the scheduling policy, then picks suitable tasks (taking data locality into account) and hands them to Executors. The code is as follows:
/**
 * Called by cluster manager to offer resources on slaves. We respond by asking our active task
 * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
 * that tasks are balanced across the cluster.
 */
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  // Mark each slave as alive and remember its hostname
  // Also track if new executor is added
  var newExecAvail = false
  for (o <- offers) {
    executorIdToHost(o.executorId) = o.host
    activeExecutorIds += o.executorId
    if (!executorsByHost.contains(o.host)) {
      executorsByHost(o.host) = new HashSet[String]()
      executorAdded(o.executorId, o.host)
      newExecAvail = true
    }
    for (rack <- getRackForHost(o.host)) {
      hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
    }
  }

  // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
  val shuffledOffers = Random.shuffle(offers)
  // Build a list of tasks to assign to each worker.
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  val sortedTaskSets = rootPool.getSortedTaskSetQueue
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    if (newExecAvail) {
      taskSet.executorAdded()
    }
  }

  // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
  // of locality levels so that it gets a chance to launch local tasks on all of them.
  // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
  var launchedTask = false
  for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
    do {
      launchedTask = resourceOfferSingleTaskSet(
        taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
    } while (launchedTask)
  }

  if (tasks.size > 0) {
    hasLaunchedTask = true
  }
  return tasks
}
As the code above shows, the TaskSets in the queue are sorted via rootPool.getSortedTaskSetQueue, whose implementation is as follows:
override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
  var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
  val sortedSchedulableQueue =
    schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
  for (schedulable <- sortedSchedulableQueue) {
    sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
  }
  sortedTaskSetQueue
}
As this code shows, the TaskSets are sorted using a scheduling algorithm as the comparator. There are two scheduling algorithms, FIFO and FAIR, described below.
Priority: when the DAGScheduler creates a TaskSet, it uses the JobId as the priority value.
The FIFO scheduling algorithm is implemented as follows:
private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val priority1 = s1.priority
    val priority2 = s2.priority
    var res = math.signum(priority1 - priority2)
    if (res == 0) {
      val stageId1 = s1.stageId
      val stageId2 = s2.stageId
      res = math.signum(stageId1 - stageId2)
    }
    if (res < 0) {
      true
    } else {
      false
    }
  }
}
As the source shows, FIFO picks the Schedulable with the smaller JobId first, since jobs submitted earlier have smaller JobIds.
Within the same Job, Stages generated earlier have smaller StageIds. For Stages with dependencies, the DAGScheduler controls whether a Stage is submitted to the scheduling queue at all (a Stage is not submitted until the Stages it depends on have finished), which guarantees the execution order. If a Job has two Stages with no incoming dependencies, the one with the smaller StageId is scheduled first. A worked example of this ordering is sketched below.
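To make the ordering concrete, here is a minimal, self-contained sketch (not Spark code) that reimplements the FIFO comparator over a hypothetical Sched class holding only the fields the comparator reads, then sorts a small queue:

object FifoOrderingDemo {
  // Hypothetical stand-in for Schedulable, keeping only the fields the FIFO comparator reads.
  case class Sched(name: String, priority: Int, stageId: Int)

  // Same logic as FIFOSchedulingAlgorithm.comparator: smaller JobId (priority) wins,
  // with the smaller StageId as the tie-breaker.
  def fifoLessThan(s1: Sched, s2: Sched): Boolean = {
    var res = math.signum(s1.priority - s2.priority)
    if (res == 0) {
      res = math.signum(s1.stageId - s2.stageId)
    }
    res < 0
  }

  def main(args: Array[String]): Unit = {
    val queue = Seq(
      Sched("TaskSet_3", priority = 2, stageId = 3), // Job 2, Stage 3
      Sched("TaskSet_2", priority = 1, stageId = 2), // Job 1, Stage 2
      Sched("TaskSet_1", priority = 1, stageId = 1)) // Job 1, Stage 1
    // Prints: TaskSet_1, TaskSet_2, TaskSet_3 -- Job 1's stages first, smaller StageId first.
    println(queue.sortWith(fifoLessThan).map(_.name).mkString(", "))
  }
}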
Fair scheduling queues are more complex than FIFO: there can be multiple scheduling pools, organized as a tree (currently Spark's Fair scheduler supports only a two-level tree). A user can choose which pool to submit to with sc.setLocalProperty("spark.scheduler.pool", "poolName"); by default jobs go to the default pool (created by buildDefaultPool). Each pool can also specify its own internal scheduling policy, and Fair pools have several special properties:
schedulingMode: the scheduling mode used inside the pool, FIFO or FAIR; the default is FIFO.
minShare: the minimum guaranteed share of resources. When a pool has not yet reached its minimum share, it gets resources before its sibling pools.
weight: controls a pool's share relative to other pools. By default pools share resources in a fair round-robin fashion; this parameter skews that balance. For example, a pool configured with weight 2 receives twice the resources of a pool with weight 1.
These pool properties can be configured in the conf/fairscheduler.xml file, as sketched below.
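As an illustration, the following is a minimal sketch of how an application might enable FAIR scheduling and submit jobs to a named pool. The pool name "production" and the pool definition shown in the comment are hypothetical examples; the configuration keys and setLocalProperty call are the standard Spark APIs.

import org.apache.spark.{SparkConf, SparkContext}

object FairPoolExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("fair-pool-example")
      .set("spark.scheduler.mode", "FAIR") // use the FAIR scheduler instead of the default FIFO
      .set("spark.scheduler.allocation.file", "conf/fairscheduler.xml") // pool definitions
    val sc = new SparkContext(conf)

    // conf/fairscheduler.xml might define a pool such as:
    //   <allocations>
    //     <pool name="production">
    //       <schedulingMode>FAIR</schedulingMode>
    //       <weight>2</weight>
    //       <minShare>2</minShare>
    //     </pool>
    //   </allocations>

    // Jobs submitted from this thread go to the "production" pool;
    // threads that never set this property use the default pool.
    sc.setLocalProperty("spark.scheduler.pool", "production")
    sc.parallelize(1 to 100).count()

    sc.stop()
  }
}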
The Fair scheduling algorithm is implemented as follows:
private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val minShare1 = s1.minShare
    val minShare2 = s2.minShare
    val runningTasks1 = s1.runningTasks
    val runningTasks2 = s2.runningTasks
    val s1Needy = runningTasks1 < minShare1
    val s2Needy = runningTasks2 < minShare2
    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble
    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
    val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
    var compare: Int = 0

    if (s1Needy && !s2Needy) {
      return true
    } else if (!s1Needy && s2Needy) {
      return false
    } else if (s1Needy && s2Needy) {
      compare = minShareRatio1.compareTo(minShareRatio2)
    } else {
      compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
    }

    if (compare < 0) {
      true
    } else if (compare > 0) {
      false
    } else {
      s1.name < s2.name
    }
  }
}
As the source shows, a pool or TaskSet that has not yet reached its minShare runs before one that has. If both are below their minShare, the one with the smaller minShare ratio (runningTasks / minShare) is scheduled first. If neither is below its minShare, the comparison falls back to the task-to-weight ratio (runningTasks / weight), and the smaller ratio is scheduled first. If those are also equal, the names are compared, which for TaskSets means the smaller StageId wins (name = "TaskSet_" + taskSet.stageId.toString).
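As a concrete illustration (a toy sketch, not Spark code), the following reimplements the same decision order over a hypothetical Pool class and prints which of two pools would be scheduled first:

object FairOrderingDemo {
  // Hypothetical stand-in for Schedulable, keeping only the fields the FAIR comparator reads.
  case class Pool(name: String, runningTasks: Int, minShare: Int, weight: Int)

  // Same decision order as FairSchedulingAlgorithm.comparator.
  def fairLessThan(s1: Pool, s2: Pool): Boolean = {
    val s1Needy = s1.runningTasks < s1.minShare
    val s2Needy = s2.runningTasks < s2.minShare
    if (s1Needy && !s2Needy) return true
    if (!s1Needy && s2Needy) return false
    val compare =
      if (s1Needy && s2Needy) {
        // Both below minShare: compare how close each is to its guaranteed share.
        (s1.runningTasks.toDouble / math.max(s1.minShare, 1)).compareTo(
          s2.runningTasks.toDouble / math.max(s2.minShare, 1))
      } else {
        // Neither below minShare: compare running tasks per unit of weight.
        (s1.runningTasks.toDouble / s1.weight).compareTo(
          s2.runningTasks.toDouble / s2.weight)
      }
    if (compare != 0) compare < 0 else s1.name < s2.name
  }

  def main(args: Array[String]): Unit = {
    val a = Pool("poolA", runningTasks = 1, minShare = 2, weight = 1) // below its minShare
    val b = Pool("poolB", runningTasks = 4, minShare = 2, weight = 2) // already at its minShare
    // poolA is still below its minShare, so it is scheduled first.
    println(if (fairLessThan(a, b)) "poolA is scheduled first" else "poolB is scheduled first")
  }
}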
All TaskSets are sorted by this criterion, and the highest-priority one is scheduled first.
Once a TaskSet has been selected, tasks are picked from it for each Executor according to data locality and then launched on that Executor.
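The locality-constrained matching can be illustrated with the toy model below. It is not Spark's resourceOfferSingleTaskSet; the task/offer classes and the two-level locality list are simplified assumptions, but the idea is the same: offer each locality level in turn, and only fall back to a more relaxed level once the stricter one can place nothing more.

import scala.collection.mutable.ArrayBuffer

object LocalityMatchingDemo {
  // Toy model: a pending task with a preferred host, and an executor offer with free cores.
  case class PendingTask(id: Long, preferredHost: String)
  case class Offer(executorId: String, host: String, var freeCores: Int)

  // Simplified locality levels, in the order they are relaxed.
  val localityLevels = Seq("NODE_LOCAL", "ANY")

  def main(args: Array[String]): Unit = {
    val pending = ArrayBuffer(
      PendingTask(0L, preferredHost = "host-a"),
      PendingTask(1L, preferredHost = "host-b"),
      PendingTask(2L, preferredHost = "host-c")) // no executor runs on host-c
    val offers = Seq(
      Offer("exec-1", "host-a", freeCores = 1),
      Offer("exec-2", "host-b", freeCores = 2))

    // Walk the locality levels from most to least restrictive and place tasks greedily.
    for (level <- localityLevels; offer <- offers) {
      val placeable = pending.filter { t =>
        level == "ANY" || t.preferredHost == offer.host
      }
      for (task <- placeable if offer.freeCores > 0) {
        println(s"task ${task.id} -> ${offer.executorId} (${offer.host}) at $level")
        offer.freeCores -= 1
        pending -= task
      }
    }
    // Tasks 0 and 1 land on their preferred hosts at NODE_LOCAL;
    // task 2 is only placed once the level relaxes to ANY.
  }
}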