欢迎转载,转载请注明出处,徽沪一郎.java
WEB UI和Metrics子系统为外部观察监测Spark内部运行状况提供了必要的窗口,本文将简略的过一下其内部代码实现。web
先上图感觉一下spark webui 假设当前已经在本机运行standalone cluster模式,输入http://127.0.0.1:8080将会看到以下页面
json
driver application默认会打开4040端口进行http监听,能够看到application相关的详细信息设计模式
显示每一个stage的详细信息tomcat
本节要讨论的重点是http server是如何启动的,页面中的数据是从哪里获取到的?Spark中用到的http server是jetty, jetty采用java编写,是很是轻巧的servlet engine和http server。可以嵌入到用户程序中执行,不用像tomcat或jboss那样须要本身独立的jvm进程。app
SparkUI在SparkContext初始化的时候建立 jvm
// Initialize the Spark UI , registering all associated listeners private [spark] val ui = new SparkUI (this) ui.bind ()
initialize的主要工做是注册页面处理句柄,WebUI的子类须要实现本身的initialize函数ide
bind将真正启动jetty server.函数
def bind () { assert (! serverInfo .isDefined , " Attempted to bind % s more than once!". format ( className )) try { // 启 动 JettyServer serverInfo = Some( startJettyServer (" 0.0.0.0 ", port , handlers , conf)) logInfo (" Started %s at http ://%s:%d". format ( className , publicHostName , boundPort )) } catch { case e: Exception => logError (" Failed to bind %s". format ( className ) , e) System .exit (1) } }
在startJettyServer函数中将JettyServer运行起来的关键处理函数是connectoop
def connect(currentPort: Int): (Server, Int) = { val server = new Server(new InetSocketAddress(hostName, currentPort)) val pool = new QueuedThreadPool pool.setDaemon(true) server.setThreadPool(pool) server.setHandler(collection) Try { server.start() } match { case s: Success[_] => (server, server.getConnectors.head.getLocalPort) case f: Failure[_] => val nextPort = (currentPort + 1) % 65536 server.stop() pool.stop() val msg = s"Failed to create UI on port $currentPort. Trying again on port $nextPort." if (f.toString.contains("Address already in use")) { logWarning(s"$msg - $f") } else { logError(msg, f.exception) } connect(nextPort) } } val (server, boundPort) = connect(port) ServerInfo(server, boundPort, collection) }
页面中的数据是如何获取的呢,这就要归功于SparkListener了,典型的观察者设计模式。当有与stage及task相关的事件发生时,这些Listener都将收到通知,并进行数据更新。
须要指出的是,数据尽管得以自动更新,但页面并无,仍是须要手工刷新才能获得最新的数据。
上图显示的是SparkUI中注册了哪些SparkListener子类。来看一看这些子类是在何时注册进去的, 注意研究一下SparkUI.initialize函
def initialize() { listenerBus.addListener(storageStatusListener) val jobProgressTab = new JobProgressTab(this) attachTab(jobProgressTab) attachTab(new StorageTab(this)) attachTab(new EnvironmentTab(this)) attachTab(new ExecutorsTab(this)) attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static")) attachHandler(createRedirectHandler("/", "/stages", basePath = basePath)) attachHandler( createRedirectHandler("/stages/stage/kill", "/stages", jobProgressTab.handleKillRequest)) if (live) { sc.env.metricsSystem.getServletHandlers.foreach(attachHandler) } }
举一个实际例子来看看Notifier发送Event的时刻,好比有任务提交的时 resourceOffer->taskStarted->handleBeginEvent
private [ scheduler ] def handleBeginEvent (task: Task[_ ], taskInfo : TaskInfo ) { listenerBus .post( SparkListenerTaskStart (task. stageId , taskInfo )) submitWaitingStages () }
post实际上是向listenerBus的消息队列中添加一个消息,真正将消息发送 出去的时另外一个处理线程listenerThread
override def run (): Unit = Utils. logUncaughtExceptions { while (true) { eventLock . acquire () // Atomically remove and process this event LiveListenerBus .this. synchronized { val event = eventQueue .poll if (event == SparkListenerShutdown ) { // Get out of the while loop and shutdown the daemon thread return } Option (event). foreach ( postToAll ) } } }
Option(event).foreach(postToAll)负责将事件通知给各个Observer.postToAll的函数实现以下
def postToAll(event: SparkListenerEvent) { event match { case stageSubmitted: SparkListenerStageSubmitted => foreachListener(_.onStageSubmitted(stageSubmitted)) case stageCompleted: SparkListenerStageCompleted => foreachListener(_.onStageCompleted(stageCompleted)) case jobStart: SparkListenerJobStart => foreachListener(_.onJobStart(jobStart)) case jobEnd: SparkListenerJobEnd => foreachListener(_.onJobEnd(jobEnd)) case taskStart: SparkListenerTaskStart => foreachListener(_.onTaskStart(taskStart)) case taskGettingResult: SparkListenerTaskGettingResult => foreachListener(_.onTaskGettingResult(taskGettingResult)) case taskEnd: SparkListenerTaskEnd => foreachListener(_.onTaskEnd(taskEnd)) case environmentUpdate: SparkListenerEnvironmentUpdate => foreachListener(_.onEnvironmentUpdate(environmentUpdate)) case blockManagerAdded: SparkListenerBlockManagerAdded => foreachListener(_.onBlockManagerAdded(blockManagerAdded)) case blockManagerRemoved: SparkListenerBlockManagerRemoved => foreachListener(_.onBlockManagerRemoved(blockManagerRemoved)) case unpersistRDD: SparkListenerUnpersistRDD => foreachListener(_.onUnpersistRDD(unpersistRDD)) case applicationStart: SparkListenerApplicationStart => foreachListener(_.onApplicationStart(applicationStart)) case applicationEnd: SparkListenerApplicationEnd => foreachListener(_.onApplicationEnd(applicationEnd)) case SparkListenerShutdown => } }
在系统设计中,测量模块是不可或缺的组成部分。经过这些测量数据来感知系统的运行状况。
在Spark中,测量模块由MetricsSystem来担任,MetricsSystem中有三个重要的概念,分述以下。
Spark目前支持将测量数据保存或发送到以下目的地
下面从MetricsSystem的建立,数据源的添加,数据更新与发送几个方面来跟踪一下源码。
MetricsSystem依赖于由codahale提供的第三方库Metrics,能够在metrics.codahale.com找到更为详细的介绍。
以Driver Application为例,driver application首先会初始化SparkContext,在SparkContext的初始化过程当中就会建立MetricsSystem,具体调用关系以下。 SparkContext.init->SparkEnv.init->MetricsSystem.createMetricsSystem
注册数据源,继续以SparkContext为例
private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, this) private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager, this) private def initDriverMetrics() { SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource) SparkEnv.get.metricsSystem.registerSource(blockManagerSource) } initDriverMetrics()
数据读取由Sink来完成,在Spark中建立的Sink子类以下图所示
读取最新的数据,以CsvSink为例,最主要的就是建立CsvReporter,启动以后会按期更新最近的数据到console。不一样类型的Sink所使用的Reporter是不同的。
val reporter: CsvReporter = CsvReporter.forRegistry(registry) .formatFor(Locale.US) .convertDurationsTo(TimeUnit.MILLISECONDS) .convertRatesTo(TimeUnit.SECONDS) .build(new File(pollDir)) override def start() { reporter.start(pollPeriod, pollUnit) }
Spark中关于metrics子系统的配置文件详见conf/metrics.properties. 默认的Sink是MetricsServlet,在任务提交执行以后,输入http://127.0.0.1:4040/metrics/json会获得以json格式保存的metrics信息。