Spark Master Startup Flow

 

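The Spark Master is the head of a Spark cluster. It is responsible for resource scheduling, task allocation, load balancing, and related functions.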

 

 

The following is an overview of the Master startup flow.

The Master is started through a shell script.

 

First, take a look at the startup script: more start-master.sh

From the script we can see that the class ultimately invoked is org.apache.spark.deploy.master.Master

 

This is the Master source code:

private[spark] object Master extends Logging {
  val systemName = "sparkMaster"
  private val actorName = "Master"

  // Entry point for starting the Master
  def main(argStrings: Array[String]) {
    SignalLogger.register(log)
    // Create the SparkConf
    val conf = new SparkConf
    // Parse the command-line arguments and store them in the SparkConf
    val args = new MasterArguments(argStrings, conf)
    // Create the ActorSystem and the Master actor
    val (actorSystem, _, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)
    // Block until the ActorSystem terminates
    actorSystem.awaitTermination()
  }

  /**
   * Returns an `akka.tcp://...` URL for the Master actor given a sparkUrl `spark://host:port`.
   *
   * @throws SparkException if the url is invalid
   */
  def toAkkaUrl(sparkUrl: String, protocol: String): String = {
    val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl)
    AkkaUtils.address(protocol, systemName, host, port, actorName)
  }

  /**
   * Returns an akka `Address` for the Master actor given a sparkUrl `spark://host:port`.
   *
   * @throws SparkException if the url is invalid
   */
  def toAkkaAddress(sparkUrl: String, protocol: String): Address = {
    val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl)
    Address(protocol, systemName, host, port)
  }

  /**
   * Start the Master and return a four tuple of:
   *   (1) The Master actor system
   *   (2) The bound port
   *   (3) The web UI bound port
   *   (4) The REST server bound port, if any
   */
  def startSystemAndActor(
      host: String,
      port: Int,
      webUiPort: Int,
      conf: SparkConf): (ActorSystem, Int, Int, Option[Int]) = {
    val securityMgr = new SecurityManager(conf)
    // Use AkkaUtils to create the ActorSystem
    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf,
      securityManager = securityMgr)
    // Create the Master actor through the ActorSystem: actorSystem.actorOf runs the
    // Master constructor, after which the actor's lifecycle methods (such as preStart) run
    val actor = actorSystem.actorOf(
      Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName)
    val timeout = AkkaUtils.askTimeout(conf)
    val portsRequest = actor.ask(BoundPortsRequest)(timeout)
    val portsResponse = Await.result(portsRequest, timeout).asInstanceOf[BoundPortsResponse]
    (actorSystem, boundPort, portsResponse.webUIPort, portsResponse.restPort)
  }
}
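The toAkkaUrl helper above turns a `spark://host:port` address into the URL of the Master actor. As a rough illustration of that conversion, here is a minimal sketch that does not depend on Spark or Akka. The object name `SparkUrlSketch` and the helper `extractHostPort` are hypothetical stand-ins for `Utils.extractHostPortFromSparkUrl`, and the `protocol://system@host:port/user/actor` layout is an assumption based on the usual Akka actor path format, not a copy of `AkkaUtils.address`.

```scala
import java.net.URI

object SparkUrlSketch {
  // Same values as in the Master object shown above.
  val systemName = "sparkMaster"
  val actorName = "Master"

  // Hypothetical, simplified stand-in for Utils.extractHostPortFromSparkUrl:
  // accepts only spark://host:port and rejects anything else.
  def extractHostPort(sparkUrl: String): (String, Int) = {
    val uri = new URI(sparkUrl)
    require(
      uri.getScheme == "spark" && uri.getHost != null && uri.getPort >= 0,
      s"Invalid master URL: $sparkUrl")
    (uri.getHost, uri.getPort)
  }

  // Assumed URL layout: protocol://system@host:port/user/actor
  def toAkkaUrl(sparkUrl: String, protocol: String): String = {
    val (host, port) = extractHostPort(sparkUrl)
    s"$protocol://$systemName@$host:$port/user/$actorName"
  }
}
```

With this sketch, `SparkUrlSketch.toAkkaUrl("spark://node1:7077", "akka.tcp")` returns `akka.tcp://sparkMaster@node1:7077/user/Master`, while a URL without a port, such as `spark://node1`, is rejected with an IllegalArgumentException.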

In the end, the Master JVM process is started through the main function of Master.
