Spark Master 是 Spark 集群的首脑,负责资源调度、任务分配、负载均衡等功能。
以下是 Master 启动流程的概述:
Master 是通过 shell 脚本启动的。
首先看一下启动脚本:more start-master.sh
此时我们知道最终调用的是 org.apache.spark.deploy.master.Master。
这是 Master 的源码:
private[spark] object Master extends Logging {
  val systemName = "sparkMaster"
  private val actorName = "Master"

  /**
   * Entry point of the Master process.
   *
   * Registers the signal logger, creates a `SparkConf`, builds a `MasterArguments`
   * from the raw arguments and that conf, starts the actor system together with the
   * Master actor, and then blocks until the actor system terminates.
   *
   * @param argStrings raw command-line arguments
   */
  def main(argStrings: Array[String]): Unit = {
    SignalLogger.register(log)
    val conf = new SparkConf
    // MasterArguments receives both the raw arguments and the conf.
    val args = new MasterArguments(argStrings, conf)
    val started = startSystemAndActor(args.host, args.port, args.webUiPort, conf)
    // Only the actor system (first tuple element) is needed; wait for it to finish.
    started._1.awaitTermination()
  }

  /**
   * Returns an `akka.tcp://...` URL for the Master actor given a sparkUrl `spark://host:port`.
   *
   * @throws SparkException if the url is invalid
   */
  def toAkkaUrl(sparkUrl: String, protocol: String): String =
    Utils.extractHostPortFromSparkUrl(sparkUrl) match {
      case (host, port) => AkkaUtils.address(protocol, systemName, host, port, actorName)
    }

  /**
   * Returns an akka `Address` for the Master actor given a sparkUrl `spark://host:port`.
   *
   * @throws SparkException if the url is invalid
   */
  def toAkkaAddress(sparkUrl: String, protocol: String): Address =
    Utils.extractHostPortFromSparkUrl(sparkUrl) match {
      case (host, port) => Address(protocol, systemName, host, port)
    }

  /**
   * Start the Master and return a four tuple of:
   *   (1) The Master actor system
   *   (2) The bound port
   *   (3) The web UI bound port
   *   (4) The REST server bound port, if any
   */
  def startSystemAndActor(
      host: String,
      port: Int,
      webUiPort: Int,
      conf: SparkConf): (ActorSystem, Int, Int, Option[Int]) = {
    val securityMgr = new SecurityManager(conf)

    // Create the actor system through AkkaUtils; it also reports the port actually bound.
    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
      systemName, host, port, conf = conf, securityManager = securityMgr)

    // Creating the actor through the actor system instantiates Master with these
    // constructor arguments.
    val masterProps = Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf)
    val masterRef = actorSystem.actorOf(masterProps, actorName)

    // Ask the freshly created Master which ports it bound, blocking for the reply.
    val replyTimeout = AkkaUtils.askTimeout(conf)
    val reply = Await.result(masterRef.ask(BoundPortsRequest)(replyTimeout), replyTimeout)
    val ports = reply.asInstanceOf[BoundPortsResponse]

    (actorSystem, boundPort, ports.webUIPort, ports.restPort)
  }
}
最终会通过 Master 的 main 函数完成 JVM 进程的启动。