Sprk submit 过程

前言

折腾了好久,终于开始学习 Spark 的源码了,第一篇我打算讲一下 Spark 做业的提交过程。html

img

这个是 Spark 的 App 运行图,它经过一个 Driver 来和集群通讯,集群负责做业的分配。今天我要讲的是如何建立这个 Driver Program 的过程。web

做业提交方法以及参数

咱们先看一下用 Spark Submit 提交的方法吧,下面是从官方上面摘抄的内容。apache

# Run on a Spark standalone cluster
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://207.184.161.138:7077 \
  --executor-memory 20G \
  --total-executor-cores 100 \
  /path/to/examples.jar \
  1000
复制代码

这个是提交到 standalone 集群的方式,打开 spark-submit 这文件,咱们会发现它最后是调用了org.apache.spark.deploy.SparkSubmit 这个类。app

咱们直接进去看就好了,main 函数就几行代码,太节省了。框架

def main(args: Array[String]) {
    val appArgs = new SparkSubmitArguments(args)
    val (childArgs, classpath, sysProps, mainClass) = createLaunchEnv(appArgs)
    launch(childArgs, classpath, sysProps, mainClass, appArgs.verbose)
}
复制代码

咱们主要看看 createLaunchEnv 方法就能够了,launch 是反射调用 mainClass,精华全在 createLaunchEnv 里面了。dom

在里面我发现一些有用的信息,可能在官方文档上面都没有的,发出来你们瞅瞅。前面不带 -- 的能够在 spark-defaults.conf 里面设置,带 -- 的直接在提交的时候指定,具体含义你们一看就懂。tcp

val options = List[OptionAssigner](
      OptionAssigner(args.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"),
      OptionAssigner(args.name, ALL_CLUSTER_MGRS, false, sysProp = "spark.app.name"),
      OptionAssigner(args.name, YARN, true, clOption = "--name", sysProp = "spark.app.name"),
      OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, true,
        sysProp = "spark.driver.extraClassPath"),
      OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, true,
        sysProp = "spark.driver.extraJavaOptions"),
      OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, true,
        sysProp = "spark.driver.extraLibraryPath"),
      OptionAssigner(args.driverMemory, YARN, true, clOption = "--driver-memory"),
      OptionAssigner(args.driverMemory, STANDALONE, true, clOption = "--memory"),
      OptionAssigner(args.driverCores, STANDALONE, true, clOption = "--cores"),
      OptionAssigner(args.queue, YARN, true, clOption = "--queue"),
      OptionAssigner(args.queue, YARN, false, sysProp = "spark.yarn.queue"),
      OptionAssigner(args.numExecutors, YARN, true, clOption = "--num-executors"),
      OptionAssigner(args.numExecutors, YARN, false, sysProp = "spark.executor.instances"),
      OptionAssigner(args.executorMemory, YARN, true, clOption = "--executor-memory"),
      OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, false,
        sysProp = "spark.executor.memory"),
      OptionAssigner(args.executorCores, YARN, true, clOption = "--executor-cores"),
      OptionAssigner(args.executorCores, YARN, false, sysProp = "spark.executor.cores"),
      OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, false,
        sysProp = "spark.cores.max"),
      OptionAssigner(args.files, YARN, false, sysProp = "spark.yarn.dist.files"),
      OptionAssigner(args.files, YARN, true, clOption = "--files"),
      OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.files"),
      OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"),
      OptionAssigner(args.archives, YARN, false, sysProp = "spark.yarn.dist.archives"),
      OptionAssigner(args.archives, YARN, true, clOption = "--archives"),
      OptionAssigner(args.jars, YARN, true, clOption = "--addJars"),
      OptionAssigner(args.jars, ALL_CLUSTER_MGRS, false, sysProp = "spark.jars")
 )
复制代码

Driver 程序的部署模式有两种,client 和 cluster,默认是 client。client 的话默认就是直接在本地运行了 Driver 程序了,cluster 模式还会兜一圈把做业发到集群上面去运行。分布式

指定部署模式须要用参数 --deploy-mode 来指定,或者在环境变量当中添加 DEPLOY_MODE 变量来指定。ide

下面讲的是 cluster 的部署方式,兜一圈的这种状况。函数

yarn 模式的话 mainClass 是 org.apache.spark.deploy.yarn.Client,standalone 的 mainClass 是org.apache.spark.deploy.Client。

此次咱们讲 org.apache.spark.deploy.Client,yarn 的话单独找一章出来单独讲,目前超哥仍是推荐使用 standalone 的方式部署 spark,具体缘由不详,听说是由于资源调度方面的问题。

说个快捷键吧,Ctrl+Shift+N,而后输入 Client 就能找到这个类,这是 IDEA 的快捷键,至关好使。

咱们直接找到它的 main 函数,发现了它竟然使用了 Akka 框架,我百度了一下,被它震惊了。

Akka

在 main 函数里面,主要代码就这么三行。

//建立一个ActorSystem
val (actorSystem, _) = AkkaUtils.createActorSystem("driverClient",Utils.localHostName(),0,
  conf, new SecurityManager(conf))
//执行ClientActor的preStart方法和receive方法
actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf))
//等待运行结束
actorSystem.awaitTermination()
复制代码

看了这里真的有点儿懵啊,这是啥玩意儿,不懂的朋友们,请点击这里 Akka。下面是它官方放出来的例子:

//定义一个case class用来传递参数
case class Greeting(who: String)
//定义Actor,比较重要的一个方法是receive方法,用来接收信息的
class GreetingActor extends Actor with ActorLogging {
   def receive = {
       case Greeting(who) ⇒ log.info("Hello " + who)
   }
}
//建立一个ActorSystem
val system = ActorSystem("MySystem")
//给ActorSystem设置Actor
val greeter = system.actorOf(Props[GreetingActor], name = "greeter")
//向greeter发送信息,用Greeting来传递
greeter ! Greeting("Charlie Parker")
复制代码

简直是无比强大啊,就这么几行代码就搞定了,接下来看你会更加震惊的。

咱们回到 Client 类当中,找到 ClientActor,它有两个方法,是以前说的 preStart 和 receive 方法,preStart 方法用于链接 master 提交做业请求,receive 方法用于接收从 master 返回的反馈信息。

咱们先看 preStart 方法吧。

override def preStart() = {
    // 这里须要把master的地址转换成akka的地址,而后经过这个akka地址得到指定的actor
    // 它的格式是"akka.tcp://%s@%s:%s/user/%s".format(systemName, host, port, actorName)
    masterActor = context.actorSelection(Master.toAkkaUrl(driverArgs.master))
    // 把自身设置成远程生命周期的事件
    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])

    driverArgs.cmd match {
      case "launch" =>
        // 此处省略100个字
        val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
        // 此处省略100个字
        // 向master发送提交Driver的请求,把driverDescription传过去,RequestSubmitDriver前面说过了,是个case class
        masterActor ! RequestSubmitDriver(driverDescription)

      case "kill" =>
        val driverId = driverArgs.driverId
        val killFuture = masterActor ! RequestKillDriver(driverId)
    }
}
复制代码

从上面的代码看得出来,它须要设置 master 的链接地址,最后提交了一个 RequestSubmitDriver 的信息。在 receive 方法里面,就是等待接受回应了,有两个 Response 分别对应着这里的 launch 和 kill。

线索貌似到这里就断了,那下一步在哪里了呢?固然是在 Master 里面啦,怎么知道的,猜的,哈哈。

Master 也是继承了 Actor,在它的 main 函数里面找到了如下代码:

val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf, 
  securityManager = securityMgr)
val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort, securityMgr), actorName)
val timeout = AkkaUtils.askTimeout(conf)
val respFuture = actor.ask(RequestWebUIPort)(timeout)
val resp = Await.result(respFuture, timeout).asInstanceOf[WebUIPortResponse]
复制代码

和前面的 actor 基本一致,多了 actor.ask 这句话,查了一下官网的文档,这句话的意思的发送消息,而且接受一个 Future 做为 response,和前面的 actor ! message 的区别就是它还接受返回值。

具体的 Akka 的用法,你们仍是参照官网吧,Akka 确实如它官网所言的那样子,是一个简单、强大、并行的分布式框架。

小结:

Akka 的使用确实简单,短短的几行代码即刻完成一个通讯功能,比 Socket 简单不少。可是它也逃不脱咱们常说的那些东西,请求、接收请求、传递的消息、注册的地址和端口这些概念。

调度 schedule

咱们接下来查找 Master 的 receive 方法吧,Master 是做为接收方的,而不是主动请求,这点和 hadoop 是一致的。

case RequestSubmitDriver(description) => {
        val driver = createDriver(description)
        persistenceEngine.addDriver(driver)
        waitingDrivers += driver
        drivers.add(driver)
        // 调度
        schedule()
         // 告诉client,提交成功了,把driver.id告诉它
        sender ! SubmitDriverResponse(true, Some(driver.id), s"Driver successfully submitted as ${driver.id}")
      }
复制代码

这里咱们主要看 schedule 方法就能够了,它是执行调度的方法。

private def schedule() {
    if (state != RecoveryState.ALIVE) { return }

    // 首先调度Driver程序,从workers里面随机抽一些出来
    val shuffledWorkers = Random.shuffle(workers) 
    for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
      for (driver <- waitingDrivers) {
        // 判断内存和cpu够不够,够的就执行了哈
        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
          launchDriver(worker, driver)
          waitingDrivers -= driver
        }
      }
    }

    // 这里是按照先进先出的,spreadOutApps是由spark.deploy.spreadOut参数来决定的,默认是true
    if (spreadOutApps) {
      // 遍历一下app
      for (app <- waitingApps if app.coresLeft > 0) {
        // canUse里面判断了worker的内存是否够用,而且该worker是否已经包含了该app的Executor
        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
          .filter(canUse(app, _)).sortBy(_.coresFree).reverse
        val numUsable = usableWorkers.length
        val assigned = new Array[Int](numUsable) 
        // 记录每一个节点的核心数
        var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
        var pos = 0
        // 遍历直到分配结束
        while (toAssign > 0) {
          // 从0开始遍历可用的work,若是可用的cpu减去已经分配的>0,就能够分配给它
          if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
            toAssign -= 1
            // 这个位置的work的可分配的cpu数+1
            assigned(pos) += 1
          }
          pos = (pos + 1) % numUsable
        }
        // 给刚才标记的worker分配任务
        for (pos <- 0 until numUsable) {
          if (assigned(pos) > 0) {
            val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
            launchExecutor(usableWorkers(pos), exec)
            app.state = ApplicationState.RUNNING
          }
        }
      }
    } else {
      // 这种方式和上面的方式的区别是,这种方式尽量用少许的节点来完成这个任务
      for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
        for (app <- waitingApps if app.coresLeft > 0) {
          // 判断条件是worker的内存比app须要的内存多
          if (canUse(app, worker)) {
            val coresToUse = math.min(worker.coresFree, app.coresLeft)
            if (coresToUse > 0) {
              val exec = app.addExecutor(worker, coresToUse)
              launchExecutor(worker, exec)
              app.state = ApplicationState.RUNNING
            }
          }
        }
      }
    }
  }
复制代码

它的调度器是这样的,先调度 Driver 程序,而后再调度 App,调度 App 的方式是从各个 worker 的里面和 App 进行匹配,看须要分配多少个 cpu。

那咱们接下来看两个方法 launchDriver 和 launchExecutor 便可。

def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
    logInfo("Launching driver " + driver.id + " on worker " + worker.id)
    worker.addDriver(driver)
    driver.worker = Some(worker)
    worker.actor ! LaunchDriver(driver.id, driver.desc)
    driver.state = DriverState.RUNNING
  }
复制代码

给 worker 发送了一个 LaunchDriver 的消息,下面在看 launchExecutor 的方法。

def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) {
    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
    worker.addExecutor(exec)
    worker.actor ! LaunchExecutor(masterUrl,
      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
    exec.application.driver ! ExecutorAdded(
      exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
  }
复制代码

它要作的事情多一点,除了给 worker 发送 LaunchExecutor 指令外,还须要给 driver 发送 ExecutorAdded 的消息,说你的任务已经有人干了。

在继续 Worker 讲以前,咱们先看看它是怎么注册进来的,每一个 Worker 启动以后,会自动去请求 Master 去注册本身,具体咱们能够看 receive 的方法里面的 RegisterWorker 这一段,它须要上报本身的内存、Cpu、地址、端口等信息,注册成功以后返回 RegisteredWorker 信息给它,说已经注册成功了。

Worker 执行

一样的,咱们到 Worker 里面在 receive 方法找 LaunchDriver 和 LaunchExecutor 就能够找到咱们要的东西。

case LaunchDriver(driverId, driverDesc) => {
      logInfo(s"Asked to launch driver $driverId")
      val driver = new DriverRunner(driverId, workDir, sparkHome, driverDesc, self, akkaUrl)
      drivers(driverId) = driver
      driver.start()

      coresUsed += driverDesc.cores
      memoryUsed += driverDesc.mem
}
复制代码

看一下 start 方法吧,start 方法里面,实际上是 new Thread().start(),run 方法里面是经过传过来的 DriverDescription 构造的一个命令,丢给 ProcessBuilder 去执行命令,结束以后调用。

worker !DriverStateChanged 通知 worker,worker 再经过 master ! DriverStateChanged 通知 master,释放掉 worker 的 cpu 和内存。

同理,LaunchExecutor 执行完毕了,经过 worker ! ExecutorStateChanged 通知 worker,而后 worker 经过 master ! ExecutorStateChanged 通知 master,释放掉 worker 的 cpu 和内存。

下面咱们再梳理一下这个过程,只包括 Driver 注册,Driver 运行以后的过程在以后的文章再说,比较复杂。

一、Client 经过得到 Url 地址得到 ActorSelection(master 的 actor 引用), 而后经过 ActorSelection 给 Master 发送注册 Driver 请求(RequestSubmitDriver)

二、Master 接收到请求以后就开始调度了,从 workers 列表里面找出能够用的 Worker

三、经过 Worker 的 actor 引用 ActorRef 给可用的 Worker 发送启动 Driver 请求(LaunchDriver)

四、调度完毕以后,给 Client 回复注册成功消息 (SubmitDriverResponse)

五、Worker 接收到 LaunchDriver 请求以后,经过传过来的 DriverDescription 的信息构造出命令来,经过 ProcessBuilder 执行

六、ProcessBuilder 执行完命令以后,经过 DriverStateChanged 经过 Worker

七、Worker 最后把 DriverStateChanged 汇报给 Master

后记:听超哥说,org.apache.spark.deploy.Client 这个类快要被删除了,不知道 cluster 的这种模式是否是也被放弃了,官方给出来的例子推荐的是 client 模式 -> 直接运行程序。难怪在做业调度的时候,看到别的 actor 叫 driverActor。

不过这篇文章还有存在的意义, Akka 和调度这块,和我如今正在写的第三篇以及第四篇关系很密切。

转自

Spark提交过程

相关文章
相关标签/搜索