Storm的Metric接口简介

本文由做者林洋港受权网易云社区发布。html


做为服务端程序,咱们老是须要向外界报告一些统计数据,以助于了解系统的运行状况,好比某个接口的调用时间、系统处理的请求数等等。当咱们的程序以Storm Topology的形式运行时一样须要输出这些统计数据。Storm为咱们提供了Metric接口,能够方便的把一些统计指标输出到指定的地方。Storm Metric的统计方式为每隔指定的时间间隔输出统计内容。本文首先介绍Storm Metric相关的接口以及它们之间的关系,而后以实际应用中的一个例子来讲明如何使用Metric接口。本文使用的Storm版本为0.9.1-incubating。数据库

IMetric是Storm用于保存统计数据的接口安全

public interface IMetric {服务器

public Object getValueAndReset();架构

}并发

接口只有一个getValueAndReset方法,当须要输出统计内容时,Storm就会调用这个方法。值得注意的是getValueAndReset方法返回的是Object类型,这为统计内容的形式提供了灵活性,咱们能够返回任意的类型做为统计信息,这一点在后面的例子中咱们会再提到。另外一个引发咱们注意的地方是IMetric接口并无声明更新统计数据的方法,这样当咱们实现IMetric接口的时候就更加灵活了——参数类型、参数个数都没有限制。Storm自身提供了6个IMetric实现:AssignableMetric、CombinedMetric、CountMetric、MultiCountMetric、ReducedMetric、StateMetric。这里只介绍CountMetric和MultiCountMetric的使用方式,以印证前面说的IMetric接口统计数据更新方式的灵活性以及getValueAndReset返回Object类型的灵活性。CountMetric就是一个简单的计数器,有两个方法incr()和incrBy(long incrementBy),其getValueAndReset方法返回一个long类型的值:app

public Object getValueAndReset() {分布式

long ret = _value;ide

_value = 0;性能

return ret;

}

MultiCountMetric,顾名思义,就是多个指标的计数器,维护着一个Map,只有一个方法CountMetric scope(String key)。所以MultiCountMetric的更新方式为MultiCountMetric.scope(key).incr()或MultiCountMetric.scope(key).incrBy(long incrementBy)。它的getValueAndReset返回的是一个Map:

public Object getValueAndReset() {

Map ret = new HashMap();

for(Map.Entry e : _value.entrySet()) {

ret.put(e.getKey(), e.getValue().getValueAndReset());

}

return ret;

}

除了IMetric接口,还有另一个接口IMetricsConsumer,它负责向外输出统计信息,即把IMetric getValueAndReset方法返回的数据输出到外面。IMetricsConsumer有三个方法

void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter);

void handleDataPoints(TaskInfo taskInfo, Collection dataPoints);

void cleanup();

其中prepare是初始化,cleanup是生命周期结束时的清理工做,handleDataPoints才是真正的统计信息输出方法,taskInfo参数存储当前task的信息(host、port、component id、task id等等),dataPoints存储的是IMetric返回的统计信息,多是出于性能考虑,dataPoints是一个集合,包含了多个IMetric返回的数据。让咱们来具体看看DataPoint这个类:

public static class DataPoint {

@Override

public String toString() {

return "[" + name + " = " + value + "]";

}

public String name;

public Object value;

}

name是IMetric注册时的名字,value就是IMetric getValueAndReset返回的那个Object。

Storm只提供了一个IMetricsConsumer实现——LoggingMetricsConsumer。LoggingMetricsConsumer作的事情很简单,就是把dataPoints输出到日志文件metrics.log,下面是其handleDataPoints方法的部分代码:

for (DataPoint p : dataPoints) {

sb.delete(header.length(), sb.length());

sb.append(p.name)

.append(padding).delete(header.length()+23,sb.length()).append("\t")

.append(p.value);

LOG.info(sb.toString());

}

能够看到它经过调用DataPoint的value的toString方法把统计信息输出到日志里面的,因此若是你的IMetric实现返回的是本身定义的类型,记得重载toString()方法,让统计信息以可读的格式输出。

到这里Storm的Metric接口和自带的实现基本介绍完了,接下来咱们来看看怎么使用Storm自带的这些实现。首先,Storm默认的配置是关掉Metric功能的,能够有两种方式开启Metric功能:

1)在storm.yaml里面配置,这种是集群级别的设置,我的不建议这么作,因此就很少介绍了

2)conf.registerMetricsConsumer(Class klass, long parallelismHint);这是topology级别的,klass是IMetricsConsumer的实现类,parallelismHint这个参数Storm代码里面没注释我也没深刻看底层的实现,这里结合本身的实验谈谈它的意义:topology是在1个或多个worker上面以多个task的方式跑的嘛,parallelismHint就是指定多少个并发来输出统计信息。这里我也不知道parallelismHint指的是多个task、worker仍是supervisor,反正parallelismHint=1的时候只在特定的一个supervisor下面的metrics.log有统计信息,parallelismHint>1时可能取决于worker的数量,我测试的时候因为是在多个supervisor上跑的,所以观察到多个supervisor都有metrics.log的输出。我的经验是parallelismHint设为1,这样能够在一个supervisor下面的metrics.log就能看到全部task的统计信息。

因为我建议采用第二种方法,因此示例代码为:

//客户端注册IMetricsConsumer

conf.registerMetricsConsumer(LoggingMetricsConsumer.class);

StormSubmitter.submitTopology(name, conf, builder.createTopology());

//咱们假设要统计spout某段代码的调用次数

//注册IMetric

@Override

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {

...

metric=new CountMetric();

context.registerMetric("spout time cost", metric, 60); //所以DataPoint的name为spout time cost,60表示1分钟统计一次

...

}

//更新统计数据

@Override

public void nextTuple() {

if(...)...

else{

...

metric.incr();

}

}

这样就能够了,而后你就能在metrics.log看到统计数据了。

如今,假设咱们的需求跟上面不太同样:1)metrics.log只打印咱们本身维护的统计信息,屏蔽__system、__fail-count这种系统本身的统计信息;2)不仅统计代码的调用次数,还要统计调用时间——最小时间、最大时间、平均时间。

第一点能够经过重载LoggingMetricsConsumer的方法来实现:

public class AppLoggingMetricsConsumer extends LoggingMetricsConsumer {

@Override

public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {

if (taskInfo.srcComponentId != null && taskInfo.srcComponentId.startsWith("__")) return;

if (dataPoints == null || dataPoints.isEmpty()) return;

List<DataPoint> list = new ArrayList<DataPoint>();

for (DataPoint p : dataPoints) {

if (p.name == null || p.name.startsWith("__")) continue;

list.add(p);

}

if (list.isEmpty()) return;

super.handleDataPoints(taskInfo, list);

}

}

第二点须要开发咱们本身的IMetric接口实现类TimeCostMetric,如下是其主要代码:

@Override

public Object getValueAndReset() {

TimeCost timeCost=new TimeCost();

timeCost.count=count;

if(timeCost.count>0){

timeCost.min=min;

timeCost.max=max;

timeCost.mean=all*1.0/timeCost.count;

}

init();

return timeCost;

}

public void update(long time){

count++;

all+=time;

if(min>time)min=time;

if(max<time)max=time;

}

public static class TimeCost implements Serializable{

private static final long serialVersionUID = 8355726599226036228L;

int count;

long min;

long max;

double mean;

public String toString(){

return "count: "+count+", min: "+min+", max:"+max+", mean: "+mean;

}

}

TimeCostMetric的getValueAndReset方法返回的是一个TimeCost 对象,日志中最终打印的就是其toString方法的内容。而后把前面红色部分的代码改为下面的内容:

① conf.registerMetricsConsumer(AppLoggingMetricsConsumer .class);

② metric=new TimeCostMetric();

context.registerMetric("MQ spout time cost", metric, 60);

③ metric.incr();

再来看看metrics.log

本文中是直接把统计信息打到日志中,你也能够本身实现IMetricsConsumer接口,把统计信息保存到指定的地方,如数据库、监控平台等等。



免费领取验证码、内容安全、短信发送、直播点播体验包及云服务器等套餐

更多网易技术、产品、运营经验分享请访问网易云社区

相关文章:
【推荐】 网页设计简史看设计&代码“隔膜”
【推荐】 巧用Scrum与Kanban
【推荐】 网易美学-系统架构系列1-分布式与服务化

相关文章
相关标签/搜索