延迟调度算法思想十分简单,为了实现data locality(即该task所需数据就在其运行的机器上),会尽可能将task分布到有其所需数据的机器或者jvm中去,若是机器或者jvm已被占用就进行延迟等待,直到该机器或者jvm能够运行该task或者超过等待时限则将task运行到其余机器上。
这个想法基于如下几点:
1.每每数据比程序要大得多,分布式上处理的数据都是GB为单位的,将程序放到数据所在机器去执行,大大减小网络传输时间。
2.在集群上面task通常都是运行时间较短的,即整个集群上面不断有task完成,释放其占用的资源,延迟调度的task可以有极大的机会得到分配。
总之,就是延迟调度节省的网络传输时间远远大于task等待花费的时间。
延迟调度的思想是相通的,本文讨论的是spark在yarn集群上的延迟调度状况,故分为两层,第一层是yarn的延迟调度,第二层则是spark内部的延迟调度。
1.yarn级别的Delay Scheduler
spark在yarn上面的Delay Scheduler其实就是以为spark的executor分配在哪些NodeManager上面,这是由yarn根据application的输入文件而定。尽可能将executor分布到有数据的NodeManager上。由于,在这一层上若是executor没法作到data locality,那么到了spark的级别分配task到executor的时候,更加没法实现data locality。
在yarn中配置yarn.scheduler.capacity.node-locality-delay配置延迟等待次数。(一般设置机架数量)。
2.spark内部Task的Delay Scheduler
这个级别的Delay Scheduler是面临的问题,是将task分到有数据的executor上去,上面已经说了,这一层次的Delay Scheduler依赖于yarn对executor的分配。另外,在运算过程当中,有task 的Delay Scheduler是由于咱们在spark中对数据进行了cache或者persist。在shuffle中是不用考虑Delay Scheduler的,由于shuffle中的read task 是须要去全部的write task的disk上拉取数据的,故也就不存在经过延迟调度来选择data locality的问题了。
在spark中会有3个配置项:
spark.locality.wait.process default 3000ms
spark.locality.wait.node default spark.locality.wait.process
spark.locality.wait.rack default spark.locality.wait.process
目前就还有最后一个问题,须要解决了,配置项该以什么标准进行配置?
在这篇论文中Delay Scheduling: A Simple Technique for AchievingLocality and Fairness in Cluster Scheduling 有一个详细的介绍,这里我直接给出公式:
Job等待一次task实现data locality所花的最长时间 W= (D/S)*T=D/(L*M) * T
D是实现Data Locality,须要延迟等待的次数
M是本次计算用到的集群节点数
L为每一个节点能用的core数量
S即为集群能用的总的core
N为本次job的task数量
R为文件的备份数量(HDFS默认为3)
λ为指望本次job达到的数据本地率
T为单个task运行所须要的时间。
经过上面两个公式,咱们就能计算出yarn和spark中的延迟调度项如何配置了。
(D则为yarn配置的延迟等待次数,W则为spark中配置中的等待时间。)node