一、 资源分配html
经过SparkSubmit进行提交应用后,首先会建立Client将应用程序(字节码文件.class)包装成Driver,并将其注册到Master。Master收到Client的注册请求后将其加入待调度队列waitingDrivers,并等待分配执行资源。apache
1.1 Dirver调度(分配Driver执行容器,1个)app
Master中调度程序执行时会为Driver分配一知足其执行要求的Worker, 并通知Worker启动将Driver。Worker接到执行Driver指令后建立DriverRunner执行Driver(应用程序mainClass,mainClass执行时其会建立Spark执行上下文环境:SparkContext。伴随SparkContext会建立DAGScheduler和TaskScheduler分别用于Stage调度和任务调度,并会触发RDD的Action算子提交job)。spa
1.2 APP调度(分配Executor, 多个)线程
若想Job运行就须要获得执行资源,Dirver成功执行后,会经过SparkDeployScheduler-Backend建立AppClient(包装App信息,包含能够建立CoarseGrainedExecutorBackend实例Command),用于向Master汇报资源需求。Master接到AppClient的汇报后,将其加入waittingApps队列,等待调度。htm
App调度时会为app分配知足条件的资源-----Worker(State是Alive,其上并无该Application的executor,可用内存知足要求(spark.executor.memory指定,默认512), 核知足要求(spark.cores.max, 最大可用core数,若未指定,则为所有资源)),而后通知Woker启动Excutor. 及向AppClient发送ExecutorAdded消息。blog
进行调度时,调度程序会根据配制SpreadOutApps = spark.deploy.spreadOut状况决定资源分配方式,若排序
SpreadOutApps方式:将每一个app分配到尽量多的worker中执行。递归
1 从列表中取下一app,根据CPU状况找出合适的woker,按核从小到大排序队列
2 若是worker节点存在能够分配的core 则进行预分配处理(轮循一次分一个直至知足app需求),并在分配列表(assigned = Array[Int](numUsable))中记数。
3根据assinged列表中的预分配信息,进行分配Executor(真实分配)
4 启动Executor并设置app.state = ApplicationState.RUNNING
非SpreadOutApps方式: 将每一个app分配到尽量少的worker中执行。
1 从可用的worker列表中取下一work. (worker <- workers if worker.coresFree > 0)
2 遍历waitingApps 找到知足app运行条件的app,进行分配
3启动Executor(launchExecutor(w,e))并设置app.state = ApplicationState.RUNNING
其中:launchExcutor(worker, exec) 具体内容以下:
向executor分配给worker
通知worker启动executor
由分配过程可知, 分配的Excutor个数与CPU核心数有关。当指定完Worker节点后,会在Worker节点建立ExecutorRunner,并启动,执行App中的Command 去建立并启动CoarseGrainedExecutorBackend。CoarseGrainedExecutorBackend启动后,会首先经过传入的driverUrl这个参数向在CoarseGrainedSchedulerBackend::DriverActor(用于与Master通讯,及调度任务)发送RegisterExecutor(executorId, hostPort, cores),DriverActor会建立executorData(executor信息)加入executorDataMap供后续task使用,并回复RegisteredExecutor,此时CoarseGrainedExecutorBackend会建立一个org.apache.spark.executor.Executor。至此,Executor建立完毕。Executor是直接用于task执行, 是集群中的直接劳动者。
至此,资源分配结束。当分配完资源后,就能够为依本地性为任务分配具体的执行资源。
二、Stage划分
当执行mainClass时,执行到RDD的action算子时,会触发执行做业(sc.runJob),最终经过调用DAGScheduler的runJob方法根据RDD信息及action算子要作的操做建立ResultStage(FinalStage)及ActiveJob。
若ResultStage建立成功的话,根据配制信息及RDD特征可分为本地执行,集群执行。
若“spark.localExecution.enable”指定容许本地运行(默认为:false,不容许),具RDD的action算了容许本地运行allowLocal=true,且RDD只有一个partition的话能够直接以本地线程执行job,无需划分stage。不然要将job分红多个Stage提交到集群去执行(经过提交ResultStage进行)。
由于ResultStage提交时,首先会去判断其是否存在缺失的ParentStage(也就是说是否存在未完成的父Stage)。如有,则其须要等待其父Stage执行完成,才能进行提交执行。
判断是否存在Stage的标准是看是否存在ShuffeDependency(Stage的分界线)。提交ResultStage时会根据其finalRDD 的依赖递归的寻找其DAG图中是否存在ShuffeDependency, 若存在,则建立ShuffleMapStage作为finalStage的父Stage以此相似。但至此,只能说存在父Stage并不能说存在缺失的父Stage. 判断缺失的标准是看其结果成功的输出信息(status)个数与其处理的分区个数是否相同,如若相同,则说明父Stage已经执行完成, 不存在missing;不然,说明还未完成,存在missing. 由于将ShuffleMapStage划分红maptask时,每一个Partition对应一个maptask, 每一个task会获得一个status输出结果信息,并在执行结束时将输出结果上报mapOutputTracker,并更新shuffleStage状态(将status增长进行其outputLocs列表,并将numAvailableOutputs加1),若numAvailableOutputs 与 Stage所要处理的partitions一致,说明全部的task都已经执行完成,即Stage执行完成;不然,说明还有task未完成,即Stage未完成。
由上述分析可知,存在依赖关系的两个Stage,若是父Stage未执行完成,子Stage不能提交,也就是不能转变为Taskset加入任务调度队列。所以其前后顺序是严格控制的。咱们知道只有存在ShuffleDependency时,才会划分Stage,这也就是说两个Stage之间是要作Shuffle操做的。根据上述分析可知Shuffle时ShuffleWrite作不完,ShuffleRead不能进行.
3. Task调度
当Stage不存在缺失的ParentStage时,会将其转换为TaskSet并提交。转换时依Stage类型进行转换:将ResultStage转换成ResultTask, ShuffleMapStage转换成ShuffleMapTask. Task个数由Stage中finalRDD 的分区数决定。
当转换成的TaskSet提交以后,将其经过taskScheduler包装成TaskSetManager并添加至调度队列中(Pool),等待调度。在包装成TaskSetManager时,根据task的preferredLocatitions将任务分类存放在pendingTasksForExecutor, pendingTaskForHost, pendingTasksForRack, pendingTaskWithNoPrefs及allPendingTasks中, 前三个列表是是包含关系(本地性愈来愈低),范围起来越大,例如:在pendingTasksForExecutor也在pendingTaskForHost,pendingTasksForRack中, 分类的目的是在调度时,依次由本地性高à低的查找task。
在进行Task调度时,首先根据调度策略将可调度全部taskset进行排序,而后对排好序的taskset待调度列表中的taskset,按序进行分配Executor。再分配Executor时,而后逐个为Executor列表中可用的Executor在这次选择的taskset中按本地性由高到低查找适配任务。此处任务调度为延迟调度,即若本次调度时间距上一任务结束时间小于当前本地性配制时间则等待,若过了配制时间,本地性要求逐渐下降,再去查找适配的task。当选定某一task后后将其加入runningtask列表,当其执行完成时会加入success列表,下次调度时就会过滤过存在这两个列表中的任务,避免重复调度。
当一个任务执行结束时,会将其从runningtask中移除,并加入success,并会适放其占用的执行资源,供后序task使用, 将判断其执行成功的task数与此taskset任务总数相等时,意为taskset中全部任务执行结束,也就是taskset结束。此时会将taskset移除出可调度队列。
重复上述过程直到taskset待调度列表为空。即全部做业(job)执行完成。
3.1 spark调度策略
上文任务调度时提到,在调度任务时,首前后依据调度策略对任务按优先级进行排序。下面就调度策略就行介绍。
Spark现有的调度策略有FIFO 及 Fair两种。采用何种调度策略由“spark.scheduler.mode”参数指定,默认为FIFO类型。
下小节进行分析……
……………………
文章出处:http://www.cnblogs.com/barrenlake/p/4550800.html
……………………