flink实战-使用自定义聚合函数统计网站TP指标

  • 背景java

  • 自定义聚合函数git

  • 实例讲解github


背景

在网站性能测试中,咱们常常会选择 TP50、TP95 或者 TP99 等做为性能指标。接下来咱们讲讲这些指标的含义、以及在flink中如何实时统计:sql

  • TP50,top percent 50,即 50% 的数据都知足某一条件;apache

  • TP95,top percent 95,即 95% 的数据都知足某一条件;微信

  • TP99,top percent 99,即 99% 的数据都知足某一条件;app

咱们举一个例子,咱们要统计网站一分钟以内的的响应时间的TP90,正常的处理逻辑就是把这一分钟以内全部的网站的响应时间从小到大排序,而后计算出总条数count,而后计算出排名在90%的响应时间是多少(count*0.9),就是咱们要的值。ide

自定义聚合函数

这个需求很明显就是一个使用聚合函数来作的案例,Flink中提供了大量的聚合函数,好比count,max,min等等,可是对于这个需求,却没法知足,因此咱们须要自定义一个聚合函数来实现咱们的需求。函数

在前段时间,咱们聊了聊flink的聚合算子,具体可参考: flink实战-聊一聊flink中的聚合算子 , 聚合算子是咱们在写代码的时候用来实现一个聚合功能,聚合函数其实和聚合算子相似,只不过聚合函数用于在写sql的时候使用。性能

自定义聚合函数须要继承抽象类org.apache.flink.table.functions.AggregateFunction。并实现下面几个方法。

  • createAccumulator():这个方法会在一次聚合操做的开始调用一次,主要用于构造一个Accumulator,用于存储在聚合过程当中的临时对象。

  • accumulate() 这个方法,每来一条数据会调用一次这个方法,咱们就在这个方法里实现咱们的聚合函数的具体逻辑。

  • getValue() 这个方法是在聚合结束之后,对中间结果作处理,而后将结果返回,最终sql中获得的结果数据就是这个值。

实例讲解

对于TP指标,正常的思路咱们能够先建立一个临时变量,里面有一个list,每来一个数据,就放到这个list里面,在getValue方法里,进行排序,取相应的TP值。

可是这种思路会有一个问题,就是若是要聚合的时间范围内,数据过多的话。就会在list存储大量的数据,会形成checkpoint过大,时间过长,最后致使程序失败。得不到正确的结果。

因此咱们须要换一个思路,既然最后咱们想要的是一个有序列表,那么咱们是否是能够把这个list结构优化一下,使用Treemap来存储,map的key就是指标,好比响应时间。value就是对应的指标出现的次数。这样getValue方法里,只须要将map的value值累加,就能获得总数count,而后计算出来相应的tp值的位置position,最后咱们再从头累加map的value,直到累加结果大于相应的位置position,则map的key即为所求。

示例以下:咱们先构建一个source,只是随机生成一个变量,网站的相应时间response_time。

String sql = "CREATE TABLE source (\n" +
"
response_time INT,\n" +
"
ts AS localtimestamp,\n" +
"
WATERMARK FOR ts AS ts," +
"
proctime as proctime()\n" +
"
) WITH (\n" +
"
'connector' = 'datagen',\n" +
"
'rows-per-second'='1000',\n" +
"
'fields.response_time.min'='1',\n" +
"
'fields.response_time.max'='1000'" +
"
)";

定义一个聚合函数用的临时变量:

public static class TPAccum{
public Integer tp;
public Map<Integer,Integer> map = new HashMap<>();
}

实现自定义聚合函数类

public static class TP extends AggregateFunction<Integer,TPAccum>{

@Override
public TPAccum createAccumulator(){
return new TPAccum();
}

@Override
public Integer getValue(TPAccum acc){
if (acc.map.size() == 0){
return null;
} else {
Map<Integer,Integer> map = new TreeMap<>(acc.map);
int sum = map.values().stream().reduce(0, Integer::sum);

int tp = acc.tp;
int responseTime = 0;
int p = 0;
Double d = sum * (tp / 100D);
for (Map.Entry<Integer,Integer> entry: map.entrySet()){
p += entry.getValue();
int position = d.intValue() - 1;
if (p >= position){
responseTime = entry.getKey();
break;
}

}
return responseTime;
}
}

public void accumulate(TPAccum acc, Integer iValue, Integer tp){
acc.tp = tp;
if (acc.map.containsKey(iValue)){
acc.map.put(iValue, acc.map.get(iValue) + 1);
} else {
acc.map.put(iValue, 1);
}
}

}

实际的查询sql以下:

String sqlSelect =
"select TUMBLE_START(proctime,INTERVAL '1' SECOND) as starttime,mytp(response_time,50) from source" +
" group by TUMBLE(proctime,INTERVAL '1' SECOND)";

完整代码请参考: https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/function/UdafTP.java

更多内容,欢迎关注个人公众号【大数据技术与应用实战】


本文分享自微信公众号 - 大数据技术与应用实战(bigdata_bigdata)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。

相关文章
相关标签/搜索