val yarnClient = YarnClient.createYarnClient setupCredentials() yarnClient.init(yarnConf) yarnClient.start() // Get a new application from our RM val newApp = yarnClient.createApplication() val newAppResponse = newApp.getNewApplicationResponse() appId = newAppResponse.getApplicationId() // Set up the appropriate contexts to launch our AM val containerContext = createContainerLaunchContext(newAppResponse) val appContext = createApplicationSubmissionContext(newApp, containerContext) // Finally, submit and monitor the application logInfo(s"Submitting application $appId to ResourceManager") yarnClient.submitApplication(appContext)
一、FairScheduler接收到SchedulerEventType.APP_ADDED以后,调用addApplication方法把把RMApp添加到队列里面,结束以后发送RMAppEventType.APP_ACCEPTED给RMAppjava
二、RMApp启动RMAttempt以后,发送SchedulerEventType.APP_ATTEMPT_ADDED给FairSchedulernode
LOG.info("Added Application Attempt " + applicationAttemptId + " to scheduler from user: " + user);
三、FairScheduler调用addApplicationAttempt方法,发送RMAppAttemptEventType.ATTEMPT_ADDED事件给RMAppAttempt,RMAppAttempt随后调用Scheduler的allocate方法发送AM的ResourceRequestapp
四、FairScheduler在allocate方法里面对该请求进行处理,FairScheduler对于AM的资源请求的优先级上并无特殊的照顾,详细请看章节2 如何分配资源异步
amClient = AMRMClient.createAMRMClient()
amClient.init(conf)
amClient.start()
amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
// 1.建立资源请求 amClient.addContainerRequest(request) // 2.发送资源请求 val allocateResponse = amClient.allocate(progressIndicator) val allocatedContainers = allocateResponse.getAllocatedContainers() if (allocatedContainers.size > 0) { // 3.请求返回以后处理Container handleAllocatedContainers(allocatedContainers.asScala) }
def startContainer(): java.util.Map[String, ByteBuffer] = { val ctx = Records.newRecord(classOf[ContainerLaunchContext]) .asInstanceOf[ContainerLaunchContext] val env = prepareEnvironment().asJava ctx.setLocalResources(localResources.asJava) ctx.setEnvironment(env) val credentials = UserGroupInformation.getCurrentUser().getCredentials() val dob = new DataOutputBuffer() credentials.writeTokenStorageToStream(dob) ctx.setTokens(ByteBuffer.wrap(dob.getData())) val commands = prepareCommand() ctx.setCommands(commands.asJava) ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr).asJava) // If external shuffle service is enabled, register with the Yarn shuffle service already // started on the NodeManager and, if authentication is enabled, provide it with our secret // key for fetching shuffle files later if (sparkConf.get(SHUFFLE_SERVICE_ENABLED)) { val secretString = securityMgr.getSecretKey() val secretBytes = if (secretString != null) { // This conversion must match how the YarnShuffleService decodes our secret JavaUtils.stringToBytes(secretString) } else { // Authentication is not enabled, so just provide dummy metadata ByteBuffer.allocate(0) } ctx.setServiceData(Collections.singletonMap("spark_shuffle", secretBytes)) } // Send the start request to the ContainerManager try { nmClient.startContainer(container.get, ctx) } catch { case ex: Exception => throw new SparkException(s"Exception while starting container ${container.get.getId}" + s" on host $hostname", ex) } }
在FairScheduler的allocate方法里面仅仅是记录ResourceRequest,并不会真正的立马分配。ide
流程以下:oop
一、检查该APP是否注册过fetch
二、检查资源的请求是否超过最大内存和最大CPU的限制ui
三、记录资源请求的时间,最后container分配的延迟会体如今队列metrics的appAttemptFirstContainerAllocationDelay当中spa
四、释放AM发过来的已经不须要的资源,主要逻辑在FSAppAttempt的containerCompleted方法里线程
五、更新资源请求,全部资源请求都是记录在AppSchedulingInfo当中的requests(注意:只有是ANY的资源请求才会被立马更新到QueueMetrics的PendingResources里)
六、找出该APP被标记为抢占的container ID列表preemptionContainerIds
七、更新APP的黑名单列表,该信息被记录在AppSchedulingInfo当中
八、从FSAppAttempt的newlyAllocatedContainers当中获取最新被分配的container
九、返回preemptionContainerIds、HeadRoom、ContainerList、NMTokenList。(注:Headroom = Math.min(Math.min(queueFairShare - queueUsage, 0), maxAvailableResource)
请求和分配的过程是异步的,关系如上图,每次调用allocate得到的container,实际上是以前的请求被分配的结果
分配有两种方式:
一、接收到NodeManager的心跳的时候进行分配
NodeManager每隔一秒(yarn.resourcemanager.nodemanagers.heartbeat-interval-ms)给ResourceManager发送一个心跳事件NODE_UPDATE,接收到心跳事件以后,在FairScheduler的nodeUpdate方法里进行处理。
NodeManager会汇报新启动的Container列表newlyLaunchedContainers和已经结束的Container列表completedContainers。而后在attemptScheduling方法里面进行分配。
二、持续调度方式
它有一个单独的线程,线程名称是FairSchedulerContinuousScheduling,每5毫秒对全部节点的资源进行排序,而后遍历全部节点,调用attemptScheduling方法进行分配。
开启持续调度模式以后,在接收到心跳事件NODE_UPDATE的时候,只有在completedContainers不为空的状况下,才会进行调度
attemptScheduling首先会检查是否有资源预留,若是有预留,则直接为预留的APP分配container
没有预留的分配过程以下:
一、最大可分配资源为这台机器的可用资源的一半,从root队列开始自上而下进行分配Resource assignment = queueMgr.getRootQueue().assignContainer(node);
二、分配到一个Container以后,判断是否要连续分配多个,最大支持连续分配多少个?
如下是涉及到的各个参数以及参数的默认值:
yarn.scheduler.fair.assignmultiple false (建议设置为true)
yarn.scheduler.fair.dynamic.max.assign true (hadoop2.7以后就没有这个参数了)
yarn.scheduler.fair.max.assign -1 (建议设置为2~3,不要设置得太多,不然会有调度倾斜的问题)
入口在queueMgr.getRootQueue().assignContainer(node);
一、检查当前队列的使用量是否小于最大资源量
二、首先对子队列进行排序,优先顺序请参照章节 2.3.4 如何肯定优先顺序
三、排序完再调用子队列的assignContainer方法分配container
四、一直递归到叶子队列
叶子队列如何进行分配?
一、先对runnableApps进行排序,排序完成以后,for循环遍历一下
二、先检查该Node是否在APP的黑名单当中
三、检查该队列是否能够运行该APP的AM,主要是检查是否超过了maxAMShare(根据amRunning字段判断是否已经启动了AM了)
检查逻辑的伪代码以下:
maxResource = getFairShare() if (maxResource == 0) { // 最大资源是队列的MaxShare和集群总资源取一个小的值 maxResource = Math.min(getRootQueue().AvailableResource(), getMaxShare()); } maxAMResource = maxResource * maxAMShare if (amResourceUsage + amResource) > maxAMResource) { // 能够运行 return true } else { // 不能够运行 return false }
四、给该APP分配container
下面以一个例子来讲明分配的过程是如何选择队列的:
假设队列的结构是这样子的
root
---->BU_1
-------->A
-------->B
---->BU_2
-------->C
-------->D
任务分配Container的时候会考虑请求的本地性,对于调度器来讲,它的本地性分为三种:NODE_LOCAL, RACK_LOCAL, OFF_SWITCH
具体方法位于FSAppAttempt的assignContainer方法
遍历优先级
给该优先级的调度机会+1
获取RackLocal和NodeLocal的任务
计算容许分配的本地性级别allowedLocality,默认是NODE_LOCAL
一、心跳分配方式
计算调度机会,若是该优先级的任务的调度机会超过了(节点数 * NODE_LOCAL阈值),降级为RACK_LOCAL,若是该优先级的任务的调度机会超过了(节点数 * RACK_LOCAL阈值),降级为OFF_SWITCH
二、连续分配方式
计算等待时间waitTime -= lastScheduledContainer.get(priority);
若是waitTime超过了NODE_LOCAL容许的delay时间,就降级为RACK_LOCAL,再超过RACK_LOCAL容许的delay的时间,就降级为OFF_SWITCH
分配NODE_LOCAL的container
容许分配的本地性级别>=RACK_LOCAL,分配RACK_LOCAL的container
容许分配的本地性级别=OFF_SWITCH,分配OFF_SWITCH的container
都分不到,等待下一次机会
相关参数:
默认值全是-1,则容许的本地性级别是OFF_SWITCH
yarn.scheduler.fair.locality-delay-node-ms -1
yarn.scheduler.fair.locality-delay-rack-ms -1
yarn.scheduler.fair.locality.threshold.node -1
yarn.scheduler.fair.locality.threshold.rack -1
一、检查该节点的资源是否足够,若是资源充足
二、若是当前的allowedLocality比实际分配的本地性低,则重置allowedLocality
三、把新分配的Container加到newlyAllocatedContainers和liveContainers列表中
四、把分配的container信息同步到appSchedulingInfo当中
五、发送RMContainerEventType.START事件
六、更新FSSchedulerNode记录的container信息
七、若是被分配的是AM,则设置amRunning为true
若是资源不够,则检查是否能够预留资源
条件:
1)Container的资源请求必须小于Scheduler的增量分配内存 * 倍数(默认应该是2g)
2)若是已经存在的预留数 < 本地性对应的可用节点 * 预留比例
3)一个节点只容许同时为一个APP预留资源
相关参数:
yarn.scheduler.increment-allocation-mb 1024
yarn.scheduler.increment-allocation-vcores 1
yarn.scheduler.reservation-threshold.increment-multiple 2
yarn.scheduler.fair.reservable-nodes 0.05
该比较规则同时适用于队列和APP,详细代码位于FairSharePolicy当中
MinShare = Math.min(getMinShare(), getDemand())
一、(当前资源使用量 / MinShare)的比值越小,优先级越高
二、若是双方资源使用量都超过MinShare,则(当前资源使用量 / 权重)的比值越小,优先级越高
三、启动时间越早,优先级越高
四、最后实在比不出来,就比名字...
从上面分配的规则当中能看出来MinShare是很是重要的一个指标,当资源使用量没有超过MinShare以前,队列在分配的时候就会比较优先,切记必定要设置啊!
注:getMinShare()是FairScheduler当中队列的minResources
<minResources>6887116 mb,4491 vcores</minResources>