In the Worker Actor, every LaunchExecutor creates one CoarseGrainedExecutorBackend process. Executor and CoarseGrainedExecutorBackend have a one-to-one relationship: however many Executor instances are launched in the cluster, that is how many CoarseGrainedExecutorBackend processes exist.
So how exactly are Executors allocated, and how can the number of Executors be controlled and tuned?
The following is an overview of Spark's Executor allocation strategy.
Here we look only at the following path: after an Application is submitted and registered with the Master, the Master replies with RegisteredApplication and then calls the schedule() method, which allocates resources for Drivers and for launching Executors.
schedule() is the method that schedules the currently available resources; it manages resource allocation for the Apps still waiting in the queue. It is called every time the cluster's resources change, so Apps are allocated resources according to the cluster's latest state.
// First schedule drivers, they take strict precedence over applications
val shuffledWorkers = Random.shuffle(workers) // shuffle the current workers HashSet into a random order
for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) { // iterate over the live workers
  for (driver <- waitingDrivers) { // allocate resources to the Drivers in the waiting queue
    if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
      // the worker's free memory and cores both satisfy what the driver requested, so launch it
      launchDriver(worker, driver) // internally sends a launch command to the chosen Worker, which starts the Driver
      waitingDrivers -= driver // remove the launched Driver from the waiting queue
    }
  }
}
val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)
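The flag above selects between the two allocation strategies discussed later. As a minimal illustration only (not part of the Master source), spark.deploy.spreadOut is a Master-side property that defaults to true and is normally supplied when the Master is started, for example via SPARK_MASTER_OPTS in conf/spark-env.sh:

import org.apache.spark.SparkConf

// Illustration only: spark.deploy.spreadOut is a Master-side setting (default: true).
// On a real cluster it is typically passed when starting the Master, e.g.
// SPARK_MASTER_OPTS="-Dspark.deploy.spreadOut=false" in conf/spark-env.sh.
val conf = new SparkConf()                                          // picks up -D system properties
val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true) // same read as in the Master code above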
Only a Worker that passes the following check counts as usable for an App:
/**
 * Can an app use the given worker? True if the worker has enough memory and we haven't already
 * launched an executor for the app on it (right now the standalone backend doesn't like having
 * two executors on the same worker).
 */
def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = {
  worker.memoryFree >= app.desc.memoryPerSlave && !worker.hasExecutor(app)
}
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
// in the queue, then the second app, etc.
if (spreadOutApps) {
  // Try to spread out each app among all the nodes, until it has all its cores
  for (app <- waitingApps if app.coresLeft > 0) { // handle apps that have not yet been fully allocated
    val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
      .filter(canUse(app, _)).sortBy(_.coresFree).reverse // sort the usable Workers by free cores, descending
    val numUsable = usableWorkers.length // number of usable workers, e.g. 5
    val assigned = new Array[Int](numUsable) // cores assigned per candidate Worker, one slot per Worker, initialized to 0
    var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) // cores still to assign = min(app's remaining cores (e.g. 5), total free cores across usable Workers (e.g. 10))
    var pos = 0
    while (toAssign > 0) {
      if (usableWorkers(pos).coresFree - assigned(pos) > 0) { // round-robin over the usable Workers: does this worker still have a free core beyond what is already assigned?
        toAssign -= 1
        assigned(pos) += 1 // give the Worker at index pos one more core
      }
      pos = (pos + 1) % numUsable // round-robin on to the next Worker with resources
    }
    // Now that we've decided how many cores to give on each node, let's actually give them
    for (pos <- 0 until numUsable) {
      if (assigned(pos) > 0) { // if assigned(pos) > 0, launch an executor on the Worker at that index
        val exec = app.addExecutor(usableWorkers(pos), assigned(pos)) // update the app's Executor bookkeeping
        launchExecutor(usableWorkers(pos), exec) // tell the Worker to launch the Executor
        app.state = ApplicationState.RUNNING
      }
    }
  }
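To make the round-robin loop concrete before moving on to the else branch, here is a small self-contained sketch (my own toy code, not Spark source; the worker core counts of 8, 6 and 4 and the demand of 10 cores are made-up numbers):

// Toy standalone sketch of the spread-out assignment loop above (not Spark code).
// Assumed inputs: an app still needing 10 cores, and three usable workers with
// 8, 6 and 4 free cores, already sorted by free cores in descending order.
object SpreadOutSketch extends App {
  val coresFree = Array(8, 6, 4)               // usableWorkers(pos).coresFree
  val assigned  = new Array[Int](coresFree.length)
  var toAssign  = math.min(10, coresFree.sum)  // app.coresLeft vs. total free cores
  var pos       = 0
  while (toAssign > 0) {
    if (coresFree(pos) - assigned(pos) > 0) {  // this worker still has an unassigned core
      toAssign -= 1
      assigned(pos) += 1
    }
    pos = (pos + 1) % coresFree.length         // round-robin to the next worker
  }
  println(assigned.mkString(", "))             // prints: 4, 3, 3  -- cores spread across workers
}

Every index with assigned(pos) > 0 then gets one launchExecutor call, so the 10 cores end up as three Executors of 4, 3 and 3 cores spread over three Workers.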
Otherwise (the non-spreadOut path), the scheduler hands the app all of the current Worker's remaining resources. (Ps: it squeezes each Worker's leftover resources dry, one by one...)
} else {
  // Pack each app into as few nodes as possible until we've assigned all its cores
  for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
    for (app <- waitingApps if app.coresLeft > 0) {
      if (canUse(app, worker)) { // simply ask whether the current worker has free resources for this app
        val coresToUse = math.min(worker.coresFree, app.coresLeft) // take whatever it has, however much that is
        if (coresToUse > 0) {
          val exec = app.addExecutor(worker, coresToUse) // record the Executor and launch it directly
          launchExecutor(worker, exec)
          app.state = ApplicationState.RUNNING
        }
      }
    }
  }
}
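For contrast, here is the same made-up cluster (workers with 8, 6 and 4 free cores, an app needing 10) under the packing branch, again as a toy sketch rather than Spark source:

// Toy sketch of the non-spread-out branch (not Spark code): take as many cores as
// possible from each worker in turn until the app's demand is met.
object PackSketch extends App {
  val coresFree = Array(8, 6, 4)   // free cores per worker (made-up numbers)
  var coresLeft = 10               // app.coresLeft
  val taken     = Array.fill(coresFree.length)(0)
  for (pos <- coresFree.indices if coresLeft > 0) {
    val coresToUse = math.min(coresFree(pos), coresLeft)
    taken(pos)  = coresToUse
    coresLeft  -= coresToUse
  }
  println(taken.mkString(", "))    // prints: 8, 2, 0  -- the app lands on only two workers
}

The same 10 cores now become two Executors of 8 and 2 cores on just two Workers, which is exactly the behavior summarized in point 3.2 below.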
2. For any given App, each Worker can host only one Executor for that App. Keep this in mind.
If you want more Executors for an App, set SPARK_WORKER_INSTANCES (in spark-env.sh) so that more Worker instances are started.
3. There are two strategies for allocating Executor resources:
3.1 SpreadOut: iterates over all available Workers in the cluster in round-robin fashion and allocates resources to launch Executors. Its advantage is that cores are spread across as many nodes as possible, maximizing load balance and parallelism.
3.2 Non-SpreadOut: launches Executors by consuming each Worker's remaining resources as fully as possible, so the resulting Executors may sit on only a small subset of the cluster's machines. This is acceptable for clusters with few nodes, but as the cluster grows, Executor parallelism and load balance across machines can no longer be guaranteed.
This was written in haste; if anything is wrong, please point it out. Discussion is welcome :)
1. A discussion about: why is a Worker allowed only one Executor per App, and what is the rationale for this design?
连城404: Spark uses a thread-level parallelism model. Why would a worker need to start multiple executors for one app?
朴动_zju: One worker mapping to one executorbackend was carried over from Mesos, where each slave also runs one executorbackend. My understanding is that starting multiple could be implemented, but it doesn't seem to bring much benefit and it adds complexity.
CrazyJvm: @CodingCat made a patch that can start multiple, but it hasn't been merged yet. From YARN's perspective, one Worker could map to multiple executorbackends, just as one nodemanager maps to multiple containers. @OopsOutOfMemory
OopsOutOfMemory: Reply to @连城404: If a single executor is too large and holds too many objects, GC becomes very slow; having a few more Executors mitigates the slow full GC problem. see this post http://t.cn/RP1bVO4 (today 11:25)
连城404: Reply to @OopsOutOfMemory: Ah, that consideration makes sense.
One workaround is to deploy multiple workers on a single machine; workers are relatively cheap.
JerryLead: Reply to @OopsOutOfMemory: It seems this is all still evolving; standalone and YARN still differ in many ways, so let's not draw conclusions yet. (today 11:35)
JerryLead: The problem is getting complicated: should we raise thread-level parallelism or process-level parallelism? I think Spark would still prefer the former, since tasks are easier to manage and broadcast and cache are more efficient that way. The latter has some merit, but parameter configuration becomes more complex. Each has its pros and cons. (today 11:40)
To be continued...
Link: @JerryLead https://github.com/JerryLead/SparkInternals/blob/master/markdown/1-Overview.md
——EOF——
Original article. When reposting, please credit: http://blog.csdn.net/oopsoom/article/details/38763985