任何系统都须要提供监控功能,不然在运行期间发生一些异常时,咱们将会一筹莫展。也许有人说,能够增长日志来解决这个问题。日志只能解决你的程序逻辑在运行期的监控,进而发现Bug,以及提供对业务有帮助的调试信息。当你的JVM进程奔溃或者程序响应速度很慢时,这些日志将毫无用处。好在JVM提供了jstat、jstack、jinfo、jmap、jhat等工具帮助咱们分析,更有VisualVM的可视化界面以更加直观的方式对JVM运行期的情况进行监控。此外,像Tomcat、Hadoop等服务都提供了基于Web的监控页面,用浏览器能访问具备样式及布局,并提供丰富监控数据的页面无疑是一种简单、高效的方式。css
Spark天然也提供了Web页面来浏览监控数据,并且Master、Worker、Driver根据自身功能提供了不一样内容的Web监控页面。不管是Master、Worker,仍是Driver,它们都使用了统一的Web框架WebUI。Master、Worker及Driver分别使用MasterWebUI、WorkerWebUI及SparkUI提供的Web界面服务,后三者都继承自WebUI,并增长了个性化的功能。此外,在Yarn或Mesos模式下还有WebUI的另外一个扩展实现HistoryServer。HistoryServer将会展示已经运行完成的应用程序信息。本章以SparkUI为例,并深刻分析WebUI的框架体系。html
在大型分布式系统中,采用事件监听机制是最多见的。为何要使用事件监听机制?假如Spark UI采用Scala的函数调用方式,那么随着整个集群规模的增长,对函数的调用会愈来愈多,最终会受到Driver所在JVM的线程数量限制而影响监控数据的更新,甚至出现监控数据没法及时显示给用户的状况。因为函数调用多数状况下是同步调用,这就致使线程被阻塞,在分布式环境中,还可能由于网络问题,致使线程被长时间占用。将函数调用更换为发送事件,事件的处理是异步的,当前线程能够继续执行后续逻辑进而被快速释放。线程池中的线程还能够被重用,这样整个系统的并发度会大大增长。发送的事件会存入缓存,由定时调度器取出后,分配给监听此事件的监听器对监控数据进行更新。Spark UI就是这样的服务,它的构成如图1所示。web
图1 SparkUI的组成apache
图1展现了SparkUI中的各个组件,这里对这些组件做简单介绍:json
Spark UI构建在WebUI的框架体系之上,所以应当首先了解WebUI。WebUI定义了一种Web界面展示的框架,并提供返回Json格式数据的Web服务。WebUI用于展现一组标签页,WebUITab定义了标签页的规范。每一个标签页中包含着一组页面,WebUIPage定义了页面的规范。咱们将首先了解WebUIPage和WebUITab,最后从总体来看WebUI。后端
任何的Web界面每每由多个页面组成,每一个页面都将提供不一样的内容展现。WebUIPage是WebUI框架体系的页节点,定义了全部页面应当遵循的规范。抽象类WebUIPage的定义见代码清单1。api
代码清单1 WebUIPage的定义数组
private[spark] abstract class WebUIPage(var prefix: String) { def render(request: HttpServletRequest): Seq[Node] def renderJson(request: HttpServletRequest): JValue = JNothing }
WebUIPage定义了两个方法。浏览器
WebUIPage在WebUI框架体系中的上一级节点(也能够称为父亲)能够是WebUI或者WebUITab,其成员属性prefix将与上级节点的路径一块儿构成当前WebUIPage的访问路径。缓存
有时候Web界面须要将多个页面做为一组内容放置在一块儿,这时候标签页是常见的展示形式。标签页WebUITab定义了全部标签页的规范,并用于展示一组WebUIPage。抽象类WebUITab的定义见代码清单2。
代码清单2 WebUITab的定义
private[spark] abstract class WebUITab(parent: WebUI, val prefix: String) { val pages = ArrayBuffer[WebUIPage]() val name = prefix.capitalize def attachPage(page: WebUIPage) { page.prefix = (prefix + "/" + page.prefix).stripSuffix("/") pages += page } def headerTabs: Seq[WebUITab] = parent.getTabs def basePath: String = parent.getBasePath }
根据代码清单2,能够看到WebUITab有四个成员属性:
此外,WebUITab还有三个成员方法,下面介绍它们的做用:
WebUI是Spark实现的用于提供Web界面展示的框架,凡是须要页面展示的地方均可以继承它来完成。WebUI定义了WebUI框架体系的规范。为便于理解,首先明确WebUI中各个成员属性的含义:
了解了WebUI的成员属性,如今就能够理解其提供的各个方法了。WebUI提供的方法有:
代码清单3 attachHandler的实现
def attachHandler(handler: ServletContextHandler) { handlers += handler serverInfo.foreach(_.addHandler(handler)) }
代码清单4 detachHandler的实现
def detachHandler(handler: ServletContextHandler) { handlers -= handler serverInfo.foreach(_.removeHandler(handler)) }
代码清单5 attachPage的实现
def attachPage(page: WebUIPage) { val pagePath = "/" + page.prefix val renderHandler = createServletHandler(pagePath, (request: HttpServletRequest) => page.render(request), securityManager, conf, basePath) val renderJsonHandler = createServletHandler(pagePath.stripSuffix("/") + "/json", (request: HttpServletRequest) => page.renderJson(request), securityManager, conf, basePath) attachHandler(renderHandler) attachHandler(renderJsonHandler) val handlers = pageToHandlers.getOrElseUpdate(page, ArrayBuffer[ServletContextHandler]()) handlers += renderHandler }
代码清单6 detachPage的实现
def detachPage(page: WebUIPage) { pageToHandlers.remove(page).foreach(_.foreach(detachHandler)) }
代码清单7 attachTab的实现
def attachTab(tab: WebUITab) { tab.pages.foreach(attachPage) tabs += tab }
代码清单8 detachTab的实现
def detachTab(tab: WebUITab) { tab.pages.foreach(detachPage) tabs -= tab }
代码清单9 addStaticHandler的实现
def addStaticHandler(resourceBase: String, path: String): Unit = { attachHandler(JettyUtils.createStaticHandler(resourceBase, path)) }
代码清单10 removeStaticHandler的实现
def removeStaticHandler(path: String): Unit = { handlers.find(_.getContextPath() == path).foreach(detachHandler) }
代码清单11 bind的实现
def bind() { assert(!serverInfo.isDefined, s"Attempted to bind $className more than once!") try { val host = Option(conf.getenv("SPARK_LOCAL_IP")).getOrElse("0.0.0.0") serverInfo = Some(startJettyServer(host, port, sslOptions, handlers, conf, name)) logInfo(s"Bound $className to $host, and started at $webUrl") } catch { case e: Exception => logError(s"Failed to bind $className", e) System.exit(1) } }
def webUrl: String = shttp://$publicHostName:$boundPort
def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1)
代码清单12 stop方法的实现
def stop() { assert(serverInfo.isDefined, s"Attempted to stop $className before binding to a server!") serverInfo.get.stop() }
在SparkContext的初始化过程当中,会建立SparkUI。有了对WebUI的整体认识,如今是时候了解SparkContext是如何构造SparkUI的了。SparkUI是WebUI框架的使用范例,了解了SparkUI的建立过程,读者对MasterWebUI、WorkerWebUI及HistoryServer的建立过程也必然了然于心。建立SparkUI的代码以下:
_statusTracker = new SparkStatusTracker(this) _progressBar = if (_conf.getBoolean("spark.ui.showConsoleProgress", true) && !log.isInfoEnabled) { Some(new ConsoleProgressBar(this)) } else { None } _ui = if (conf.getBoolean("spark.ui.enabled", true)) { Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener, _env.securityManager, appName, startTime = startTime)) } else { // For tests, do not enable the UI None } _ui.foreach(_.bind())
这段代码的执行步骤以下。
1) 建立Spark状态跟踪器SparkStatusTracker。
2) 建立ConsoleProgressBar。能够配置spark.ui.showConsoleProgress属性为false取消对ConsoleProgressBar的建立,此属性默认为true。
3) 调用SparkUI的createLiveUI方法建立SparkUI。
4) 给SparkUI绑定端口。SparkUI继承自WebUI,所以调用了代码清单4-12中WebUI的bind方法启动SparkUI底层的Jetty服务。
上述步骤中,第1)、2)、4)步都很简单,因此着重来分析第3)步。SparkUI的createLiveUI的实现以下。
def createLiveUI( sc: SparkContext, conf: SparkConf, listenerBus: SparkListenerBus, jobProgressListener: JobProgressListener, securityManager: SecurityManager, appName: String, startTime: Long): SparkUI = { create(Some(sc), conf, listenerBus, securityManager, appName, jobProgressListener = Some(jobProgressListener), startTime = startTime) }
能够看到SparkUI的createLiveUI方法中调用了create方法。create的实现以下。
private def create( sc: Option[SparkContext], conf: SparkConf, listenerBus: SparkListenerBus, securityManager: SecurityManager, appName: String, basePath: String = "", jobProgressListener: Option[JobProgressListener] = None, startTime: Long): SparkUI = { val _jobProgressListener: JobProgressListener = jobProgressListener.getOrElse { val listener = new JobProgressListener(conf) listenerBus.addListener(listener) listener } val environmentListener = new EnvironmentListener val storageStatusListener = new StorageStatusListener(conf) val executorsListener = new ExecutorsListener(storageStatusListener, conf) val storageListener = new StorageListener(storageStatusListener) val operationGraphListener = new RDDOperationGraphListener(conf) listenerBus.addListener(environmentListener) listenerBus.addListener(storageStatusListener) listenerBus.addListener(executorsListener) listenerBus.addListener(storageListener) listenerBus.addListener(operationGraphListener) new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener, executorsListener, _jobProgressListener, storageListener, operationGraphListener, appName, basePath, startTime) }
能够看到create方法里除了JobProgressListener是外部传入的以外,又增长了一些SparkListener,例如用于对JVM参数、Spark属性、Java系统属性、classpath等进行监控的EnvironmentListener;用于维护Executor的存储状态的StorageStatusListener;用于准备将Executor的信息展现在ExecutorsTab的ExecutorsListener;用于准备将Executor相关存储信息展现在BlockManagerUI的StorageListener;用于构建RDD的DAG(有向无关图)的RDDOperationGraphListener等。这5个SparkListener的实现添加到listenerBus的监听器列表中。最后使用SparkUI的构造器建立SparkUI。
调用SparkUI的构造器建立SparkUI,实际也是对SparkUI的初始化过程。在介绍初始化以前,先来看看SparkUI中的两个成员属性。
SparkUI的构造过程当中会执行initialize方法,其实现见代码清单13。
代码清单13 SparkUI的初始化
def initialize() { val jobsTab = new JobsTab(this) attachTab(jobsTab) val stagesTab = new StagesTab(this) attachTab(stagesTab) attachTab(new StorageTab(this)) attachTab(new EnvironmentTab(this)) attachTab(new ExecutorsTab(this)) attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static")) attachHandler(createRedirectHandler("/", "/jobs/", basePath = basePath)) attachHandler(ApiRootResource.getServletHandler(this)) // These should be POST only, but, the YARN AM proxy won't proxy POSTs attachHandler(createRedirectHandler( "/jobs/job/kill", "/jobs/", jobsTab.handleKillRequest, httpMethods = Set("GET", "POST"))) attachHandler(createRedirectHandler( "/stages/stage/kill", "/stages/", stagesTab.handleKillRequest, httpMethods = Set("GET", "POST"))) } initialize()
根据代码清单13,SparkUI的初始化步骤以下。
1) 构建页面布局并给每一个WebUITab中的全部WebUIPage建立对应的ServletContextHandler。这一步使用了代码清单4-8中展现的attachTab方法。
2) 调用JettyUtils的createStaticHandler方法建立对静态目录org/apache/spark/ui/static提供文件服务的ServletContextHandler,并使用attachHandler方法追加到SparkUI的服务中。
3) 调用JettyUtils的createRedirectHandler方法建立几个将用户对源路径的请求重定向到目标路径的ServletContextHandler。例如,将用户对根路径"/"的请求重定向到目标路径"/jobs/"的ServletContextHandler。
SparkUI到底是如何实现页面布局及展现的? 因为全部标签页都继承了SparkUITab,因此咱们先来看看SparkUITab的实现:
private[spark] abstract class SparkUITab(parent: SparkUI, prefix: String) extends WebUITab(parent, prefix) { def appName: String = parent.getAppName }
根据上述代码,咱们知道SparkUITab继承了WebUITab,并在实现中增长了一个用于获取当前应用名称的方法appName。EnvironmentTab是用于展现JVM、Spark属性、系统属性、类路径等相关信息的标签页,因为其实现简单且能说明问题,因此本节挑选EnvironmentTab做为示例解答本节一开始提出的问题。
EnvironmentTab的实现见代码清单14。
代码清单14 EnvironmentTab的实现
private[ui] class EnvironmentTab(parent: SparkUI) extends SparkUITab(parent, "environment") { val listener = parent.environmentListener attachPage(new EnvironmentPage(this)) }
根据代码清单14,咱们知道EnvironmentTab引用了SparkUI的environmentListener(类型为EnvironmentListener),而且包含EnvironmentPage这个页面。EnvironmentTab经过调用attachPage方法将EnvironmentPage与Jetty服务关联起来。根据代码清单5中attachPage的实现,建立的renderHandler将采用偏函数(request: HttpServletRequest) => page.render(request) 处理请求,于是会调用EnvironmentPage的render方法。EnvironmentPage的render方法将会渲染页面元素。EnvironmentPage的实现见代码清单15。
代码清单15 EnvironmentPage的实现
private[ui] class EnvironmentPage(parent: EnvironmentTab) extends WebUIPage("") { private val listener = parent.listener private def removePass(kv: (String, String)): (String, String) = { if (kv._1.toLowerCase.contains("password") || kv._1.toLowerCase.contains("secret")) { (kv._1, "******") } else kv } def render(request: HttpServletRequest): Seq[Node] = { // 调用UIUtils的listingTable方法生成JVM运行时信息、Spark属性信息、系统属性信息、类路径信息的表格 val runtimeInformationTable = UIUtils.listingTable( propertyHeader, jvmRow, listener.jvmInformation, fixedWidth = true) val sparkPropertiesTable = UIUtils.listingTable( propertyHeader, propertyRow, listener.sparkProperties.map(removePass), fixedWidth = true) val systemPropertiesTable = UIUtils.listingTable( propertyHeader, propertyRow, listener.systemProperties, fixedWidth = true) val classpathEntriesTable = UIUtils.listingTable( classPathHeaders, classPathRow, listener.classpathEntries, fixedWidth = true) val content = <span> <h4>Runtime Information</h4> {runtimeInformationTable} <h4>Spark Properties</h4> {sparkPropertiesTable} <h4>System Properties</h4> {systemPropertiesTable} <h4>Classpath Entries</h4> {classpathEntriesTable} </span> // 调用UIUtils的headerSparkPage方法封装好css、js、header及页面布局等 UIUtils.headerSparkPage("Environment", content, parent) } // 定义JVM运行时信息、Spark属性信息、系统属性信息的表格头部propertyHeader和类路径信息的表格头部 // classPathHeaders private def propertyHeader = Seq("Name", "Value") private def classPathHeaders = Seq("Resource", "Source") // 定义JVM运行时信息的表格中每行数据的生成方法jvmRow private def jvmRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr> private def propertyRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr> private def classPathRow(data: (String, String)) = <tr><td>{data._1}</td><td>{data._2}</td></tr> }
根据代码清单15,EnvironmentPage的render方法利用从父节点EnvironmentTab中获得的EnvironmentListener中的统计监控数据生成JVM运行时、Spark属性、系统属性以及类路径等状态的摘要信息。以JVM运行时为例,页面渲染的步骤以下:
1) 定义JVM运行时信息、Spark属性信息、系统属性信息的表格头部propertyHeader和类路径信息的表格头部classPathHeaders。
2) 定义JVM运行时信息的表格中每行数据的生成方法jvmRow。
3) 调用UIUtils的listingTable方法生成JVM运行时信息、Spark属性信息、系统属性信息、类路径信息的表格。
4) 调用UIUtils的headerSparkPage方法封装好css、js、header及页面布局等。
UIUtils工具类的实现细节留给感兴趣的读者自行查阅,本文很少赘述。
[1]本节内容用到JettyUtils中的不少方法,读者能够在附录C中找到相应的实现与说明。
通过近一年的准备,基于Spark2.1.0版本的《Spark内核设计的艺术 架构设计与实现》一书现已出版发行,图书如图:
纸质版售卖连接以下: