Spark version 2.2. Source code: github.com/apache/spar…
In YARN-Cluster mode, after a user submits an application to YARN, YARN runs the application in two stages: in the first stage, Spark's Driver is started inside the YARN cluster as part of an ApplicationMaster; in the second stage, the ApplicationMaster sets up the application, requests resources for it from the ResourceManager, and launches Executors to run the Tasks, monitoring the whole execution until it completes.
The YARN-cluster workflow consists of the following steps:

1. The Spark Yarn Client submits the application to YARN, including the ApplicationMaster program, the command to launch the ApplicationMaster, and the programs to be run in the Executors.
2. On receiving the request, the ResourceManager selects a NodeManager in the cluster, allocates the first Container for the application, and asks the NodeManager to launch the application's ApplicationMaster in that Container; the ApplicationMaster then performs the initialization of the SparkContext and related components.
3. The ApplicationMaster registers with the ResourceManager, so that the user can check the application's status directly through the ResourceManager; it then polls over RPC to request resources for the individual tasks and monitors their status until the run finishes.
4. Once the ApplicationMaster has obtained resources (Containers), it communicates with the corresponding NodeManagers and asks them to launch CoarseGrainedExecutorBackend in the allocated Containers. After starting, each CoarseGrainedExecutorBackend registers with the SparkContext inside the ApplicationMaster and requests Tasks. This is the same as in Standalone mode, except that when the SparkContext is initialized in the Spark Application, CoarseGrainedSchedulerBackend works together with YarnClusterScheduler to schedule tasks, where YarnClusterScheduler is only a thin wrapper around TaskSchedulerImpl.
5. The SparkContext inside the ApplicationMaster assigns Tasks to the CoarseGrainedExecutorBackends for execution; each CoarseGrainedExecutorBackend runs its Tasks and reports their status and progress back to the ApplicationMaster, so the ApplicationMaster always knows the state of every task and can restart a task when it fails.
6. When the application has finished running, the ApplicationMaster asks the ResourceManager to deregister it and shuts itself down.

The user invokes the ${SPARK_HOME}/bin/spark-submit script to submit the application. This executes the main function of the SparkSubmit class and starts the main process:
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
SparkSubmit parses the script arguments, in particular master and deployMode. If the mode is yarn-cluster, the class to be launched next, childMainClass, is set to org.apache.spark.deploy.yarn.Client, and the user-specified main class mainClass and jar are passed along as the --class and --jar arguments. In other words, a NodeManager is first requested to host the ApplicationMaster, and the Driver (the user's main class) is then started inside that ApplicationMaster.
org.apache.spark.deploy.SparkSubmit 584-603
// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (isYarnCluster) {
  childMainClass = "org.apache.spark.deploy.yarn.Client"
  if (args.isPython) {
    childArgs += ("--primary-py-file", args.primaryResource)
    childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
  } else if (args.isR) {
    val mainFile = new Path(args.primaryResource).getName
    childArgs += ("--primary-r-file", mainFile)
    childArgs += ("--class", "org.apache.spark.deploy.RRunner")
  } else {
    if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
      childArgs += ("--jar", args.primaryResource)
    }
    childArgs += ("--class", args.mainClass)
  }
  if (args.childArgs != null) {
    args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
  }
}
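To make the rewrite above concrete, here is a rough sketch (not Spark source; the application class, jar name and argument are made-up placeholders) of what the parsed result looks like for a typical yarn-cluster submission:

```scala
// Hypothetical result of the rewrite above for a submission such as:
//   spark-submit --master yarn --deploy-mode cluster --class com.example.MyApp app.jar 2017-01-01
// com.example.MyApp, app.jar and the date argument are invented for illustration.
val childMainClass = "org.apache.spark.deploy.yarn.Client"
val childArgs = Seq(
  "--jar",   "app.jar",            // args.primaryResource
  "--class", "com.example.MyApp",  // args.mainClass
  "--arg",   "2017-01-01")         // every user argument becomes a --arg pair
```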
SparkSubmit then uses reflection to run the childMainClass resolved during argument parsing, org.apache.spark.deploy.yarn.Client.
org.apache.spark.deploy.SparkSubmit#main
119:case SparkSubmitAction.SUBMIT => submit(appArgs)
->org.apache.spark.deploy.SparkSubmit:submit
162:runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
->org.apache.spark.deploy.SparkSubmit#runMain
// Note that childMainClass here is org.apache.spark.deploy.yarn.Client; this Client requests a NodeManager from YARN's ResourceManager and starts the AM on it
// The AM then launches the mainClass specified by the user
712:mainClass = Utils.classForName(childMainClass)
739:val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
755:mainMethod.invoke(null, childArgs.toArray)
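The reflective call in runMain is the standard "load a class and invoke its static main" pattern. A minimal self-contained sketch of that pattern (plain java.lang.reflect rather than Spark's Utils helper; the target class name is only an example):

```scala
// Minimal sketch of invoking a class's main(Array[String]) via reflection,
// the same pattern runMain uses; the class name is illustrative.
object ReflectiveLaunch {
  def main(args: Array[String]): Unit = {
    val mainClass  = Class.forName("org.apache.spark.deploy.yarn.Client")
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
    mainMethod.invoke(null, args)   // null receiver because main is static
  }
}
```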
org.apache.spark.deploy.yarn.Client uses org.apache.hadoop.yarn.client.api.YarnClient to submit the application to YARN.
org.apache.spark.deploy.yarn.Client#main
1150:new Client(args, sparkConf).run()
->org.apache.spark.deploy.yarn.Client#run
1091:this.appId = submitApplication()
org.apache.spark.deploy.yarn.Client:submitApplication
161:val containerContext = createContainerLaunchContext(newAppResponse)
174:yarnClient.submitApplication(appContext)
The appContext encapsulates the command used to launch the ApplicationMaster:
org.apache.spark.deploy.yarn.Client#createContainerLaunchContext
887:val userClass =
if (isClusterMode) {
Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass))
} else {
Nil
}
910:val amClass =
if (isClusterMode) {
Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster"). getName
} else {
Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
}
923: val amArgs =
Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ userArgs++
Seq("--properties-file", buildPath(Environment.PWD.$$(),LOCALIZED_CONF_DIR, SPARK_CONF_FILE))
val commands = prefixEnv ++
Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++javaOpts ++ amArgs ++
Seq(
"1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
"2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
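Putting the pieces above together, the container launch command that the NodeManager eventually executes in cluster mode amounts to roughly the following (the user class, jar, memory option and paths are illustrative placeholders, not real output):

```scala
// Approximate shape of the assembled `commands` sequence in cluster mode;
// every concrete value below is a made-up placeholder.
val approximateAmCommand = Seq(
  "{{JAVA_HOME}}/bin/java", "-server", "-Xmx1024m",
  "org.apache.spark.deploy.yarn.ApplicationMaster",
  "--class", "com.example.MyApp",
  "--jar", "app.jar",
  "--properties-file", "<localized conf dir>/<spark conf file>",
  "1>", "<LOG_DIR>/stdout",
  "2>", "<LOG_DIR>/stderr").mkString(" ")
```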
Based on its arguments, the ApplicationMaster determines whether this is cluster mode or client mode. In cluster mode it launches the mainClass the user originally passed to the spark-submit script, starting the user process, and then waits for the SparkContext to be injected. (Normally the user process instantiates a SparkContext, and during that instantiation the SparkContext injects itself into the ApplicationMaster through the SchedulerBackend; section 6 covers this in detail.)
org.apache.spark.deploy.yarn.ApplicationMaster#main
763:master = new ApplicationMaster(amArgs, new YarnRMClient)
System.exit(master.run())
->org.apache.spark.deploy.yarn.ApplicationMaster#run
253:if (isClusterMode) {
runDriver(securityMgr)
} else {
runExecutorLauncher(securityMgr)
}
->org.apache.spark.deploy.yarn.ApplicationMaster#runDriver
// Start the user process
394:userClassThread = startUserApplication()
// Wait for the SparkContext to be injected
401:val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
Duration(totalWaitTime, TimeUnit.MILLISECONDS))
->org.apache.spark.deploy.yarn.ApplicationMaster#startUserApplication
629:val mainMethod = userClassLoader.loadClass(args.userClass).getMethod("main", classOf[Array[String]])
635:mainMethod.invoke(null, userArgs.toArray)
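The handshake between runDriver and the user thread is essentially a Promise/Future rendezvous: the user thread completes the promise once it has built its SparkContext, while the AM thread blocks on the future with a timeout. A stripped-down model of that synchronization pattern (not Spark code; UserContext stands in for SparkContext):

```scala
// Simplified model of the runDriver <-> user-thread rendezvous via a Promise.
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object DriverHandshake {
  final case class UserContext(name: String)          // stand-in for SparkContext
  private val contextPromise = Promise[UserContext]()

  def main(args: Array[String]): Unit = {
    // "AM" side: start the user code in its own thread ...
    val userThread = new Thread(new Runnable {
      override def run(): Unit = {
        val ctx = UserContext("app")                   // the user code builds its context
        contextPromise.success(ctx)                    // ... and signals that it is ready
      }
    }, "Driver")
    userThread.start()

    // ... then block (with a timeout) until the context has been handed over.
    val ctx = Await.result(contextPromise.future, 100.seconds)
    println(s"got context: $ctx")
    userThread.join()
  }
}
```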
At this point, the work of requesting the first NodeManager from the ResourceManager and starting the ApplicationMaster and Driver processes on it is complete. Next, the user-specified mainClass runs; when it reaches new SparkContext, a SparkContext instance is created. During this process two particularly important objects are created, taskScheduler and schedulerBackend. These two objects cooperate to submit the task sets built by the DAGScheduler to the Executors, request task resources as needed, and monitor task status.
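For reference, the "user mainClass" in this walkthrough is just an ordinary Spark application. A minimal example of one (the class name and job are placeholders; it is assumed to be submitted with --master yarn --deploy-mode cluster):

```scala
// Minimal user application; new SparkContext(...) is where the flow below begins.
import org.apache.spark.{SparkConf, SparkContext}

object MyApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("yarn-cluster-demo")
    val sc   = new SparkContext(conf)
    val sum  = sc.parallelize(1 to 100).reduce(_ + _)
    println(s"sum = $sum")
    sc.stop()
  }
}
```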
The user program executes new SparkContext():
org.apache.spark.SparkContext#new (the try-catch block at lines 371-583)
397 // Set basic information such as the jars and files
_conf.set(DRIVER_HOST_ADDRESS,_conf.get(DRIVER_HOST_ADDRESS))
_conf.setIfMissing("spark.driver.port", "0")
_conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)
_jars = Utils.getUserJars(_conf)
_files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
428 // Create the JobProgressListener and add it to the listener bus, used for rendering the Spark UI
_jobProgressListener = new JobProgressListener(_conf)
listenerBus.addListener(jobProgressListener)
432 // Create the SparkEnv
_env = createSparkEnv(_conf, isLocal, listenerBus)
SparkEnv.set(_env)
501 // Create the taskScheduler and schedulerBackend
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
540 // Create the dynamic resource allocation manager, covered in detail in section 9
_executorAllocationManager =
if (dynamicAllocationEnabled) {
schedulerBackend match {
case b: ExecutorAllocationClient =>
Some(new ExecutorAllocationManager(
schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf))
case _ =>
None
}
} else {
None
}
_executorAllocationManager.foreach(_.start())
->org.apache.spark.SparkContext#createTaskScheduler
2757 // In yarn mode the masterUrl is "yarn" and the corresponding ClusterManager is org.apache.spark.scheduler.cluster.YarnClusterManager
case masterUrl =>
val cm = getClusterManager(masterUrl) match {
case Some(clusterMgr) => clusterMgr
case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
}
try {
// The corresponding TaskScheduler is org.apache.spark.scheduler.cluster.YarnClusterScheduler
val scheduler = cm.createTaskScheduler(sc, masterUrl)
// The corresponding backend is org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend
val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
cm.initialize(scheduler, backend)
(backend, scheduler)
} catch {
case se: SparkException => throw se
case NonFatal(e) =>
throw new SparkException("External scheduler cannot be instantiated", e)
}
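getClusterManager resolves the ClusterManager through the standard java.util.ServiceLoader mechanism (YarnClusterManager is registered via a META-INF/services entry in the YARN module). A generic sketch of that lookup pattern, with a made-up SPI interface standing in for ExternalClusterManager:

```scala
// Generic ServiceLoader lookup of the kind getClusterManager relies on;
// PluggableManager is an invented stand-in for ExternalClusterManager.
import java.util.ServiceLoader
import scala.collection.JavaConverters._

trait PluggableManager {
  def canCreate(masterURL: String): Boolean
}

object ManagerLookup {
  def find(masterURL: String): Option[PluggableManager] = {
    val loader = Thread.currentThread().getContextClassLoader
    ServiceLoader.load(classOf[PluggableManager], loader)
      .asScala
      .find(_.canCreate(masterURL))   // e.g. a YARN manager answers true for "yarn"
  }
}
```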
YarnClusterScheduler inherits TaskSchedulerImpl (Spark's main implementation) mainly through YarnScheduler. The most important thing this subclass adds is injecting the SparkContext into the ApplicationMaster (following on from section 5).
org.apache.spark.scheduler.cluster.YarnClusterManager#createTaskScheduler
32: Instantiate a different subclass depending on the deploy mode
override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
sc.deployMode match {
case "cluster" => new YarnClusterScheduler(sc)
case "client" => new YarnScheduler(sc)
case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
}
}
org.apache.spark.scheduler.cluster.YarnClusterScheduler#postStartHook
private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc) {
logInfo("Created YarnClusterScheduler")
override def postStartHook() {
ApplicationMaster.sparkContextInitialized(sc)
super.postStartHook()
logInfo("YarnClusterScheduler.postStartHook done")
}
}
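The important detail here is the callback: TaskSchedulerImpl invokes postStartHook once the scheduler is up, and the YARN subclass uses that hook to hand the freshly created SparkContext over to the ApplicationMaster. A simplified model of this hook pattern (the class names are invented, not Spark's):

```scala
// Simplified model of the postStartHook pattern used by YarnClusterScheduler.
class BaseScheduler {                              // stands in for TaskSchedulerImpl
  def start(): Unit = {
    // ... start the scheduling machinery, then give subclasses a chance to react
    postStartHook()
  }
  protected def postStartHook(): Unit = ()
}

object ContextRegistry {                           // stands in for ApplicationMaster
  @volatile private var ctx: Option[String] = None
  def contextInitialized(c: String): Unit = { ctx = Some(c) }
}

class ClusterScheduler(contextId: String) extends BaseScheduler {
  override protected def postStartHook(): Unit = {
    ContextRegistry.contextInitialized(contextId)  // hand the context over
    super.postStartHook()
  }
}
```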
YarnClusterSchedulerBackend likewise extends org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend (Spark's main implementation) through YarnSchedulerBackend. The main things this subclass adds are bridging Spark's resource requests over to YARN (described in detail in section 9) and creating the clientEndpoint and driverEndpoint used for communication.
org.apache.spark.scheduler.cluster.YarnClusterManager#createSchedulerBackend
// Instantiate a different subclass depending on the deploy mode
40:override def createSchedulerBackend(sc: SparkContext,
masterURL: String,
scheduler: TaskScheduler): SchedulerBackend = {
sc.deployMode match {
case "cluster" =>
new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
case "client" =>
new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
case _ =>
throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' forYarn")
}
}
-> org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend#start
36:super.start()
-> org.apache.spark.scheduler.cluster.YarnSchedulerBackend#start
// Create the clientEndpoint
52:private val yarnSchedulerEndpoint = new YarnSchedulerEndpoint(rpcEnv)
-> org.apache.spark.scheduler.cluster.YarnSchedulerBackend.YarnSchedulerEndpoint#receive
case RegisterClusterManager(am) =>
logInfo(s"ApplicationMaster registered as $am")
// The amEndpoint is injected here; this happens after the SparkContext has been injected into the ApplicationMaster in section 8
amEndpoint = Option(am)
if (!shouldResetOnAmRegister) {
shouldResetOnAmRegister = true
} else {
// AM is already registered before, this potentially means that AM failed and
// a new one registered after the failure. This will only happen in yarn-client mode.
reset()
}
86:super.start()
-> org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#start
// Create the driverEndpoint
378:driverEndpoint = createDriverEndpointRef(properties)
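The two endpoints created here talk to the ApplicationMaster's AMEndpoint purely by exchanging messages. A stripped-down model of that protocol in plain Scala (the message names mirror Spark's, but the types and handler logic are only illustrative and ignore the RpcEnv plumbing):

```scala
// Stripped-down model of the messages handled by YarnSchedulerEndpoint.
sealed trait ClusterMessage
final case class RegisterClusterManager(amRef: String) extends ClusterMessage
final case class RequestExecutors(requestedTotal: Int) extends ClusterMessage

class SchedulerEndpointModel {
  private var amEndpoint: Option[String] = None

  def receive(msg: ClusterMessage): Unit = msg match {
    case RegisterClusterManager(am) =>
      // The AM announces itself once its AMEndpoint is up (see section 8).
      amEndpoint = Some(am)
    case r: RequestExecutors =>
      // Forwarded to the AM if it has registered, refused otherwise.
      amEndpoint match {
        case Some(am) => println(s"forwarding $r to $am")
        case None     => println("AM has not registered yet; refusing request")
      }
  }
}
```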
At the end of section 6, ApplicationMaster.sparkContextInitialized injected the SparkContext into the ApplicationMaster, which wakes up the wait inside the ApplicationMaster from section 4.
org.apache.spark.deploy.yarn.ApplicationMaster#sparkContextInitialized
769:master.sparkContextInitialized(sc)
->org.apache.spark.deploy.yarn.ApplicationMaster#sparkContextInitialized
326:sparkContextPromise.success(sc)
->org.apache.spark.deploy.yarn.ApplicationMaster#runDriver
// At this point sc has been set and the next steps can proceed
401:val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
Duration(totalWaitTime, TimeUnit.MILLISECONDS))
if (sc != null) {
rpcEnv = sc.env.rpcEnv
// The AMEndpoint is created here and injected into org.apache.spark.scheduler.cluster.YarnSchedulerBackend, corresponding to section 7
val driverRef = runAMEndpoint(
sc.getConf.get("spark.driver.host"),
sc.getConf.get("spark.driver.port"),
isClusterMode = true)
registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl), securityMgr)
}
->org.apache.spark.deploy.yarn.ApplicationMaster#runAMEndpoint
387:amEndpoint =rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverEndpoint, isClusterMode))
681:override def onStart(): Unit = {
driver.send(RegisterClusterManager(self))
}
->org.apache.spark.deploy.yarn.ApplicationMaster#registerAM
// Create org.apache.spark.deploy.yarn.YarnAllocator
// YarnAllocator requests containers from the YARN ResourceManager and decides what to do with containers once YARN grants them
359:allocator = client.register(driverUrl,
driverRef,
yarnConf,
_sparkConf,
uiAddress,
historyAddress,
securityMgr,
localResources)
// Initial resource request
368:allocator.allocateResources()
// Create the reporter thread, which requests resources on demand while the job runs; it also calls allocator.allocateResources()
369:reporterThread = launchReporterThread()
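The reporter thread started at the end of registerAM is conceptually just a loop that keeps calling allocateResources and sleeping. A minimal sketch of that idea (illustrative only; it leaves out Spark's failure counting and interval handling):

```scala
// Minimal sketch of a reporter-style loop; Allocator is a stand-in for YarnAllocator.
object ReporterModel {
  trait Allocator { def allocateResources(): Unit }

  def launchReporterThread(allocator: Allocator, intervalMs: Long = 3000): Thread = {
    val t = new Thread(new Runnable {
      override def run(): Unit =
        try {
          while (!Thread.currentThread().isInterrupted) {
            allocator.allocateResources()   // request / process containers as needed
            Thread.sleep(intervalMs)
          }
        } catch { case _: InterruptedException => () }  // stop when interrupted
    }, "Reporter")
    t.setDaemon(true)
    t.start()
    t
  }
}
```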
Resource requests are mainly carried out by org.apache.spark.deploy.yarn.YarnAllocator and are triggered by org.apache.spark.scheduler.cluster.YarnSchedulerBackend, which uses the org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RequestExecutors message to update the required number of Executors, targetNumExecutors.
// This method is the main entry point for requesting containers and launching Executors
org.apache.spark.deploy.yarn.YarnAllocator#allocateResources
// First compute the number of resources that still need to be requested
260:updateResourceRequests()
-> org.apache.spark.deploy.yarn.YarnAllocator#updateResourceRequests
297:val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
// At initialization, targetNumExecutors is the maximum of
// spark.dynamicAllocation.minExecutors,
// spark.dynamicAllocation.initialExecutors and
// spark.executor.instances (see getDynamicAllocationInitialExecutors below)
109: @volatile private var targetNumExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf)
275: val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf)
-> org.apache.spark.util.Utils#getDynamicAllocationInitialExecutors
2538:val initialExecutors = Seq(
conf.get(DYN_ALLOCATION_MIN_EXECUTORS),
conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS),
conf.get(EXECUTOR_INSTANCES).getOrElse(0)).max
276:handleAllocatedContainers(allocatedContainers.asScala)
-> org.apache.spark.deploy.yarn.YarnAllocator#handleAllocatedContainers
442:runAllocatedContainers(containersToUse)
-> org.apache.spark.deploy.yarn.YarnAllocator#runAllocatedContainers
511:new ExecutorRunnable(
Some(container),
conf,
sparkConf,
driverUrl,
executorId,
executorHostname,
executorMemory,
executorCores,
appAttemptId.getApplicationId.toString,
securityMgr,
localResources
).run()
-> org.apache.spark.deploy.yarn.ExecutorRunnable#run
65:startContainer()
-> org.apache.spark.deploy.yarn.ExecutorRunnable#startContainer
// The command to be run on the NodeManager
98:val commands = prepareCommand()
// Start the container on the NodeManager
122:nmClient.startContainer(container.get, ctx)
-> org.apache.spark.deploy.yarn.ExecutorRunnable#prepareCommand
201:val commands = prefixEnv ++
Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
javaOpts ++
// This is the key line: the command launches this class, described in detail in section 10
Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
"--driver-url", masterAddress,
"--executor-id", executorId,
"--hostname", hostname,
"--cores", executorCores.toString,
"--app-id", appId) ++
userClassPath ++
Seq(
s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")
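The sizing logic in updateResourceRequests is plain arithmetic; a worked example with invented numbers:

```scala
// Worked example of the "missing" calculation at line 297 of YarnAllocator.
val targetNumExecutors  = 10   // what the driver currently wants
val numPendingAllocate  = 3    // container requests already outstanding at the RM
val numExecutorsRunning = 5    // executors already running

val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
// missing == 2: two more containers are requested from the ResourceManager;
// a negative value would instead cancel some of the outstanding requests.
```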
Section 5 mentioned that the executorAllocationManager, the dynamic resource allocation manager, is created while the SparkContext is being built. It takes effect when spark.dynamicAllocation.enabled is set to true and requests resources dynamically while the job runs. org.apache.spark.ExecutorAllocationManager requests resources by calling the requestTotalExecutors method of org.apache.spark.ExecutorAllocationClient. org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend implements this interface and in turn delegates to the doRequestTotalExecutors method of its subclass org.apache.spark.scheduler.cluster.YarnSchedulerBackend, which finally performs the resource request.
org.apache.spark.SparkContext#new
552:_executorAllocationManager.foreach(_.start())
-> org.apache.spark.ExecutorAllocationManager#start
222:val scheduleTask = new Runnable() {
override def run(): Unit = {
try {
schedule()
} catch {
case ct: ControlThrowable =>
throw ct
case t: Throwable =>
logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
}
}
}
// Create a periodic task to schedule resources
executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
-> org.apache.spark.ExecutorAllocationManager#schedule
281: updateAndSyncNumExecutorsTarget(now)
-> org.apache.spark.ExecutorAllocationManager#updateAndSyncNumExecutorsTarget
// maxNumExecutorsNeeded is computed from
// listener.totalPendingTasks (updated from CoarseGrainedSchedulerBackend, line 261),
// listener.totalRunningTasks (updated from DAGScheduler, line 996) and
// spark.executor.cores (user-defined, default 1); the exact calculation and update logic is left to be filled in
310: val maxNeeded = maxNumExecutorsNeeded
331: val delta = addExecutors(maxNeeded)
380: val addRequestAcknowledged = testing ||
client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
-> org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#requestTotalExecutors
548: doRequestTotalExecutors(numExecutors)
->org.apache.spark.scheduler.cluster.YarnSchedulerBackend#doRequestTotalExecutors
// Build an org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RequestExecutors message and send it through the clientEndpoint
138:yarnSchedulerEndpointRef.ask[Boolean](prepareRequestExecutors(requestedTotal))
->org.apache.spark.scheduler.cluster.YarnSchedulerBackend.YarnSchedulerEndpoint#receiveAndReply
// Finally the RequestExecutors message is forwarded to the ApplicationMaster
280:case r: RequestExecutors =>
amEndpoint match {
case Some(am) =>
am.ask[Boolean](r).andThen {
case Success(b) => context.reply(b)
case Failure(NonFatal(e)) =>
logError(s"Sending $r to AM was unsuccessful", e)
context.sendFailure(e)
}(ThreadUtils.sameThread)
case None =>
logWarning("Attempted to request executors before the AM has registered!")
context.reply(false)
}
->org.apache.spark.deploy.yarn.ApplicationMaster.AMEndpoint#receiveAndReply
696: a.requestTotalExecutorsWithPreferredLocalities(r.requestedTotal,
r.localityAwareTasks, r.hostToLocalTaskCount, r.nodeBlacklist)
-> org.apache.spark.deploy.yarn.YarnAllocator#requestTotalExecutorsWithPreferredLocalities
// This ultimately updates targetNumExecutors, so the next call to org.apache.spark.deploy.yarn.YarnAllocator#allocateResources sees a new targetNumExecutors; requestedTotal originates from the calculation in org.apache.spark.ExecutorAllocationManager#schedule
218: targetNumExecutors = requestedTotal
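To exercise this path at all, dynamic allocation has to be enabled. A typical configuration (the values are only examples) and the initial target it produces, mirroring the Seq(...).max calculation shown above:

```scala
// Example dynamic-allocation settings and the resulting initial executor target.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")   // dynamic allocation needs the external shuffle service
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.initialExecutors", "4")
  .set("spark.dynamicAllocation.maxExecutors", "20")

val initialTarget = Seq(
  conf.getInt("spark.dynamicAllocation.minExecutors", 0),
  conf.getInt("spark.dynamicAllocation.initialExecutors", 0),
  conf.getInt("spark.executor.instances", 0)).max   // == 4 with the values above
```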
A small open question: org.apache.spark.SparkContext#requestExecutors can also request resources by calling org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#requestExecutors, and the logInfo(s"Requesting $numAdditionalExecutors additional executor(s) from the cluster manager") line in that method does show up in the Spark Driver logs, but it is not clear who calls this method.
After resources are obtained in section 8, an org.apache.spark.executor.CoarseGrainedExecutorBackend process is started on the NodeManager. These processes have a many-to-one relationship with org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend. As the name suggests, CoarseGrainedExecutorBackend can be seen as the Executor's manager: it holds an Executor object, and the Executor runs Tasks through a thread pool it starts, which ties the whole flow together.
org.apache.spark.executor.CoarseGrainedExecutorBackend#main
284:run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
-> org.apache.spark.executor.CoarseGrainedExecutorBackend#run
226:env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
-> org.apache.spark.executor.CoarseGrainedExecutorBackend#onStart
// Register itself with the CoarseGrainedSchedulerBackend
63:ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
-> org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint#receiveAndReply
// If the checks pass, tell the CoarseGrainedExecutorBackend to create its Executor
195: executorRef.send(RegisteredExecutor)
-> org.apache.spark.executor.CoarseGrainedExecutorBackend#receive
83: executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
-> org.apache.spark.executor.Executor#new
101:Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
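The "Executor runs Tasks on a thread pool" part boils down to the standard cached-thread-pool pattern. A self-contained sketch (TaskModel is an invented stand-in for Spark's TaskRunner):

```scala
// Sketch of the Executor-side pattern: a cached thread pool running task runnables.
import java.util.concurrent.{Executors, TimeUnit}

object ExecutorModel {
  private val threadPool = Executors.newCachedThreadPool()

  final class TaskModel(taskId: Long) extends Runnable {   // stand-in for TaskRunner
    override def run(): Unit = println(s"running task $taskId")
  }

  def launchTask(taskId: Long): Unit = threadPool.execute(new TaskModel(taskId))

  def main(args: Array[String]): Unit = {
    (1L to 3L).foreach(launchTask)
    threadPool.shutdown()
    threadPool.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```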
At this point the whole SparkContext creation flow is complete. Once the SparkContext is created, the user code starts executing: the logical RDD graph is built first, then when an action is encountered the job is split into stages to form the physical execution plan, which is executed through SparkContext.runJob. For a detailed analysis see github.com/JerryLead/S…
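As a concrete illustration of that last step: in the snippet below (reusing the sc from the earlier MyApp example; the input path is a placeholder) the transformations only build the RDD lineage, and it is the collect() action that makes SparkContext.runJob submit the job, with reduceByKey introducing the shuffle boundary between stages.

```scala
// Transformations build the logical RDD graph; the action triggers runJob.
val counts = sc.textFile("hdfs:///tmp/input.txt")   // placeholder path
  .flatMap(_.split("\\s+"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)       // shuffle dependency => stage boundary

val result = counts.collect()   // action: SparkContext.runJob is called here
```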