spark源码分析--spark的任务调度(standalone模式)

原创,转载请注明出处 http://baishuo491.iteye.com/blog/1994026 ,做者邮箱:vc_java@hotmail.com,新浪微博:爱看历史的码农--白硕

在sparkContext的创建过程当中(更细致的说是clientActor的preStart回调函数中),会向master发送RegisterApplication消息
master ! RegisterApplication(appDescription)
当master收到RegisterApplication请求后:
app = addApplication(description, sender)
.....
waitingApps += app
.....
schedule()
经过传入的(appDescription)和发送者,建立一个addApplication,而后把app加入到等待队列waitingApps中,以后再调用schedule函数进行调度
再来看看schedule的流程:
当等待队列waitingApps里有数据的时候,对waitingApps里的每一个元素app作以下操做:
从已经注册的works里面,选择合适的works列表usableWorkers
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE).filter(canUse(app, _)).sortBy(_.coresFree).reverse
从usableWorkers的空闲cpu中,选择合适数量的cpu,为当前app进行分配
这以后调用launchExecutor,并把app的状态设置为running
launchExecutor(usableWorkers(pos), exec, app.desc.sparkHome);
app.state = ApplicationState.RUNNING

再来看看launchExecutor这个函数
先在每一个传入的workerInfo参数里面,记录当前的app,和已经消耗的cpu和内存
接着向worker发送LaunchExecutor消息,向client发送ExecutorAdded消息
worker.addExecutor(exec)
worker.actor ! LaunchExecutor(
exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory, sparkHome)
exec.application.driver ! ExecutorAdded(
exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)

咱们再来看看worker收到LaunchExecutor后的执行步骤:
建立一个实际执行任务的ExecutorRunner并启动它
更新已经使用的cpu和内存数量
而后向master发送ExecutorStateChanged消息
val manager = new ExecutorRunner(
appId, execId, appDesc, cores_, memory_, self, workerId, host, new File(execSparkHome_), workDir)
executors(appId + "/" + execId) = manager
manager.start()
coresUsed += cores_
memoryUsed += memory_
master ! ExecutorStateChanged(appId, execId, ExecutorState.RUNNING, None, None)

ExecutorRunner的start的函数,主要任务就是启动一个java后台线程,这个线程执行fetchAndRunExecutor函数,这个函数的主要流程以下:
拼接出一个字符串,其内容是用JAVA_HOME\bin\java命令,执行一个类StandaloneExecutorBackend
这个执行过程不在当前的jvm下执行。而是经过ProcessBuilder新建了一个jvm进程,单独执行的,至关于就是启动了一个StandaloneExecutorBackend实例
val command = buildCommandSeq()
val builder = new ProcessBuilder(command: _*).directory(executorDir)
val env = builder.environment()
for ((key, value) <- appDesc.command.environment) {
env.put(key, value)
}
......
process = builder.start()

看看StandaloneExecutorBackend作了什么:
建立了一个actor,在这个actor的在构造过程当中,就是向客户端的driver发送RegisterExecutor(executorId, hostPort, cores)消息,而后开始等待消息
上面发送的那个消息,会被driver端的StandaloneSchedulerBackend接受到
它回复一个[color=darkred]RegisteredExecutor(sparkProperties)[/color]消息,在修改了一系列的资源记录后,调用makeOffers()函数
makeOffers()的做用是在拼装好要执行的任务列表tasks以后,把一个重要的变量hasLaunchedTask设置成true(借助scheduler.resourceOffers函数)
这以后,向StandaloneExecutorBackend发送[color=blue]LaunchTask(task)[/color]消息
这里能够看到[color=darkred]RegisteredExecutor[/color]和[color=blue]LaunchTask[/color]消息都是发送给了StandaloneExecutorBackend,咱们来看看它收到消息后的动做
在StandaloneExecutorBackend收到[color=darkred]RegisteredExecutor(sparkProperties)[/color]消息后,会建立一个excutor变量,里面有建立了一个ThreadPoolExecutor线程池,名叫threadPool
当它再收到[color=blue]LaunchTask(task)[/color]消息以后,就经过executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)函数,调用刚刚建立的threadPool线程池,执行传入的task任务
threadPool.execute(new TaskRunner(context, taskId, serializedTask))
而TaskRunner就是这个任务真正被执行的地方