公司用的flume-ng采集数据,数据源是日志文件,而后经过正则表达式来过滤指定日志,最后发送给kafka。使用的是apache-flume-ng1.6.0版本。java
最初消耗以下:正则表达式
通过我改进后,消耗以下:apache
cpu消耗整整下降了几十甚至百倍。app
其中改进点有两个:一是自定义一个关键字筛选器;二是重写KafkaSink.java文件为另外一个类,添加无事件处理睡眠配置。spa
咱们的日志格式大概是这样子:日志
XXXXXXXX`XXXXXXX`XXXXXX`…`op=0x38(OP_REPORT_ACTION)`seq=3424`xxx=xxxx`xxx=xxxx`xxx=xxxx……code
关键字筛选器fm.lizhi.flume.channel.OpChannelSelector,配置以下:事件
agent2.sources.Usersource2.selector.type = fm.lizhi.flume.channel.OpChannelSelector agent2.sources.Usersource2.selector.mapping.0x3a(OP_ADD_COLLECT_PROGRAM)=Userchannel2 agent2.sources.Usersource2.selector.mapping.0x38(OP_REPORT_ACTION) = Userchannel2 agent2.sources.Usersource2.selector.skip = 90
上面的skip意思是直接跳过90个字符不检测(由于咱们打印的op=确定在90个字符之后,因此为了节约资源设置)。ip
这个selector主要检测op=后的值,匹配上mapping后的值,就发送给相应的channel,不然丢弃。从而达到筛选目的。资源
对KafkaSink的改进以下:
package fm.lizhi.flume.sink; /** * 近实时的KafkaSink,相比KafkaSink多了backoffSleepMillisecond属性。<br> * 改造自1.6.0版本的KafkaSink * @author zhenghaofeng */ public class NRTKafkaSink extends AbstractSink implements Configurable { private int bakoffSleepMillisecond; private static final String BACKOFF_SLEEP_MILLISECOND = "backoffSleepMillisecond"; private static final int DEFUALT_BACKOFF_SLEEP_MILLISECOND = 0; public Status process() throws EventDeliveryException { boolean isNoEventMore = false; try { …………………… for (; processedEvents < batchSize; processedEvents += 1) { event = channel.take(); if (event == null) { // no events available in channel isNoEventMore = true; break; } …………………… } …………………… transaction.commit(); if (isNoEventMore) { try { Thread.sleep(bakoffSleepMillisecond); } catch (InterruptedException e) { // TODO: handle exception } } } catch (Exception ex) { …………………… } finally { …………………… } return result; } public void configure(Context context) { ……………… bakoffSleepMillisecond = context.getInteger(BACKOFF_SLEEP_MILLISECOND, DEFUALT_BACKOFF_SLEEP_MILLISECOND); ………………………… } }
为何event=null的时候不返回BACKOFF状态,而采用标记睡眠呢?由于若是返回BACKOFF状态,flume默认会睡眠1~5秒。而咱们一般不但愿延时这么久,因此为了避免侵入flume原先的代码,就直接在这里添加睡眠中断。
配置里写法以下:
agent2.sinks.Usersink2.type=fm.lizhi.flume.sink.NRTKafkaSink agent2.sinks.Usersink2.backoffSleepMillisecond=100