Spark 多种部署模式,如Yarn,Standalone,Local等等。主节点启动deploy.master,从节点启动deploy.worker。数组
发送RegisterWorker消息给Master。若是master回复注册成功,则设置master,并启动心跳。最后将executors的状态报告给master。若是注册失败,则退出。app
master给worker传递的消息:函数
消息名称spa |
说明对象 |
RegisterWorkerResponse事件 |
回复给worker的注册结果消息,上面已说明内存 |
SendHeartbeatci |
心跳,传递worker的状态资源 |
WorkDirCleanuprem |
清除worker的工做目录 |
MasterChanged |
更改master,worker从新设置,并将executors的状态从新通知给新的master |
ReconnectWorker |
从新注册worker |
LaunchExecutor |
启动新的executor,建立ExecutorRunner对象,并通知master |
KillExecutor |
杀死Executor |
LaunchDriver |
建立DriverRunner,启动 |
|
|
master消息主要是Executor、Worker、Application和Driver相关的消息:
消息名称 |
说明 |
ElectedLeader |
选举leader |
RegisterWorker |
新的worker注册进来。注册成功则发送RegisteredWorker消息给worker |
RegisterApplication |
注册app,包含driver和描述信息等。从driver启动spark task。 只有StandaloneAppClient会发送RegisterApplication消息。 |
schedule |
调度主方法 主要目的判断是否有worker知足启动executor的条件,若是知足,则发送LaunchExecutor消息给worker,同时发送ExecutorAdded消息给Driver。 |
ExecutorStateChanged |
executor状态更新,有多是executor中止,须要维护状态,并判断application状态是否要更新,通知Driver该事件。 |
DriverStateChanged |
判断是否调用RemoveDriver方法 |
Heartbeat |
判断是否从新注册worker |
RequestSubmitDriver |
建立Driver,调用schedule开始调度。由deploy的submit过程提交该消息。 |
RequestExecutors |
请求Executor,要通知指定的worker建立Executor? |
KillExecutors |
删除Executor |
删除Executor,从指定的worker删除executor,同时通知消息给对应worker。
private def killExecutor(exec: ExecutorDesc): Unit = {
exec.worker.removeExecutor(exec)
exec.worker.endpoint.send(KillExecutor(masterUrl, exec.application.id, exec.id))
exec.state = ExecutorState.KILLED
}
注册Worker,建立一个新的Worker对象,设置该Worker的cpus和内存,保存到master的数组中。
重要的调度函数。每次资源信息或者Executor状态等更新时触发该方法执行新的调度。
调度是指寻找等待的driver,为每一个等待的driver寻找空闲cpu和内存数量都知足该driver需求的worker,而后启动driver,向worker发送启动driver消息在该worker上启动driver,启动worker上的executor。
为每一个app,在可用的worker上分配executor数组。
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
worker.addExecutor(exec)
worker.endpoint.send(LaunchExecutor(masterUrl,
exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
exec.application.driver.send(
ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}
向worker发送LaunchExecutor消息,同时向app的driver发送executorAdded消息。
注意有两个分配:一是driver的启动分配,须要判断worker的cpu和内存是否知足条件;二是executor的分配,也要判断worker的剩余cpu和内存是否知足。