spark yarn cluster模式下任务提交和计算流程分析

spark能够运行在standalone,yarn,mesos等多种模式下,当前咱们用的最广泛的是yarn模式,在yarn模式下又分为client和cluster。本文接下来将分析yarn cluster下任务提交的过程。也就是回答,在yarn cluster模式下,任务是怎么提交的问题。在yarn cluster模式下,spark任务提交涉及四个角色(client, application, driver以及executor)之间的交互。接下来,将详细分析这四个角色在任务提交过程当中都作了那些事。apache

 

1,client流程

Step  1:咱们知道:在咱们写完任务准备向集群提交spark任务时,通常是调用bin下的spark-submit脚本进行任务的提交。在完成一些环境变量和参数的准备后,最终调用spark代码库中的SparkSubmit类。api

Step 2:在SparkSubmit的main函数中,经过submit,runMain而后经过YarnClusterApplication启动org.apache.spark.deploy.yarn.Client.app

Step 3:在Client中,经过main,run,而后在submitApplication中,利用yarnClient向ResourceManager提交新应用以启动ApplicationMaster,其中在yarn cluster模式下启动ApplicationMaster的类是函数

org.apache.spark.deploy.yarn.ApplicationMaster。 至此,client完成全部的工做。

2,ApplicationMaster流程

Step1:yarn分配container运行ApplicationMaster。经过main,run,runDriver,调用startUserApplication,新建线程,运行在spark-submit --class参数指定的应用类用户代码。ui

Step2:ApplicationMaster等待driver完成sparkContext的初始化后,获取driver的一个ref。调用registerAM函数,利用YarnRMClient向yarn申请资源运行executor。一旦获取到container资源,在yarnAllocator中,url

launcherPool线程池会将container,driver等相关信息封装成ExecutorRunnable对象,经过ExecutorRunnable启动新的container以运行executor。在次过程当中,指定启动executor的类是
org.apache.spark.executor.CoarseGrainedExecutorBackend。

3,Driver流程

在ApplicationMaster的步骤1中,会新建线程运行用户端代码,而且在完成sparkcontext的初始化,其中包括dagScheduler完成job stage的切分,每一个stage的任务转成化一系列的task,封装成taskset。交由taskScheduler去调用。因为这个过程比较复杂,并且很是的重要,准备稍后会单独对这个部分进行详细讲解。spa

4,Executor流程

在ApplicationMaster的步骤2中提到,新的container将会运行executor。在executor启动之后,会向driver发送RegisterExecutor消息告诉driver注册当前运行的executor。在driver端的CoarseGrainedSchedulerBackend中,能够看到对该消息的处理过程。在driver段感知到该消息后,driver将向executor发送RegisteredExecutor消息。executor和driver更多的细节,在稍后spark任务计算解析中,会将进行更详细的描述。线程

 

至此,client在完成使命后退出。其余三个部分也已启动起来。接下来将以spark example中的sparkPi例子来看看日常咱们写的spark任务是怎么计算的。调试

首先把sparkPi中的代码贴出来:对象

 

问题1:什么时候开始运行用户的中main函数?

在前文中 ApplicationMaster流程中第一步提到:yarn分配container运行ApplicationMaster。经过main,run,runDriver,调用startUserApplication,新建线程,运行在spark-submit --class参数指定的应用类用户代码。也就是说,在这一步将运行用户写入的代码。

问题2:上述代码具体都作了些啥?

1,在SparkSession...getOrCreate函数中主要作的事情是完成sparkContext的初始化,这其中主要包括DAGScheduler,TaskSchedule的初始化等。(注:在调试过程当中使用的standalone模式,而且加入extraJavaOption主要是为了便于调试executor的代码)。

2,上述代码的核心是sparkContext.parallelize(....).map(....).reduce。在parallelize函数中将新建ParallelCollectionRDD。在map中将新建MapPartitionsRDD。最后reduce是一个action(一个action对应一个Job),触发实际的计算。

3,在reduce函数中,经过调用sc.runJob->dagScheduler.runJob→submitJob提交JobSubmitted事件到DAGScheduler本身。而后调用handleJobSubmitted来处理Job提交。在handleJobSubmitted函数中,将建立ResultStage,而后根据shuffle将Job划分为不一样的stage。在本例中,因为没有shuffle,将只有一个stage。最终经过submitMissingTasks将stage中的task封装成taskset,交由taskschuduler(taskScheduler.submitTasks)进行task级别的调度。

4,在TaskSchdulerImpl的submitTasks中,能够看到taskset会被进一步封装成TasksetManager,加入到schedulableBuilder中(默认使用FIFO队列进行调度)。而后driver向本身发送ReviveOffers消息。driver接收到该信息后,若是发现有空闲的executor,将该Task序列后,发送LaunchTask消息给executor。让executor去执行。

5,executor处理LaunchTask消息的代码以下:

launchTask会将task信息TaskRunner,启用线程池运行。

6,在TaskRunner的run方法中,将运行

val res = task.run(
taskAttemptId = taskId,
attemptNumber = taskDescription.attemptNumber,
metricsSystem = env.metricsSystem)
threwException = false
而后调用runTask进行运行,有两种类型的Task(ShuffleMapTask,ResultTask),本例中将运行ResultTask中的runTask方法,而后在该方法中,调用用户传入的函数代码。

7,在TaskRunner的run方法中,在完成计算后,将调用execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult),该函数将向driver发送信息,告诉改Task已完成。

8,在driver端,若是任务正常结束,将调用taskResultGetter.enqueueSuccessfulTask。在该函数中,接着调用handleSuccessfulTask,最终DAGScheduler将向本身发送CompletionEvent事件,而后使用handleTaskCompletion来处理。若是任务正常结束,将经过

job.listener.taskSucceeded通知JobWaiter,JobWaiter完成任务结果的合并。在全部的JobWaiter中的Task都完成后,任务退出。
相关文章
相关标签/搜索