对于一个系统而言,首先考虑要知足一些业务场景,并实现功能。随着系统功能愈来愈多,代码量级愈来愈高,系统的可维护性、可测试性、性能都会成为新的挑战,这时监控功能就变得愈来愈重要了。在国内,绝大多数IT公司的项目都以业务为导向,以完成功能为目标,这些项目在立项、设计、开发、上线的各个阶段,不多有人会考虑到监控的问题。在国内,开发人员可以认真的在代码段落中打印日志,就已经属于最优秀的程序员了。然而,在国外的不少项目则不会这样,看看久负盛名的Hadoop的监控系统就可见一斑,尤为是在Facebook,更是把功能、日志以及监控列为同等重要,做为一个合格工程师的三驾马车。html
Spark做为优秀的开源系统,在监控方面也有本身的一整套体系。一个系统有了监控功能后将收获诸多益处,如可测试性、性能优化、运维评估、数据统计等。Spark的度量系统使用codahale提供的第三方度量仓库Metrics,本节将着重介绍Spark基于Metrics构建度量系统的原理与实现。对于Metrics感兴趣的读者,能够参考阅读《附录D Metrics简介》中的内容。前端
Spark的度量系统中有三个概念:程序员
为了更加直观的表现上述概念,咱们以图1来表示Spark中度量系统的工做流程。django
图1 度量系统的工做流程浏览器
任何监控都离不开度量数据的采集,离线的数据采集很容易作到和被采集模块之间的解耦,可是对于实时度量数据,尤为是那些内存中数据的采集就很难解耦。这就相似于网页监控数据的埋点同样,你要在网页中加入一段额外的js代码(例如Google分析,即使你只是引入一个js文件,这很难让前端工程师感到开心)。还有一类监控,好比在Java Web中增长一个负责监控的Servlet或者一个基于Spring3.0的拦截器,这种方式虽然将耦合度从代码级别下降到配置级别,但却没法有效的对内存中的数据结构进行监控。Spark的度量系统对系统功能来讲是在代码层面耦合的,这种牺牲对于可以换取对实时的、处于内存中的数据进行更有效的监控是值得的。性能优化
Spark将度量来源抽象为Source,其定义见代码清单1。服务器
代码清单1 度量源的定义前端工程师
private[spark] trait Source { def sourceName: String def metricRegistry: MetricRegistry }
Spark中有不少Source的具体实现,能够经过图2来了解。数据结构
图2 Source的继承体系架构
为了说明Source该如何实现,咱们选择ApplicationSource(也是由于其实现简单明了,足以说明问题)为例,其实现见代码清单2。
代码清单2 ApplicationSource的实现
private[master] class ApplicationSource(val application: ApplicationInfo) extends Source { override val metricRegistry = new MetricRegistry() override val sourceName = "%s.%s.%s".format("application", application.desc.name, System.currentTimeMillis()) metricRegistry.register(MetricRegistry.name("status"), new Gauge[String] { override def getValue: String = application.state.toString }) metricRegistry.register(MetricRegistry.name("runtime_ms"), new Gauge[Long] { override def getValue: Long = application.duration }) metricRegistry.register(MetricRegistry.name("cores"), new Gauge[Int] { override def getValue: Int = application.coresGranted }) }
望文生义,ApplicationSource用于采集Spark应用程序相关的度量。代码清单2中ApplicationSource重载了metricRegistry和sourceName,而且向自身的注册表注册了status(即应用状态,包括:WAITING, RUNNING, FINISHED, FAILED, KILLED, UNKNOWN)、runtime_ms(运行持续时长)、cores(受权的内核数)等度量。这三个度量的取值分别来自于ApplicationInfo的state、duration和coresGranted三个属性。这三个度量都由Gauge的匿名内部类实现,Gauge是Metrics提供的用于估计度量值的特质。有关Gauge、MetricRegistry、MetricRegistry注册度量的方法register及命名方法name的更详细介绍请阅读《附录D Metrics简介》。
Source准备好度量数据后,咱们就须要考虑如何输出和使用的问题。这里介绍一些常见的度量输出方式:阿里数据部门采用的一种度量使用方式就是输出到日志;在命令行运行过Hadoop任务(例如:mapreduce)的使用者也会发现控制台打印的内容中也包含度量信息;用户可能但愿将有些度量信息保存到文件(例如CSV),以便未来可以查看;若是以为使用CSV或者控制台等方式不够直观,还能够将采集到的度量数据输出到专用的监控系统界面。这些最终对度量数据的使用,或者说是输出方式,Spark将它们统一抽象为Sink。Sink的定义见代码清单3。
代码清单3 度量输出的定义
private[spark] trait Sink { def start(): Unit def stop(): Unit def report(): Unit }
从代码清单3能够看到Sink是一个特质,包含三个接口方法:
从这三个方法的解释来看,很难让读者得到更多的信息。咱们先把这些困惑放在一边,来看看Spark中Sink的类继承体系,如图3所示。
图3 Sink的类继承体系
图3中展现了6种Sink的具体实现。
了解了Sink的类继承体系,咱们挑选Slf4jSink做为Spark中Sink实现类的例子,来了解Sink具体该如何实现。Slf4jSink的实现见代码清单4。
代码清单4 Slf4jSink的实现
private[spark] class Slf4jSink( val property: Properties, val registry: MetricRegistry, securityMgr: SecurityManager) extends Sink { val SLF4J_DEFAULT_PERIOD = 10 val SLF4J_DEFAULT_UNIT = "SECONDS" val SLF4J_KEY_PERIOD = "period" val SLF4J_KEY_UNIT = "unit" val pollPeriod = Option(property.getProperty(SLF4J_KEY_PERIOD)) match { case Some(s) => s.toInt case None => SLF4J_DEFAULT_PERIOD } val pollUnit: TimeUnit = Option(property.getProperty(SLF4J_KEY_UNIT)) match { case Some(s) => TimeUnit.valueOf(s.toUpperCase()) case None => TimeUnit.valueOf(SLF4J_DEFAULT_UNIT) } MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) val reporter: Slf4jReporter = Slf4jReporter.forRegistry(registry) .convertDurationsTo(TimeUnit.MILLISECONDS) .convertRatesTo(TimeUnit.SECONDS) .build() override def start() { reporter.start(pollPeriod, pollUnit) } override def stop() { reporter.stop() } override def report() { reporter.report() } }
从Slf4jSink的实现能够看到Slf4jSink的start、stop及report实际都是代理了Metrics库中的Slf4jReporter的start、stop及report方法。Slf4jReporter的start方法实际是其父类ScheduledReporter的start实现。而传递的两个参数pollPeriod和pollUnit,正是被ScheduledReporter使用做为定时器获取数据的周期和时间单位。有关ScheduledReporter中start、stop及Slf4jReporter的report方法的实现能够参阅《附录D Metrics简介》。
通过近一年的准备,基于Spark2.1.0版本的《Spark内核设计的艺术 架构设计与实现》一书现已出版发行,图书如图:
纸质版售卖连接以下: