Spark Job的提交与task本地化分析(源码阅读八)

  咱们又都知道,Spark中任务的处理也要考虑数据的本地性(locality),Spark目前支持PROCESS_LOCAL(本地进程)NODE_LOCAL(本地节点)NODE_PREF、RACK_LOCAL(本地机架)ANY(任何)几种。其余都很好理解,NODE_LOCAL会在spark日志中执行拉取数据所执行的task时,打印出来,由于Spark是移动计算,而不是移动数据的嘛。node

  那么什么是NODE_PREF数组

  当Driver应用程序刚刚启动,Driver分配得到的Executor极可能尚未初始化,因此有一部分任务的本地化级别被设置为NO_PREF.若是是ShuffleRDD,其本地性始终为NO_PREF。这两种本地化级别是NO_PREF的状况,在任务分配时会被优先分配到非本地节点执行,达到必定的优化效果。缓存

  那么下来咱们从job的任务提交开始玩起~dom

   

  getMissingParentStages方法用来找到Stage的全部不可用的父Stage.从代码能够到这里是个递归的调用,submitWaitingStages实际上循环waitingStages中的stage并调用submitStaghe:函数

  

  那么下来开始提交task,提交task的入口是submitMissingTasks,此函数在Stage没有可用的父stage时,被调用处理当前Stage未提交的任务。源码分析

  一、那么在没有父stage时,会首先调用paendingPartitions.clear 用于清空pendingTasks.因为当前Stage的任务刚开始提交,因此须要清空,便于记录须要计算的任务。性能

  二、将当前Stage加入运行中的Stage集合,是用HashSet进行构造的。优化

  三、找出位计算的partition,若是Stage是map任务,那么outputLocs中partition对应的List为Nil,说明此partition还未计算。若是Stage不是map任务,那么须要获取stage的finalJob,调用finished方法判断每一个partition的任务是否完成。spa

  

  四、而后经过stage.makeNewStageAttemp,使用StageInfo.fromStage方法建立当前Stage的_latestInfo:3d

  

  五、若是是Stage Map任务,那么序列化Stage的RDD及ShuffleDependency,若是Stage不是map任务,那么序列化Stage的RDD及resultOfJob的处理函数。最终这些序列化获得的字节数组须要用sc.broadcast进行广播。

  

  六、最后,建立全部Task、当前stage的id、jobId等信息建立TaskSet,并调用taskScheduler的submitTasks,批量提交Stage及其全部Task.

  

  有可能同时有多个任务提交,因此就有了调度策略FIFO,那么下来调用LocalBackendreviveOffers方法,向local-Actor发送ReviveOffers消息。localActor对ReviveOffers消息的匹配执行reviveOffers方法。调用TaskSchedulerImpl的resourceOffers方法分配资源,最后调用Executor的launchTask方法运行任务。

  

  同时你会发现,这里有段代码,shuffleOffers = Random.shuffle(offers),是为了计算资源的分配与计算,对全部WorkerOffer随机洗牌,避免将任务老是分配给一样的WorkerOffer。

  好了,知道了整个流程,下来咱们来看一下本地化问题:

  

  myLocalityLevles:当前TaskSetManager容许使用的本地化级别。那么这里的computeValidLocalityLevels方法是用于计算有效的本地化缓存级别。若是存在Executor中的有待执行的任务,且PROCESS_LOCAL本地化的等待时间不为0,且存在Executor已被激活,那么容许的本地化级别里包括PROCESS_LOCAL.

  

  这里又发现新大陆,获取各个本地化级别的等待时间。

  spark.locality.wait 本地化级别的默认等待时间 

  spark.locality.wait.process 本地进程的等待时间

  spark.locality.wait.node 本地节点的等待时间

  spark.locality.wait.rack 本地机架的等待时间

  这些参数呢,在任务的运行很长且数量不少的状况下,适当调高这些参数能够显著提升性能,然而当这些参数值都已经超过任务的运行时长时,则须要调小这些参数。任何任务都但愿被分配到能够从本地读取数据的节点上以获得最大的性能提高,然而每一个任务的运行时长时不可预计的。当一个任务在分配时,若是没有知足最佳本地化(PROCESS_LOCAL)的资源时,若是执拗的期盼获得最佳的资源,极可能被已经占用最佳资源可是运行时间很长的任务耽误,因此这些代码实现了当没有最佳本地化时,选择稍差点的资源。

 

参考文献:《深刻理解Spark:核心思想与源码分析》

相关文章
相关标签/搜索