Spark源码之部署模式

Spark 多种部署模式,如Yarn,Standalone,Local等等。主节点启动deploy.master,从节点启动deploy.worker。数组

    1. Worker的主要流程
      1. 启动时

发送RegisterWorker消息给Master。若是master回复注册成功,则设置master,并启动心跳。最后将executors的状态报告给master。若是注册失败,则退出。app

      1. Worker处理消息

master给worker传递的消息:函数

消息名称spa

说明对象

RegisterWorkerResponse事件

回复给worker的注册结果消息,上面已说明内存

SendHeartbeatci

心跳,传递worker的状态资源

WorkDirCleanuprem

清除worker的工做目录

MasterChanged

更改master,worker从新设置,并将executors的状态从新通知给新的master

ReconnectWorker

从新注册worker

LaunchExecutor

启动新的executor,建立ExecutorRunner对象,并通知master

KillExecutor

杀死Executor

LaunchDriver

建立DriverRunner,启动

 

 

 

      1. Master处理消息

master消息主要是Executor、Worker、Application和Driver相关的消息:

消息名称

说明

ElectedLeader

选举leader

RegisterWorker

新的worker注册进来。注册成功则发送RegisteredWorker消息给worker

RegisterApplication

注册app,包含driver和描述信息等。从driver启动spark task。

只有StandaloneAppClient会发送RegisterApplication消息。

schedule

调度主方法

主要目的判断是否有worker知足启动executor的条件,若是知足,则发送LaunchExecutor消息给worker,同时发送ExecutorAdded消息给Driver。

ExecutorStateChanged

executor状态更新,有多是executor中止,须要维护状态,并判断application状态是否要更新,通知Driver该事件。

DriverStateChanged

判断是否调用RemoveDriver方法

Heartbeat

判断是否从新注册worker

RequestSubmitDriver

建立Driver,调用schedule开始调度。由deploy的submit过程提交该消息。

RequestExecutors

请求Executor,要通知指定的worker建立Executor?

KillExecutors

删除Executor

 

KillExecutor

删除Executor,从指定的worker删除executor,同时通知消息给对应worker。

private def killExecutor(exec: ExecutorDesc): Unit = {

    exec.worker.removeExecutor(exec)

    exec.worker.endpoint.send(KillExecutor(masterUrl, exec.application.id, exec.id))

    exec.state = ExecutorState.KILLED

  }

 

registerWorker

注册Worker,建立一个新的Worker对象,设置该Worker的cpus和内存,保存到master的数组中。

 

schedule

重要的调度函数。每次资源信息或者Executor状态等更新时触发该方法执行新的调度。

调度是指寻找等待的driver,为每一个等待的driver寻找空闲cpu和内存数量都知足该driver需求的worker,而后启动driver,向worker发送启动driver消息在该worker上启动driver,启动worker上的executor。

为每一个app,在可用的worker上分配executor数组。

private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {

    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)

    worker.addExecutor(exec)

    worker.endpoint.send(LaunchExecutor(masterUrl,

      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))

    exec.application.driver.send(

      ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))

  }

worker发送LaunchExecutor消息,同时向appdriver发送executorAdded消息。

注意有两个分配:一是driver的启动分配,须要判断workercpu和内存是否知足条件;二是executor的分配,也要判断worker的剩余cpu和内存是否知足。

相关文章
相关标签/搜索