Flume-NG内置计数器(监控)源码级分析

  Flume的内置监控怎么整?这个问题有不少人问。目前了解到的信息是可使用Cloudera Manager、Ganglia有图形的监控工具,以及从浏览器获取json串,或者自定义向其余监控系统汇报信息。那监控的信息是什么呢?就是各个组件的统计信息,好比成功接收的Event数量、成功发送的Event数量,处理的Transaction的数量等等。并且不一样的组件有不一样的Countor来作统计,目前直到1.5版本仍然只对三大组件:source、sink、channel进行统计分别是SourceCounter、SinkCounter、ChannelCounter,这三个计数器的统计项是固定的,就是你不能本身设置本身的统计项;另外还有ChannelProcessorCounter和SinkProcessorCounter,这两项目前没有设置统计项,因此是目前仍是“摆设”。另外有些同窗可能也发现了,有些内置的组件使用CounterGroup这个来统计信息,这个能够本身随意设置统计项,可是遗憾的是目前(1.5版本)这个能够自定义的计数器的信息还没法用在监控上,由于这只是一个单独的类,并无继承MonitoredCounterGroup这个抽象类。有些内置组件使用的是CounterGroup,因此监控时会没有数据,不一样的版本使用此CounterGroup的组件可能不一样。下面咱们重点介绍:SourceCounter、SinkCounter、ChannelCounter。node

  Flume-NG的全部统计信息、监控及相关的类都在org.apache.flume.instrumentation.http、org.apache.flume.instrumentation、org.apache.flume.instrumentation.util三个包下。mysql

  上面提到了MonitoredCounterGroup,这个类是用来跟踪内部的统计指标的,注册组件的MBean并跟踪和更新统计值。须要监控的组件都要继承这个类,这个类能够跟踪flume内部的全部组件,可是目前只实现了3个。其中比较重要的方法有如下几个:web

  (1)、构造方法MonitoredCounterGroup(Type type, String name, String... attrs),这个方法主要是设置组件的类型、名称;而后将全部的attrs(这是设定的各个统计项)加入Map<String, AtomicLong> counterMap,值设定为0;而后初始化计数器的开始时间和结束时间,都设为0.sql

  (2)、start()方法,会先注册计数器,而后对全部统计项的统计值设为0;将开始时间设置为当前时间数据库

  (3)、register()方法,若是这个计数器还未注册,将这个计数器的MBean进行注册,就能够进行跟踪了apache

  (4)、stop()方法,会设置结束时间为当前时间;输出各个统计项的信息。咱们 Ctrl+C 结束进程时,最后显示的统计信息就是来自这里。json

  其它方法都是获取counterMap的中信息或者更新值等,比较简单。数组

  接下来咱们看看,三个组件中各类统计项及其含义吧:浏览器

  1、SourceCounter,继承了MonitoredCounterGroup。主要统计项以下:服务器

  (1)"src.events.received",表示source接受的event个数;

  (2)"src.events.accepted",表示source处理成功的event个数,和上面的区别就是上面虽然接受了可能没处理成功;

  (3)"src.append.received",表示调用append次数,在avrosource和thriftsource中调用;

  (4)"src.append.accepted",表示append处理成功次数;

  (5)"src.append-batch.received",表示appendBatch被调用的次数,在avrosource和thriftsource中调用;

  (6)"src.append-batch.accepted",表示appendBatch处理成功次数;

  (7)"src.open-connection.count",用在avrosource中表示打开链接的数量;

  通常source调用都集中在前俩。

  2、SinkCounter,继承了MonitoredCounterGroup

  (1)"sink.connection.creation.count",这个调用的地方颇多,都表示“连接”建立的数量,好比与HBase创建连接,与avrosource创建连接以及文件的打开等;

  (2)"sink.connection.closed.count",对应于上面的stop操做、destroyConnection、close文件操做等。

  (3)"sink.connection.failed.count",表示上面所表示“连接”时异常、失败的次数;

  (4)"sink.batch.empty",表示这个批次处理的event数量为0的状况;

  (5)"sink.batch.underflow",表示这个批次处理的event的数量介于0和配置的batchSize之间;

  (6)"sink.batch.complete",表示这个批次处理的event数量等于设定的batchSize;

  (7)"sink.event.drain.attempt",准备处理的event的个数;

  (8)"sink.event.drain.sucess",这个表示处理成功的event数量,与上面不一样的是上面的是还未处理的。

  3、ChannelCounter,继承了MonitoredCounterGroup

  (1)"channel.current.size",这个表示这个channel的当前容量;

  (2)"channel.event.put.attempt",通常指的是在channel的事务当中,source的put操做中记录尝试发送event的个数;

  (3)"channel.event.take.attempt",通常指的是在channel的事务中,sink的take操做记录尝试拿event的个数;

  (4)"channel.event.put.success",通常指的是在channel的事务中,put成功的event的数量;

  (5)"channel.event.take.success",通常指的是channel事务中,take成功的event的数量;

  (6)"channel.capacity",指的是channel的容量,在channel的start方法中设置。

  上面这些统计项都是固定的,咱们能够根据须要增长相应项的值,能够在监控中查看组件的变化状况,从而掌握flume进程的运行状况。好比能够查看channel的容量从而了解到source和sink的相对处理速度,还有能够看source或者sink每一个批次处理成功与失败的次数,了解组件的运行情况等等。

  固然有些同窗可能在自定义本身的组件时,想统计一些本身的统计项,这些统计项在上面三大组件中是没有,怎么办?本身定制啊,上面说了必需要继承MonitoredCounterGroup这个抽象类,设定本身的统计项,而后将统计项设置成数组调用MonitoredCounterGroup的构造函数;而后在自定义的计数器中增长更新数值的方法。最后在自定义的组件中构造自定义的计数器,并启用它的start方法,剩下的就是在该更新统计项数值的地方更新就能够了。

 

  还有一个重要的内容就是监控的实现!没错,内置的有两种HTTP方式(就是json串)和Ganglia,后者须要安装Ganglia,前者很是简单,只须要在Flume的启动命令中加上:-Dflume.monitoring.type=http -Dflume.monitoring.port=XXXX  ,最后的XXXX是你须要设置的端口!而后你就能够在浏览器上经过访问这个Flume所在节点的IP:XXXX/metrics,不断刷新就能够看到最新的组件统计信息。关于Ganglia的请读者自行组建Ganglia集群并参考用户指南来操做。

  若是我想本身实现一个server向其余系统汇报信息,咋整?目前有至少两个方法:

  1、就是上面的HTTP啊,你能够不断去获取json串,本身解析出来各个统计指标,而后剩下的就是你想怎么整就怎么整吧。

  2、就是本身实现一个相似HTTP的server,必须实现org.apache.flume.instrumentation.MonitorService接口,这个接口只有俩方法:start和stop。这个接口继承自Configurable接口因此拥有能够读取配置文件的configure(configure(Context context))方法,来获取一些配置信息。

  以HTTP为例(对应的类是org.apache.flume.instrumentation.http.HTTPMetricsServer),它的start方法启动了一个jetty做为web server,提供WEB服务。并实现了AbstractHandler的一个处理数据的类HTTPMetricsHandler,这个类的handle(String target, HttpServletRequest request, HttpServletResponse response,int dispatch)方法来设置一些WEB页面的格式以及经过JMXPollUtil.getAllMBeans()获取全部组件注册的MBean构成的Map<String, Map<String, String>> metricsMap,遍历这个metricsMap将这个metricsMap转换成json输出到web页面。stop方法就是一些清理工做,这里是关闭jetty server。很简单吧,因此咱们彻底能够实现一个server,在start方法中启动一个线程每隔一秒或者本身定遍历这个metricsMap,写入mysql、HBase或者别的地方,你随便。。。

  你能够在定义的组件中调用本身的计数器,而后将计数器、监控类、自定义组件(source、sink、channel)打包放到lib下,在启动命令后加-Dflume.monitoring.type=AAAAA -Dflume.monitoring.node=BBBB,就能够了。注意,Dflume.monitoring.type这个好似必需要设置的,就是你本身的监控类(这里是AAAAA),后面的无关紧要都是一些参数,你能够自定义参数名,好比能够设置数据库服务器IP、端口等。

    至此,这里介绍完了。这些都是从源码中看出来的,还不曾实现,供大伙借鉴。

相关文章
相关标签/搜索