flume的memeryChannel中transactionCapacity和sink的batchsize须要注意事项

一.html

fluem中出现,transactionCapacity查询一下,得出一下这些:java

最近在作flume的实时日志收集,用flume默认的配置后,发现不是彻底实时的,因而看了一下,原来是memeryChannel的transactionCapacity在做怪,由于他默认是100,也就是说收集端的sink会在收集到了100条之后再去提交事务(即发送到下一个目的地),因而我修改了transactionCapacity到10,想看看是否是会更加实时一点,结果发现收集日志的agent启动的时候报错了。git

16/04/29 09:36:15 ERROR sink.AbstractRpcSink: Rpc Sink avro-sink: Unable to get event from channel memoryChannel. Exception follows.
org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 10 full, consider committing more frequently, increasing capacity, or increasing thread count
at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.Java:96)
at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:354)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:745)github

因而很纳闷,为何默认值100能够,而设置10就会说小了呢,因而查阅资料,发现原来是sink的batchsize参数在做怪,下面,我就来理一理这个前因后果,这个sink的batchsize是什么意思呢,就是sink会一次从channel中取多少个event去发送,而这个发送是要最终以事务的形式去发送的,所以这个batchsize的event会传送到一个事务的缓存队列中(takeList),这是一个双向队列,这个队列能够在事务失败时进行回滚(也就是把取出来的数据吐memeryChannel的queue中),它的初始大小就是transactionCapacity定义的大小,源码中有: takeList = new LinkedBlockingDeque<Event>(transCapacity); 源码来自https://segmentfault.com/a/1190000003586635的分享。redis

再看这个错误抛出的地方:apache

 if(takeList.remainingCapacity() == 0) {
        throw new ChannelException("Take list for MemoryTransaction, capacity " +
            takeList.size() + " full, consider committing more frequently, " +
            "increasing capacity, or increasing thread count");
    }json

在上面的状况中,sink一次取100个events,塞到takelist中,在塞了10个后,就会引起上述异常,所以,这个错误的解决办法就是:在sink中,channel的transactionCapacity参数不能小于sink的batchsize。ubuntu

 

二.segmentfault

Flume-ng出现HDFS IO error,Callable timed out异常

目前解决方案:

浏览器

记Flume-NG一些注意事项

1、关于Source:

一、spool-source:适合静态文件,即文件自己不是动态变化的;

二、avro source能够适当提升线程数量来提升此source性能;

三、ThriftSource在使用时有个问题须要注意,使用批量操做时出现异常并不会打印异常内容而是"Thrift source %s could not append events to the channel.",这是由于源码中在出现异常时,它并未捕获异常而是获取组件名称,这是源码中的一个bug,也能够说明thrift不多有人用,不然这个问题也不会存在在不少版本中;

四、若是一个source对应多个channel,默认就是每一个channel是一样的一份数据,会把这批数据复制N份发送到N个channel中,因此若是某个channel满了会影响总体的速度的哦;

五、ExecSource官方文档已经说明是异步的,可能会丢数据哦,尽可能使用tail -F,注意是大写的;

2、关于Channel:

一、采集节点建议使用新的复合类型的SpillableMemoryChannel,汇总节点建议采用memory channel,具体还要看实际的数据量,通常每分钟数据量超过120MB大小的flume agent都建议用memory channel(本身测的file channel处理速率大概是2M/s,不一样机器、不一样环境可能不一样,这里只提供参考),由于一旦此agent的channel出现溢出状况,将会致使大多数时间处于file channel(SpillableMemoryChannel自己是file channel的一个子类,并且复合channel会保证必定的event的顺序的使得读完内存中的数据后,再须要把溢出的拿走,可能这时内存已满又会溢出。。。),性能大大下降,汇总一旦成为这样后果可想而知;

二、调整memory 占用物理内存空间,须要两个参数byteCapacityBufferPercentage(默认是20)和byteCapacity(默认是JVM最大可用内存的0.8)来控制,计算公式是:byteCapacity = (int)((context.getLong("byteCapacity", defaultByteCapacity).longValue() * (1 - byteCapacityBufferPercentage * .01 )) /byteCapacitySlotSize),很明显能够调节这两个参数来控制,至于byteCapacitySlotSize默认是100,将物理内存转换成槽(slot)数,这样易于管理,可是可能会浪费空间,至少我是这样想的。。。;

三、还有一个有用的参数"keep-alive"这个参数用来控制channel满时影响source的发送,channel空时影响sink的消费,就是等待时间,默认是3s,超过这个时间就甩异常,通常不需配置,可是有些状况颇有用,好比你得场景是每分钟开头集中发一次数据,这时每分钟的开头量可能比较大,后面会愈来愈小,这时你能够调大这个参数,不至于出现channel满了得状况;

3、关于Sink:

一、avro sink的batch-size能够设置大一点,默认是100,增大会减小RPC次数,提升性能;

二、内置hdfs sink的解析时间戳来设置目录或者文件前缀很是损耗性能,由于是基于正则来匹配的,能够经过修改源码来替换解析时间功能来极大提高性能,稍后我会写一篇文章来专门说明这个问题;

三、RollingFileSink文件名不能自定义,并且不能定时滚动文件,只能按时间间隔滚动,能够本身定义sink,来作定时写文件;

四、hdfs sink的文件名中的时间戳部分不能省去,可增长前缀、后缀以及正在写的文件的先后缀等信息;"hdfs.idleTimeout"这个参数颇有意义,指的是正在写的hdfs文件多长时间不更新就关闭文件,建议都配置上,好比你设置了解析时间戳存不一样的目录、文件名,并且rollInterval=0、rollCount=0、rollSize=1000000,若是这个时间内的数据量达不到rollSize的要求并且后续的写入新的文件中了,就是一直打开,相似情景不注意的话可能不少;"hdfs.callTimeout"这个参数指的是每一个hdfs操做(读、写、打开、关闭等)规定的最长操做时间,每一个操做都会放入"hdfs.threadsPoolSize"指定的线程池中得一个线程来操做;

五、关于HBase sink(非异步hbase sink:AsyncHBaseSink),rowkey不能自定义,并且一个serializer只能写一列,一个serializer按正则匹配多个列,性能可能存在问题,建议本身根据需求写一个hbase sink;

六、avro sink能够配置failover和loadbalance,所用的组件和sinkgroup中的是同样的,并且也能够在此配置压缩选项,须要在avro source中配置解压缩;

4、关于SinkGroup:

一、无论是loadbalance或者是failover的多个sink须要共用一个channel;

二、loadbalance的多个sink若是都是直接输出到同一种设备,好比都是hdfs,性能并不会有明显增长,由于sinkgroup是单线程的它的process方法会轮流调用每一个sink去channel中take数据,并确保处理正确,使得是顺序操做的,可是若是是发送到下一级的flume agent就不同了,take操做是顺序的,可是下一级agent的写入操做是并行的,因此确定是快的;

三、其实用loadbalance在必定意义上能够起到failover的做用,生产环境量大建议loadbalance;

5、关于监控monitor:

一、监控我这边作得仍是比较少的,可是目前已知的有如下几种吧:cloudera manager(前提是你得安装CDH版本)、ganglia(这个天生就是支持的)、http(其实就是将统计信息jmx信息,封装成json串,使用jetty展现在浏览器中而已)、再一个就是本身实现收集监控信息,本身作(能够收集http的信息或者本身实现相应的接口实现本身的逻辑,具体能够参考我之前的博客);

二、简单说一下cloudera manager这种监控,最近在使用,确实很强大,能够查看实时的channel进出数据速率、channel实时容量、sink的出速率、source的入速率等等,图形化的东西确实很丰富很直观,能够提供不少flume agent总体运行状况的信息和潜在的一些信息;

6、关于flume启动:

一、flume组件启动顺序:channels——>sinks——>sources,关闭顺序:sources——>sinks——>channels;

二、自动加载配置文件功能,会先关闭全部组件,再重启全部组件;

三、关于AbstractConfigurationProvider中的Map<Class<? extends Channel>, Map<String, Channel>> channelCache这个对象,始终存储着agent中得全部channel对象,由于在动态加载时,channel中可能还有未消费完的数据,可是须要对channel从新配置,因此用以来缓存channel对象的全部数据及配置信息;

四、经过在启动命令中添加 "no-reload-conf"参数为true来取消自动加载配置文件功能;

7、关于interceptor:

请看个人关于这个组件的博客,传送门;

8、关于自定义组件:sink、source、channel:

一、channel不建议自定义哦,这个要求比较高,其余俩都是框架式的开发,往指定的方法填充本身配置、启动、关闭、业务逻辑便可,之后有机会单独写一篇文章来介绍;

二、关于自定义组件请相信github,上面好多好多好多,能够直接用的自定义组件....;

9、关于Flume-NG集群网络拓扑方案:

一、在每台采集节点上部署一个flume agent,而后作一到多个汇总flume agent(loadbalance),采集只负责收集数据发往汇总,汇总能够写HDFS、HBase、spark、本地文件、kafka等等,这样通常修改会只在汇总,agent少,维护工做少;

二、采集节点没有部署flume agent,可能发往mongo、redis等,这时你须要自定义source或者使用sdk来将其中的数据取出并发往flume agent,这样agent就又能够充当“采集节点”或者汇总节点了,可是这样在前面至关于加了一层控制,就又多了一层风险;

三、因为能力有限,其它未知,上面两种,第一种好些,这里看看美团的架构———— 传送门

 

4、

java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null

sink是hdfs,而后使用目录自动生成功能。出现如题的错误,看官网文档说的是须要在每一个文件记录行的开头须要有时间戳,可是时间戳的格式可能比较难调节,因此亦可设置hdfs.useLocalTimeStamp这个参数,好比以每一个小时做为一个文件夹,那么配置应该是这样:

 

[plain]  view plain  copy
 
  1. a1.sinks.k1.hdfs.path = hdfs://ubuntu:9000/flume/events/%y-%m-%d/%H  
  2. a1.sinks.k1.hdfs.filePrefix = events-  
  3. a1.sinks.k1.hdfs.round = true  
  4. a1.sinks.k1.hdfs.roundValue = 1  
  5. a1.sinks.k1.hdfs.roundUnit = hour  
  6. a1.sinks.k1.hdfs.useLocalTimeStamp = true  


或者修改hdfs.timeZone这个参数使之能够和咱们上传的log文件的日期格式同样应该就能够了,没有测试过。

 

5、flume学习(三):flume将log4j日志数据写入到hdfs

、本次咱们把log4j的日志直接采集输出到hdfs中去。须要修改flume.conf中sink的配置:

[plain]  view plain  copy
 
  1. tier1.sources=source1  
  2. tier1.channels=channel1  
  3. tier1.sinks=sink1  
  4.   
  5. tier1.sources.source1.type=avro  
  6. tier1.sources.source1.bind=0.0.0.0  
  7. tier1.sources.source1.port=44444  
  8. tier1.sources.source1.channels=channel1  
  9.   
  10. tier1.channels.channel1.type=memory  
  11. tier1.channels.channel1.capacity=10000  
  12. tier1.channels.channel1.transactionCapacity=1000  
  13. tier1.channels.channel1.keep-alive=30  
  14.   
  15. tier1.sinks.sink1.type=hdfs  
  16. tier1.sinks.sink1.channel=channel1  
  17. tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/flume/events  
  18. tier1.sinks.sink1.hdfs.fileType=DataStream  
  19. tier1.sinks.sink1.hdfs.writeFormat=Text  
  20. tier1.sinks.sink1.hdfs.rollInterval=0  
  21. tier1.sinks.sink1.hdfs.rollSize=10240  
  22. tier1.sinks.sink1.hdfs.rollCount=0  
  23. tier1.sinks.sink1.hdfs.idleTimeout=60  

 

6、【Flume】【源码分析】flume中sink到hdfs,文件系统频繁产生文件,文件滚动配置不起做用?

解决方案 
缘由:flume配置问题或者说代码问题  ,文件滚动的判断条件存在漏洞
增长配置参数 ,便可按照参数滚动文件
flume1.sinks.sink1.hdfs.minBlockReplicas=1

参考:http://blog.csdn.net/simonchi/article/details/43231891 

 

7、查看最终配置

(来源http://www.it610.com/article/2107322.htm)

 
最终配置文件示例

```
# flume1 which ones we want to activate.
flume1.channels = ch1
flume1.sources = src1
flume1.sinks = sink1


# Define a memory channel called ch1 on flume1
flume1.channels.ch1.type = memory
flume1.channels.ch1.capacity = 100000
flume1.channels.ch1.transactionCapacity = 1000
flume1.channels.ch1.keep-alive = 30
 
# Define an Avro source called src1 on flume1 and tell it
# to bind to 0.0.0.0:8888. Connect it to channel ch1.
flume1.sources.src1.channels = ch1
flume1.sources.src1.type = avro
flume1.sources.src1.bind = 0.0.0.0
flume1.sources.src1.port = 8888
flume1.sources.src1.threads = 5
 
 
flume1.sinks.sink1.type = hdfs
flume1.sinks.sink1.channel = ch1
flume1.sinks.sink1.hdfs.path =hdfs://master:9000/ysg/flume/ysg/%Y%m 
flume1.sinks.sink1.hdfs.filePrefix = ysg
flume1.sinks.sink1.hdfs.fileSuffix = .log
flume1.sinks.sink1.hdfs.inUseSuffix = .tmp
flume1.sinks.sink1.hdfs.maxOpenFiles = 5000 
flume1.sinks.sink1.hdfs.batchSize= 1
flume1.sinks.sink1.hdfs.fileType = DataStream
flume1.sinks.sink1.hdfs.writeFormat =Text
#flume1.sinks.sink1.hdfs.rollSize =64*1024*1024
flume1.sinks.sink1.hdfs.rollSize = 67108864
flume1.sinks.sink1.hdfs.rollCount = 0
flume1.sinks.sink1.hdfs.rollInterval = 0
flume1.sinks.sink1.hdfs.minBlockReplicas=1

flume1.sinks.sink1.hdfs.useLocalTimeStamp = true
flume1.sinks.sink1.hdfs.connect-timeout=80000
flume1.sinks.sink1.hdfs.callTimeout=120000
flume1.sinks.sink1.hdfs.idleTimeout = 60

 

8、 3.1 基础参数调优经验  --去掉 每写一行在行尾添加一个换行符 状况

  • HdfsSink中默认的serializer会每写一行在行尾添加一个换行符,咱们日志自己带有换行符,这样会致使每条日志后面多一个空行,修改配置不要自动添加换行符;      
lc.sinks.sink_hdfs.serializer.appendNewline = false
 
  • 调大MemoryChannel的capacity,尽可能利用MemoryChannel快速的处理能力;
    • 调大HdfsSink的batchSize,增长吞吐量,减小hdfs的flush次数;
    • 适当调大HdfsSink的callTimeout,避免没必要要的超时错误;

  

 

感谢上面带有链接的帖子。支持原创!(本帖来源于互联网,如有侵犯,请联系我!)

相关文章
相关标签/搜索