Spark Source Code Walkthrough: How a SparkContext Is Created (YARN Cluster Mode)

Spark version: 2.2. Source link: github.com/apache/spar…

  • Overview

    In YARN-cluster mode, when a user submits an application to YARN, YARN runs the application in two phases: in the first phase, the Spark Driver is started inside the YARN cluster as an ApplicationMaster; in the second phase, the ApplicationMaster creates the application, requests resources from the ResourceManager on its behalf, and starts Executors to run the Tasks, monitoring the entire run until it completes.

  • Description in words

    The YARN-cluster workflow consists of the following steps:

    1. The Spark Yarn Client submits the application to YARN, including the ApplicationMaster program, the command used to launch the ApplicationMaster, and the program that will run inside the Executors;
    2. On receiving the request, the ResourceManager selects a NodeManager in the cluster and allocates the first Container for the application, asking it to launch the application's ApplicationMaster in that Container; the ApplicationMaster then performs the initialization of the SparkContext and related components;
    3. The ApplicationMaster registers with the ResourceManager, so the user can check the application's status directly through the ResourceManager; it then requests resources for the individual tasks via the RPC protocol in a polling fashion and monitors their status until the run finishes;
    4. Once the ApplicationMaster has obtained resources (i.e. Containers), it communicates with the corresponding NodeManagers and asks them to launch CoarseGrainedExecutorBackend in the obtained Containers. After starting, each CoarseGrainedExecutorBackend registers with the SparkContext inside the ApplicationMaster and requests Tasks. This is the same as in Standalone mode, except that when the SparkContext is initialized in the Spark Application, it uses CoarseGrainedSchedulerBackend together with YarnClusterScheduler for task scheduling, where YarnClusterScheduler is just a thin wrapper around TaskSchedulerImpl;
    5. The SparkContext inside the ApplicationMaster assigns Tasks to the CoarseGrainedExecutorBackends for execution; each CoarseGrainedExecutorBackend runs its Tasks and reports their status and progress back to the ApplicationMaster, so the ApplicationMaster always knows the state of every task and can restart a task when it fails;
    6. When the application finishes, the ApplicationMaster asks the ResourceManager to deregister it and shuts itself down.
  • Diagram

  • Source code walkthrough

    1. The submit command

      The user invokes the ${SPARK_HOME}/bin/spark-submit script to submit the application; the script runs the main function of the SparkSubmit class, which starts the main process:

      exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
    2. Parsing the command-line arguments

      SparkSubmit parses the script arguments, in particular master and deployMode. For YARN cluster mode it sets childMainClass, the class to be launched next, to org.apache.spark.deploy.yarn.Client, and passes the user-specified main class (mainClass) along via the --class and --jar arguments. In other words, it first asks for a NodeManager to host the ApplicationMaster, and the ApplicationMaster then launches the Driver (the user-specified main class).

      org.apache.spark.deploy.SparkSubmit 584-603
      // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
      if (isYarnCluster) {
        childMainClass = "org.apache.spark.deploy.yarn.Client"
        if (args.isPython) {
          childArgs += ("--primary-py-file", args.primaryResource)
          childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
        } else if (args.isR) {
          val mainFile = new Path(args.primaryResource).getName
          childArgs += ("--primary-r-file", mainFile)
          childArgs += ("--class", "org.apache.spark.deploy.RRunner")
        } else {
          if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
            childArgs += ("--jar", args.primaryResource)
          }
          childArgs += ("--class", args.mainClass)
        }
        if (args.childArgs != null) {
          args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
        }
      }
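      For intuition, the snippet below mimics in plain Scala what childArgs ends up holding for a hypothetical submission of app.jar with main class com.example.MyApp and one program argument. The jar path, class name, and argument are invented for illustration; this is a sketch of the argument assembly, not Spark's actual code:

      import scala.collection.mutable.ArrayBuffer

      object ChildArgsSketch {
        def main(args: Array[String]): Unit = {
          // Hypothetical user inputs, not taken from the article.
          val primaryResource = "hdfs:///apps/app.jar"
          val mainClass       = "com.example.MyApp"
          val userArgs        = Seq("2020-01-01")

          // Mirrors the yarn-cluster branch of SparkSubmit shown above.
          val childMainClass = "org.apache.spark.deploy.yarn.Client"
          val childArgs = ArrayBuffer[String]()
          childArgs += ("--jar", primaryResource)
          childArgs += ("--class", mainClass)
          userArgs.foreach { arg => childArgs += ("--arg", arg) }

          // Roughly: yarn.Client --jar hdfs:///apps/app.jar --class com.example.MyApp --arg 2020-01-01
          println(s"$childMainClass ${childArgs.mkString(" ")}")
        }
      }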
    3. Launching org.apache.spark.deploy.yarn.Client

      The childMainClass resolved from the script arguments is then run via reflection:

      org.apache.spark.deploy.SparkSubmit#main
          119:case SparkSubmitAction.SUBMIT => submit(appArgs) 
      ->org.apache.spark.deploy.SparkSubmit:submit
          162:runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
      ->org.apache.spark.deploy.SparkSubmit#runMain
          // Note that childMainClass here is org.apache.spark.deploy.yarn.Client; this Client requests an NM from YARN's RM and starts the AM on that NM,
          // and the AM in turn launches the user-specified mainClass
          712:mainClass = Utils.classForName(childMainClass)
          739:val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
          755:mainMethod.invoke(null, childArgs.toArray)

      org.apache.spark.deploy.yarn.Client uses org.apache.hadoop.yarn.client.api.YarnClient to submit the application to YARN:

      org.apache.spark.deploy.yarn.Client#main
          1150:new Client(args, sparkConf).run()
      ->org.apache.spark.deploy.yarn.Client#run
          1091:this.appId = submitApplication()
      org.apache.spark.deploy.yarn.Client:submitApplication    
          161:val containerContext = createContainerLaunchContext(newAppResponse)
          174:yarnClient.submitApplication(appContext)
          
      The appContext wraps the command that launches the ApplicationMaster:
      org.apache.spark.deploy.yarn.Client#createContainerLaunchContext
          887:val userClass =
                  if (isClusterMode) {
                    Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass))
                  } else {
                    Nil
                  }
          910:val amClass =
                  if (isClusterMode) {
                    Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster"). getName
                  } else {
                    Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
                  }        
          923: val amArgs =
                  Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ userArgs ++
                  Seq("--properties-file", buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, SPARK_CONF_FILE))
              val commands = prefixEnv ++
                  Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++ javaOpts ++ amArgs ++
                  Seq(
                    "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
                    "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
    4. Starting the ApplicationMaster and the Driver

      Based on its arguments, the ApplicationMaster decides whether it is running in cluster mode or client mode. In cluster mode it launches the mainClass that the user originally passed to the spark-submit script, starting the user program, and then waits for the SparkContext to be injected (normally the user program instantiates a SparkContext, and during that instantiation the SparkContext injects itself into the ApplicationMaster via the SchedulerBackend; Section 6 covers the details).

      org.apache.spark.deploy.yarn.ApplicationMaster#main
          763:master = new ApplicationMaster(amArgs, new YarnRMClient)
          System.exit(master.run())
      ->org.apache.spark.deploy.yarn.ApplicationMaster#run 
          253:if (isClusterMode) {
                runDriver(securityMgr)
              } else {
                runExecutorLauncher(securityMgr)
              }
      ->org.apache.spark.deploy.yarn.ApplicationMaster#runDriver
          // Launch the user program
          394:userClassThread = startUserApplication()
          // Wait for the SparkContext to be injected
          401:val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
          Duration(totalWaitTime, TimeUnit.MILLISECONDS))  
      ->org.apache.spark.deploy.yarn.ApplicationMaster#startUserApplication
          629:val mainMethod = userClassLoader.loadClass(args.userClass).getMethod("main", classOf[Array[String]])
          635:mainMethod.invoke(null, userArgs.toArray)
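      The waiting mechanism is just a Scala Promise: runDriver blocks on sparkContextPromise.future while the user-class thread fulfills the promise once the SparkContext exists. Below is a minimal, self-contained sketch of that handshake (FakeSparkContext, amPromise, and startUserApplication here are illustrative stand-ins, not the actual Spark members):

      import scala.concurrent.{Await, Promise}
      import scala.concurrent.duration._

      object SparkContextHandshakeSketch {
        // Stand-in for the real SparkContext, just for the sketch.
        final case class FakeSparkContext(appName: String)

        // Plays the role of ApplicationMaster.sparkContextPromise.
        private val amPromise = Promise[FakeSparkContext]()

        // Plays the role of startUserApplication(): runs the "user main" in its own thread.
        private def startUserApplication(): Thread = {
          val t = new Thread(new Runnable {
            override def run(): Unit = {
              Thread.sleep(500)                  // pretend the user code does some setup first
              val sc = FakeSparkContext("demo")  // the user code creates the SparkContext...
              amPromise.success(sc)              // ...which reports itself back to the AM
            }
          }, "user-main")
          t.start()
          t
        }

        def main(args: Array[String]): Unit = {
          val userThread = startUserApplication()
          // Mirrors runDriver: block until the user thread hands over the SparkContext, or time out.
          val sc = Await.result(amPromise.future, 10.seconds)
          println(s"AM received SparkContext for app '${sc.appName}' and can now register with the RM")
          userThread.join()
        }
      }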
    5. Creating the SparkContext

      At this point the first NodeManager has been obtained from the RM and the ApplicationMaster and Driver have been started on it. Next the user-specified mainClass runs; when execution reaches new SparkContext, the SparkContext instance is created. During this process two particularly important objects are created, taskScheduler and schedulerBackend. Working together, they submit the task sets built by the DAGScheduler to the Executors, request task resources as needed, and track the status of running tasks.

      The user program executes new SparkContext():
      org.apache.spark.SparkContext#new (the try-catch block at lines 371-583)
          397 // Set basic information such as the user jars
              _conf.set(DRIVER_HOST_ADDRESS,_conf.get(DRIVER_HOST_ADDRESS))
              _conf.setIfMissing("spark.driver.port", "0")
              _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)
              _jars = Utils.getUserJars(_conf)
              _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
          428 // Create the ListenerBus, used to render the Spark UI
              _jobProgressListener = new JobProgressListener(_conf)
              listenerBus.addListener(jobProgressListener)
          432 // Create the SparkEnv
              _env = createSparkEnv(_conf, isLocal, listenerBus)
              SparkEnv.set(_env)
          501 // Create the taskScheduler and the schedulerBackend
              val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
              _schedulerBackend = sched
              _taskScheduler = ts
              _dagScheduler = new DAGScheduler(this)
              _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
          540 // Create the dynamic resource allocation manager; Section 9 covers it in detail
              _executorAllocationManager =
                if (dynamicAllocationEnabled) {
                  schedulerBackend match {
                    case b: ExecutorAllocationClient =>
                      Some(new ExecutorAllocationManager(
                        schedulerBackend.asInstanceOf[ExecutorAllocationClient],     listenerBus, _conf))
                    case _ =>
                      None
                  }
                } else {
                  None
                }
              _executorAllocationManager.foreach(_.start())
      ->org.apache.spark.SparkContext#createTaskScheduler
          2757 // In YARN mode the masterUrl is "yarn", and the matching ClusterManager is org.apache.spark.scheduler.cluster.YarnClusterManager
              case masterUrl =>
              val cm = getClusterManager(masterUrl) match {
                  case Some(clusterMgr) => clusterMgr
                  case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
              }
              try {
                // The corresponding TaskScheduler is org.apache.spark.scheduler.cluster.YarnClusterScheduler
                val scheduler = cm.createTaskScheduler(sc, masterUrl)
                // The corresponding backend is org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend
                val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
                cm.initialize(scheduler, backend)
                (backend, scheduler)
              } catch {
                case se: SparkException => throw se
                case NonFatal(e) =>
                  throw new SparkException("External scheduler cannot be instantiated", e)
              }
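      getClusterManager resolves the manager for the "yarn" master URL through Java's ServiceLoader mechanism (the YARN module registers YarnClusterManager as an org.apache.spark.scheduler.ExternalClusterManager service). The sketch below shows that discovery pattern in isolation with made-up trait and class names; it is only a model of the lookup, not Spark's actual code:

      import java.util.ServiceLoader
      import scala.collection.JavaConverters._

      // Illustrative stand-in for org.apache.spark.scheduler.ExternalClusterManager.
      trait DemoClusterManager {
        def canCreate(masterURL: String): Boolean
      }

      // Illustrative stand-in for YarnClusterManager; a real service would also be
      // listed in META-INF/services/DemoClusterManager.
      class DemoYarnClusterManager extends DemoClusterManager {
        override def canCreate(masterURL: String): Boolean = masterURL == "yarn"
      }

      object ClusterManagerLookupSketch {
        // Mirrors the shape of SparkContext#getClusterManager: load every registered
        // manager and keep the one that claims the given master URL.
        def getClusterManager(url: String): Option[DemoClusterManager] = {
          val loader = Thread.currentThread().getContextClassLoader
          val candidates =
            ServiceLoader.load(classOf[DemoClusterManager], loader).asScala.filter(_.canCreate(url))
          if (candidates.size > 1) {
            throw new IllegalStateException(s"Multiple cluster managers registered for url $url")
          }
          candidates.headOption
        }

        def main(args: Array[String]): Unit = {
          // Without the META-INF/services entry this prints None; with it, the YARN manager.
          println(getClusterManager("yarn"))
        }
      }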
    6. Creating the YarnClusterScheduler

      YarnClusterScheduler inherits from TaskSchedulerImpl (Spark's core implementation) via YarnScheduler. The most important thing this subclass adds is injecting the SparkContext into the ApplicationMaster (continuing from Section 5).

      org.apache.spark.scheduler.cluster.YarnClusterManager#createTaskScheduler
          32: A different subclass is instantiated depending on the deploy mode
              override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
                  sc.deployMode match {
                    case "cluster" => new YarnClusterScheduler(sc)
                    case "client" => new YarnScheduler(sc)
                    case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
                  }
              }
      org.apache.spark.scheduler.cluster.YarnClusterScheduler#postStartHook
              private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc) {
              
               logInfo("Created YarnClusterScheduler")
      
               override def postStartHook() {
                 
                 ApplicationMaster.sparkContextInitialized(sc)
                 super.postStartHook()
                 logInfo("YarnClusterScheduler.postStartHook done")
               }
      
              }
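      postStartHook is a template-method-style hook: the SparkContext invokes it once the scheduler has started, TaskSchedulerImpl's default implementation waits for the backend to become ready, and YarnClusterScheduler overrides it to report the SparkContext to the ApplicationMaster before deferring to the parent. A minimal sketch of the pattern, with invented class names:

      // Illustrative model of the postStartHook pattern; the class names are made up.
      abstract class DemoTaskScheduler {
        def start(): Unit = println("scheduler started")
        // Invoked by the context after start(); the default just waits for the backend.
        def postStartHook(): Unit = println("default hook: wait for the backend to be ready")
      }

      class DemoClusterScheduler(notifyAM: () => Unit) extends DemoTaskScheduler {
        // Mirrors YarnClusterScheduler: first tell the AM the context exists, then defer to the parent.
        override def postStartHook(): Unit = {
          notifyAM()
          super.postStartHook()
          println("cluster postStartHook done")
        }
      }

      object PostStartHookSketch {
        def main(args: Array[String]): Unit = {
          val scheduler = new DemoClusterScheduler(() => println("AM notified: SparkContext initialized"))
          // Mirrors the caller: start the scheduler, then invoke the hook.
          scheduler.start()
          scheduler.postStartHook()
        }
      }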
    7. Creating the YarnClusterSchedulerBackend

      YarnClusterSchedulerBackend likewise inherits from org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend (Spark's core implementation) via YarnSchedulerBackend. The most important things this subclass adds are bridging Spark's resource requests over to YARN (Section 9 covers this in detail) and creating the clientEndpoint and driverEndpoint used for communication.

      org.apache.spark.scheduler.cluster.YarnClusterManager#createSchedulerBackend
          // A different subclass is instantiated depending on the deploy mode
          40:override def createSchedulerBackend(sc: SparkContext,
            masterURL: String,
            scheduler: TaskScheduler): SchedulerBackend = {
              sc.deployMode match {
                case "cluster" =>
                  new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
                case "client" =>
                  new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
                case  _ =>
                  throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' forYarn")
              }
          }
          
      -> org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend#start
          36:super.start()
      -> org.apache.spark.scheduler.cluster.YarnSchedulerBackend#start
          // Create the clientEndpoint
          52:private val yarnSchedulerEndpoint = new YarnSchedulerEndpoint(rpcEnv)
              -> org.apache.spark.scheduler.cluster.YarnSchedulerBackend.YarnSchedulerEndpoint#receive
                  case RegisterClusterManager(am) =>
                      logInfo(s"ApplicationMaster registered as $am")
                      // The amEndpoint is injected here; this happens after the SparkContext has been injected into the ApplicationMaster (see Section 8)
                      amEndpoint = Option(am)
                      if (!shouldResetOnAmRegister) {
                        shouldResetOnAmRegister = true
                      } else {
                        // AM is already registered before, this potentially means that AM failed and
                        // a new one registered after the failure. This will only happen in yarn-client mode.
                        reset()
                      }
          86:super.start()
          -> org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#start
              // Create the driverEndpoint
              378:driverEndpoint = createDriverEndpointRef(properties)
    8. What happens after the SparkContext is injected into the ApplicationMaster

      At the end of Section 6, ApplicationMaster.sparkContextInitialized injected the SparkContext into the ApplicationMaster, which wakes up the wait inside the ApplicationMaster described in Section 4:

      org.apache.spark.deploy.yarn.ApplicationMaster#sparkContextInitialized
          769:master.sparkContextInitialized(sc)
      ->org.apache.spark.deploy.yarn.ApplicationMaster#sparkContextInitialized
          326:sparkContextPromise.success(sc)
      ->org.apache.spark.deploy.yarn.ApplicationMaster#runDriver
          // At this point sc has a value and the next steps can proceed
          401:val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
          Duration(totalWaitTime, TimeUnit.MILLISECONDS))
          if (sc != null) {
              rpcEnv = sc.env.rpcEnv
              // Create the AMEndpoint here and inject it into org.apache.spark.scheduler.cluster.YarnSchedulerBackend (see Section 7)
              val driverRef = runAMEndpoint(
                sc.getConf.get("spark.driver.host"),
                sc.getConf.get("spark.driver.port"),
                isClusterMode = true)
              registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl), securityMgr)
          }
      ->org.apache.spark.deploy.yarn.ApplicationMaster#runAMEndpoint
          387:amEndpoint =rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverEndpoint, isClusterMode))
          681:override def onStart(): Unit = {
              driver.send(RegisterClusterManager(self))
          }
      ->org.apache.spark.deploy.yarn.ApplicationMaster#registerAM
          // Create org.apache.spark.deploy.yarn.YarnAllocator
          // YarnAllocator requests containers from the YARN ResourceManager and decides what to do with containers when YARN grants them
          359:allocator = client.register(driverUrl,
                  driverRef,
                  yarnConf,
                  _sparkConf,
                  uiAddress,
                  historyAddress,
                  securityMgr,
                  localResources)
          // Make the initial resource request
          368:allocator.allocateResources() 
          // Start a reporter thread that keeps requesting resources on demand while the job runs; it also calls allocator.allocateResources()
          369:reporterThread = launchReporterThread()
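      launchReporterThread essentially starts a daemon loop that keeps calling allocateResources at a fixed interval. The sketch below models that loop with a dummy allocator; DemoAllocator and the interval value are invented for illustration (the real reporter derives its interval from the YARN heartbeat settings):

      object ReporterThreadSketch {
        // Stand-in for YarnAllocator, just enough for the sketch.
        trait DemoAllocator {
          def allocateResources(): Unit
        }

        // Models ApplicationMaster#launchReporterThread: poll the allocator until interrupted.
        def launchReporterThread(allocator: DemoAllocator, intervalMs: Long): Thread = {
          val t = new Thread(new Runnable {
            override def run(): Unit = {
              try {
                while (!Thread.currentThread().isInterrupted) {
                  allocator.allocateResources() // ask YARN for any missing containers
                  Thread.sleep(intervalMs)
                }
              } catch {
                case _: InterruptedException => // shutting down
              }
            }
          })
          t.setDaemon(true)
          t.setName("Reporter")
          t.start()
          t
        }

        def main(args: Array[String]): Unit = {
          val allocator = new DemoAllocator {
            override def allocateResources(): Unit = println("allocateResources() called")
          }
          val reporter = launchReporterThread(allocator, intervalMs = 1000L)
          Thread.sleep(3500) // let a few polling rounds happen
          reporter.interrupt()
        }
      }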
    9. Requesting resources

      Resource requests are mainly carried out by org.apache.spark.deploy.yarn.YarnAllocator. They are triggered by org.apache.spark.scheduler.cluster.YarnSchedulerBackend, which sends an org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RequestExecutors message to update targetNumExecutors, the required number of Executors.

      // This is the main method for requesting containers from NodeManagers and starting Executors
      org.apache.spark.deploy.yarn.YarnAllocator#allocateResources
          // First compute how many resources still need to be requested
          260:updateResourceRequests()
              -> org.apache.spark.deploy.yarn.YarnAllocator#updateResourceRequests
                  297:val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
                  // At initialization time, targetNumExecutors is the maximum of
                  //   spark.dynamicAllocation.minExecutors
                  //   spark.dynamicAllocation.initialExecutors
                  //   spark.executor.instances (see getDynamicAllocationInitialExecutors below)
                  109: @volatile private var targetNumExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf)
                  275: val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf)
                  
              -> org.apache.spark.util.Utils#getDynamicAllocationInitialExecutors
                  2538:val initialExecutors = Seq(
                      conf.get(DYN_ALLOCATION_MIN_EXECUTORS),
                      conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS),
                      conf.get(EXECUTOR_INSTANCES).getOrElse(0)).max
                      
                      
          276:handleAllocatedContainers(allocatedContainers.asScala)
              -> org.apache.spark.deploy.yarn.YarnAllocator#handleAllocatedContainers
                  442:runAllocatedContainers(containersToUse)
              -> org.apache.spark.deploy.yarn.YarnAllocator#runAllocatedContainers
                  511:new ExecutorRunnable(
                            Some(container),
                            conf,
                            sparkConf,
                            driverUrl,
                            executorId,
                            executorHostname,
                            executorMemory,
                            executorCores,
                            appAttemptId.getApplicationId.toString,
                            securityMgr,
                            localResources
                          ).run()
              -> org.apache.spark.deploy.yarn.ExecutorRunnable#run
                  65:startContainer()
              -> org.apache.spark.deploy.yarn.ExecutorRunnable#startContainer
                  // The command to be run on the NodeManager
                  98:val commands = prepareCommand()
                  // Start the container via the NodeManager
                  122:nmClient.startContainer(container.get, ctx)
              -> org.apache.spark.deploy.yarn.ExecutorRunnable#prepareCommand
                  201:val commands = prefixEnv ++
                      Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
                      javaOpts ++
                      // This is the key line: it launches the class covered in Section 10
                      Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
                        "--driver-url", masterAddress,
                        "--executor-id", executorId,
                        "--hostname", hostname,
                        "--cores", executorCores.toString,
                        "--app-id", appId) ++
                      userClassPath ++
                      Seq(
                        s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
                        s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")
                        
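      The initial value of targetNumExecutors mentioned above is therefore just a max over three settings. A small sketch of that computation, as a simplified model of Utils.getDynamicAllocationInitialExecutors that ignores validation and uses a plain Map in place of SparkConf:

      object InitialExecutorsSketch {
        // Simplified model: the initial target is the largest of the three settings below.
        def initialTargetExecutors(conf: Map[String, String]): Int = {
          def intOf(key: String, default: Int): Int = conf.get(key).map(_.toInt).getOrElse(default)
          Seq(
            intOf("spark.dynamicAllocation.minExecutors", 0),
            intOf("spark.dynamicAllocation.initialExecutors", 0),
            intOf("spark.executor.instances", 0)
          ).max
        }

        def main(args: Array[String]): Unit = {
          val conf = Map(
            "spark.dynamicAllocation.minExecutors" -> "2",
            "spark.dynamicAllocation.initialExecutors" -> "5",
            "spark.executor.instances" -> "0"
          )
          println(initialTargetExecutors(conf)) // prints 5
        }
      }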

      Section 5 mentioned that creating the SparkContext also creates executorAllocationManager, the dynamic resource allocation manager. It takes effect when spark.dynamicAllocation.enabled is set to true and requests resources dynamically while the job is running.

      org.apache.spark.ExecutorAllocationManager requests resources by calling the requestTotalExecutors method of org.apache.spark.ExecutorAllocationClient.

      org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend implements this interface.

      That class in turn calls the doRequestTotalExecutors method of its subclass org.apache.spark.scheduler.cluster.YarnSchedulerBackend, which finally performs the resource request.

      org.apache.spark.SparkContext#new
          552:_executorAllocationManager.foreach(_.start())
      -> org.apache.spark.ExecutorAllocationManager#start
          222:val scheduleTask = new Runnable() {
                override def run(): Unit = {
                  try {
                    schedule()
                  } catch {
                    case ct: ControlThrowable =>
                      throw ct
                    case t: Throwable =>
                      logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
                  }
                }
              }
              // Create a periodic task that schedules resources
              executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
      -> org.apache.spark.ExecutorAllocationManager#schedule
          281: updateAndSyncNumExecutorsTarget(now)
      -> org.apache.spark.ExecutorAllocationManager#updateAndSyncNumExecutorsTarget
          // maxNumExecutorsNeeded is computed from
          //   listener.totalPendingTasks (updated at CoarseGrainedSchedulerBackend line 261)
          //   listener.totalRunningTasks (updated at DAGScheduler line 996)
          //   and spark.executor.cores (user-defined, default 1); the exact computation and update logic is left for a follow-up
          310: val maxNeeded = maxNumExecutorsNeeded
          331: val delta = addExecutors(maxNeeded)
          380: val addRequestAcknowledged = testing ||
        client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
          
      -> org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#requestTotalExecutors
          548: doRequestTotalExecutors(numExecutors)
      
      ->org.apache.spark.scheduler.cluster.YarnSchedulerBackend#doRequestTotalExecutors
          // Build an org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RequestExecutors message and send it via the clientEndpoint
          138:yarnSchedulerEndpointRef.ask[Boolean](prepareRequestExecutors(requestedTotal))
      
      ->org.apache.spark.scheduler.cluster.YarnSchedulerBackend.YarnSchedulerEndpoint#receiveAndReply 
          // Finally, forward the RequestExecutors message to the ApplicationMaster
          280:case r: RequestExecutors =>
                 amEndpoint match {
                   case Some(am) =>
                     am.ask[Boolean](r).andThen {
                       case Success(b) => context.reply(b)
                       case Failure(NonFatal(e)) =>
                         logError(s"Sending $r to AM was unsuccessful", e)
                         context.sendFailure(e)
                     }(ThreadUtils.sameThread)
                   case None =>
                     logWarning("Attempted to request executors before the AM has registered!")
                     context.reply(false)
                 }
      ->org.apache.spark.deploy.yarn.ApplicationMaster.AMEndpoint#receiveAndReply
          696: a.requestTotalExecutorsWithPreferredLocalities(r.requestedTotal,
                r.localityAwareTasks, r.hostToLocalTaskCount, r.nodeBlacklist)
      -> org.apache.spark.deploy.yarn.YarnAllocator#requestTotalExecutorsWithPreferredLocalities
          // This finally updates targetNumExecutors, so the next call to org.apache.spark.deploy.yarn.YarnAllocator#allocateResources sees a different value; requestedTotal ultimately comes from the computation in org.apache.spark.ExecutorAllocationManager#schedule
          218: targetNumExecutors = requestedTotal
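      The driving loop of the allocation manager is a single-threaded scheduled executor that periodically recomputes the target and pushes it to the backend. A toy model of that loop is below; DemoAllocationClient and the maxNumExecutorsNeeded stub are invented, whereas in Spark the value is derived from listener state as described above:

      import java.util.concurrent.{Executors, TimeUnit}

      object AllocationLoopSketch {
        // Stand-in for ExecutorAllocationClient#requestTotalExecutors.
        trait DemoAllocationClient {
          def requestTotalExecutors(total: Int): Boolean
        }

        def main(args: Array[String]): Unit = {
          val client = new DemoAllocationClient {
            override def requestTotalExecutors(total: Int): Boolean = {
              println(s"requesting a total of $total executors")
              true
            }
          }
          // Pretend this is derived from pending/running tasks and spark.executor.cores.
          val maxNumExecutorsNeeded = () => scala.util.Random.nextInt(5) + 1

          val executor = Executors.newSingleThreadScheduledExecutor()
          val scheduleTask = new Runnable {
            // Mirrors ExecutorAllocationManager#schedule -> updateAndSyncNumExecutorsTarget.
            override def run(): Unit = client.requestTotalExecutors(maxNumExecutorsNeeded())
          }
          // Mirrors executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, MILLISECONDS).
          executor.scheduleWithFixedDelay(scheduleTask, 0, 1000, TimeUnit.MILLISECONDS)

          Thread.sleep(3500)
          executor.shutdownNow()
        }
      }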

      A small open question: org.apache.spark.SparkContext#requestExecutors can also request resources by calling org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#requestExecutors, and the log line logInfo(s"Requesting $numAdditionalExecutors additional executor(s) from the cluster manager") from that method does appear in the Spark Driver logs, but I have not yet figured out who calls this method.

    10. The org.apache.spark.executor.CoarseGrainedExecutorBackend class

      Once the resources requested in Sections 8 and 9 have been granted, an org.apache.spark.executor.CoarseGrainedExecutorBackend process is started in each container on the NodeManagers. These processes have a many-to-one relationship with org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend. As the name suggests, CoarseGrainedExecutorBackend can be seen as the Executor's manager: it holds an Executor object, and the Executor runs Tasks on a thread pool. This ties the whole flow together.

      org.apache.spark.executor.CoarseGrainedExecutorBackend#main
          284:run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
      -> org.apache.spark.executor.CoarseGrainedExecutorBackend#run
          226:env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
          env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
      -> org.apache.spark.executor.CoarseGrainedExecutorBackend#onStart
          // Register itself with the CoarseGrainedSchedulerBackend
          63:ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
      -> org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint#receiveAndReply 
          // Once the checks pass, tell the CoarseGrainedExecutorBackend to create the Executor
          195: executorRef.send(RegisteredExecutor)
      -> org.apache.spark.executor.CoarseGrainedExecutorBackend#receive
          83: executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
      -> org.apache.spark.executor.Executor#new
          101:Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
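      The Executor itself is little more than a cached thread pool that runs one task runner per task. A stripped-down model of that pattern is below; DemoTaskRunner is invented, and the real TaskRunner additionally handles deserialization, metrics, and status updates:

      import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor, TimeUnit}
      import java.util.concurrent.atomic.AtomicInteger

      object ExecutorThreadPoolSketch {
        // Minimal analogue of the Executor's TaskRunner: one Runnable per task.
        final class DemoTaskRunner(taskId: Long) extends Runnable {
          override def run(): Unit =
            println(s"running task $taskId on ${Thread.currentThread().getName}")
        }

        def main(args: Array[String]): Unit = {
          // Mirrors the Executor: a cached thread pool with named daemon threads.
          val counter = new AtomicInteger(0)
          val threadFactory = new ThreadFactory {
            override def newThread(r: Runnable): Thread = {
              val t = new Thread(r, s"Executor task launch worker-${counter.getAndIncrement()}")
              t.setDaemon(true)
              t
            }
          }
          val threadPool =
            Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]

          // launchTask: submit a task runner to the pool for each incoming task.
          (1L to 5L).foreach(taskId => threadPool.execute(new DemoTaskRunner(taskId)))

          threadPool.shutdown()
          threadPool.awaitTermination(10, TimeUnit.SECONDS)
        }
      }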
    11. This completes the SparkContext creation flow. Once the SparkContext has been created, the user code starts executing: it first builds the logical RDD graph; when an action is encountered, the job is split into stages to form the physical execution graph, which is then run via SparkContext.runJob. For a detailed analysis see github.com/JerryLead/S…
