Flink系列之Metrics

 Flink是一个针对流数据和批处理的分布式处理引擎,近两年才真正的频繁出如今数据处理领域 。其实Flink在2014年就已经成为ASF(Apache Software Foundation)的顶级项目之一,也许以前是被spark掩盖了光芒,spark在数据处理上的优点不能否认,可是我的通过对spark和flink的源码研读和项目实战后,更偏心flink一些。在实时计算方面,相对于spark的微批处理(micro batch),flink的数据处理方式更真正称得上流处理,不久前release的spark 2.3目前也已经提供了相似flink流处理的方式,可是目前尚未通过大型互联网公司的实战验证,不知道其线上表现如何,咱们能够拭目以待;除了流处理的方式之外,flink在内存管理,网络传输方面也颇有其独特之处,另外spark SQL和flink SQL相比,在代码层面上,flink作了简单的封装后直接利用了calcite的API,让SQL变的再也不那么的神秘,更便于咱们自定义语义,定制咱们本身的SQL语句;Flink基于其状态机制提供的CEP(Complex Event Processing)Library可让咱们在流处理过程匹配出咱们定义的事件组合。这些我以后都会在flink系列里一一作其原理说明和代码解读。
 
回到此篇文章的标题:flink-metrics。为何要以这篇文章做为系列的开头,开头不该该是flink的原理和组件说明此类的文章吗?是的,本应该是从浅入深的去开启flink系列,可是个人考虑有两点:其一,此文章是以试水为目的,想了解一下你们的兴趣点在哪,若是你们大部分是身经百战的流计算实践者,那flink小白入门篇的文章貌似不用再去赘述了。但若是你们对flink很感兴趣,可是对其基本原理和概念不是很了解,那我系列第二篇会补上。其二,目前我本身的项目须要,对metrics看的多了些,想记录一下,也算是作个回顾吧。

 

Flink  Metrics指任务在flink集群中运行过程当中的各项指标,包括机器系统指标:Hostname,CPU,Memory,Thread,GC,NetWork,IO 和 任务运行组件指标:JobManager,TaskManager,Job, Task,Operater相关指标。Flink提供metrics的目的有两点:第一,实时采集metrics的数据供flink UI进行数据展现,用户能够在页面上看到本身提交任务的状态,延迟等信息。第二,对外提供metrics收集接口,用户能够将整个fllink集群的metrics经过MetricsReport上报至第三方系统进行存储,展现和监控。第二种对大型的互联网公司颇有用,通常他们的集群规模比较大,不可能经过flink UI进行全部任务的展现,因此就经过metrics上报的方式进行dashboard的展现,同时存储下来的metrics能够用于监控报警,更进一步来讲,能够用历史数据进行数据挖掘产生更大的价值。Flink原生的提供了几种主流的第三方上报方式:JMXReporter,GangliaReport,GraphiteReport等,用户能够直接配置使用。

 

Flink Metrics是经过引入com.codahale.metrics包实现的,它将收集的metrics分为四大类:Counter,Gauge,Histogram和Meter下面分别说明:
  • Counter  计数器 ,用来统计一个metrics的总量。拿flink中的指标来举例,像Task/Operator中的numRecordsIn(此task或者operator接收到的record总量)和numRecordsOut(此task或者operator发送的record总量)就属于Counter。
  • Gauge   指标值  , 用来记录一个metrics的瞬间值。拿flink中的指标举例,像JobManager或者TaskManager中的JVM.Heap.Used就属于Gauge,记录某个时刻JobManager或者TaskManager所在机器的JVM堆使用量。
  • Histogram  直方图, 有的时候咱们不知足于只拿到metrics的总量或者瞬时值,当想获得metrics的最大值,最小值,中位数等信息时,咱们就能用到Histogram了。Flink中属于Histogram的指标不多,可是最重要的一个是属于operator的latency。此项指标会记录数据处理的延迟信息,对任务监控起到很重要的做用。
  • Meter  平均值, 用来记录一个metrics某个时间段内平均值。flink中相似指标有task/operator中的numRecordsInPerSecond,字面意思就能够理解,指的是此task或者operator每秒接收的记录数。
 
Metrics代码解析
那Flink代码中是怎样对metrics进行收集的呢(具体代码在flink-runtime的metrics包里)。下面咱们就来按步骤说明:
  1. flink中先会定义好ScopeFormat,scopeFormat定义了各种组件metrics_group的范围,而后各个组件(JobManager,TaskManager,Operator等)都会继承ScopeFormat类进行各自的format实现。
  2. 然后开始定义各个组件的metricsGroup。每一个group中定义属于这个组件中全部的metrics。好比TaskIOMetricGroup类,就定义了task执行过程当中有关IO的metrics。
  3. 定义好各个metricsGroup后,在初始化各个组件的时候,会将相应的metricsGroup当作参数放入构造函数中进行初始化。咱们拿JobManager为例来讲:                             class JobManager(protected val flinkConfiguration: Configuration,
    protected val futureExecutor: ScheduledExecutorService, protected val ioExecutor: Executor, protected val instanceManager: InstanceManager, protected val scheduler: FlinkScheduler, protected val blobServer: BlobServer, protected val libraryCacheManager: BlobLibraryCacheManager, protected val archive: ActorRef, protected val restartStrategyFactory: RestartStrategyFactory, protected val timeout: FiniteDuration, protected val leaderElectionService: LeaderElectionService, protected val submittedJobGraphs : SubmittedJobGraphStore, protected val checkpointRecoveryFactory : CheckpointRecoveryFactory, protected val jobRecoveryTimeout: FiniteDuration, protected val jobManagerMetricGroup: JobManagerMetricGroup, protected val optRestAddress: Option[String])
    初始化JobManager的时候带上了JobManagerMetricGroup,后面此类在preStart()方法中调用了instantiateMetrics(jobManagerMetricGroup),咱们再看instantiateMetrics方法内容:   
  1.  private def instantiateMetrics(jobManagerMetricGroup: MetricGroup) : Unit = {
    jobManagerMetricGroup.gauge[Long, Gauge[Long]]("taskSlotsAvailable", new Gauge[Long] { override def getValue: Long = JobManager.this.instanceManager.getNumberOfAvailableSlots }) jobManagerMetricGroup.gauge[Long, Gauge[Long]]("taskSlotsTotal", new Gauge[Long] { override def getValue: Long = JobManager.this.instanceManager.getTotalNumberOfSlots }) jobManagerMetricGroup.gauge[Long, Gauge[Long]]("numRegisteredTaskManagers", new Gauge[Long] { override def getValue: Long = JobManager.this.instanceManager.getNumberOfRegisteredTaskManagers }) jobManagerMetricGroup.gauge[Long, Gauge[Long]]("numRunningJobs", new Gauge[Long] { override def getValue: Long = JobManager.this.currentJobs.size }) } 在instantiateMetrics方法内,把相应的metrics都加入到了jobManagerMetricGroup中,这样就创建了metrics和metrics_group的映射关系。
  2.  随后,在各个组件中实例化MetricRegistryImpl,而后利用MetricRegistry的startQueryService方法启动metrics查询服务(本质上是启动相应的Akka Actor)
  3. 最后,利用flink的原生reporter(主要是上文介绍的三种方式)和MetricRegistry创建联系,这样report里就能够拿出全部采集到的metrics,进而将metrics发往第三方系统。
 
Metrics配置
当咱们了解了flink metrics的具体实现步骤后,那就是上手操做了,怎样配置才能让metrics生效呢?接下来就介绍一下配置步骤:
  • flink目录下有一个conf的文件夹,conf下有一个flink-conf.yaml文件,全部的flink有关配置都在这里进行。
  • 配置metrics_scope,metrics_scope指定metrics上报时的组合方式。一共有6个scope须要配置:                                                    metrics.scope.jm      配置JobManager相关metrics,默认格式为 <host>.jobmanager                                                   metrics.scope.jm.job   配置JobManager上Job的相关metrics,默认格式为 <host>.jobmanager.<job_name>
               metrics.scope.tm         配置TaskManager上相关metrics,默认格式为  <host>.taskmanager.<tm_id>     
               metrics.scope.tm.job   配置TaskManager上Job相关metrics,默认格式为 <host>.taskmanager.<tm_id>.<job_name>
               metrics.scope.task   配置Task相关metrics,默认为 <host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>
              metrics.scope.operator  配置Operator相关metrics,默认格式 为 <host>.taskmanager.<tm_id>.<job_name>.            <operator_name>.<subtask_index>
          以上6种scope能够根据用户意愿改变组合方式,例如 metrics.scope.operator,我能够改为  <host>.<job_name>.<task_name>.<operator_name>.<subtask_index>,修改后,接收到的operator的metrics就会是以下格式:<host>.<job_name>.<task_name>.<operator_name>.<subtask_index>.xxx =  xxxx(若是所有用默认,则能够不须要在文件里配置,源码里已经指定了默认值)
  • 配置Report,Report相关配置根据其不一样的实现类有所不一样,我就用项目目前使用的GraphiteReport为例来讲明:
          metrics.reporters: grph
          metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
          metrics.reporter.grph.host: xxx
          metrics.reporter.grph.port: xxx
          metrics.reporter.grph.protocol: TCP/UDP
      metrics.reporters指定report的名称,metrics.reporter.grph.class指定具体的MetricsReport实现类,metrics.reporter.grph.host指定远端graphite主机ip,metrics.reporter.grph.port指定远端graphite监听端口,metrics.reporter.grph.protocol指定graphite利用的协议。
  • 最后保存文件,重启flink集群便可生效
若是咱们不使用flink原生的MetricsReport,想本身实现定制的Report能够吗?答案是确定的,用户能够参照GraphiteReporter类,自定义类继承 ScheduledDropwizardReporter类,重写report方法便可。咱们如今除了利用GraphiteReport,也本身定义了KafkaReport上报定制的metrics来知足更多的用户需求。
 
总结
Flink的metrics在整个项目里是不可或缺的,社区里Jira也常常有人提出各类improvement,要加入这样那样的metrics,但最终社区接受的不多,由于这些都是对我的公司定制化的代码改动,meger进master的意义不大。合理的利用metrics能够及时的发现集群和任务的情况,从而进行相应的对策,能够保证集群的稳定性,避免没必要要的损失。
相关文章
相关标签/搜索