一个Flume 异常(Put queue for MemoryTransaction of capacity 100 full)的排查和解决思路

最近在作一个分布式调用链跟踪系统,java

 

在两个地方采用了flume (我使用的flume版本是1.5.0-cdh5.4.4),一个是宿主系统 ,用flume agent进行日志搜集。 一个是从kafka拉日志分析后写入hbase. node

后面这个flume(从kafka拉日志分析后写入flume)用了3台  , 系统上线之后 ,线上抛了一个这样的异常:apache

Caused by: org.apache.flume.ChannelException: Put queue for MemoryTransaction of capacity 100 full, consider committing more frequently, increasing capacity or increasing thread count
at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doPut(MemoryChannel.java:84)
at org.apache.flume.channel.BasicTransactionSemantics.put(BasicTransactionSemantics.java:93)
at org.apache.flume.channel.BasicChannelSemantics.put(BasicChannelSemantics.java:80)
at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:189)多线程

从异常信息直观理解是MemoryChannel的事务的Put队列满了,为何会这样呢?架构

咱们先从Flume的体系结构提及,Flume是apache一个负责日志采集和传输的开源工具,它的特色是可以很灵活的经过配置实现不一样数据存储系统之间的数据交换 。分布式

它有三个最主要的组件:ide

Source : 负责从数据源获取数据,包含两种类型的Source . EventDrivenSource 和 PollableSource ,  前者指的是事件驱动型数据源,故名思议,就是须要外部系统主动送数据  ,好比AvroSource ,ThriftSource ; 而PollableSource 指的是须要Source主动从数据源拉取数据  ,好比KafkaSource  。Source 获取到数据之后向Channel 写入Event 。 工具

Sink : 负责从Channel拉取Event , 写入下游存储或者对接其余Agent. google

Channel:用于实现Source和Sink之间的数据缓冲, 主要有文件通道和内存通道两类。spa

Flume的架构图以下:

 

而个人flume 配置以下:

a1.sources = kafkasource
a1.sinks = hdfssink hbasesink
a1.channels = hdfschannel hbasechannel


a1.sources.kafkasource.channels = hdfschannel hbasechannel

a1.sinks.hdfssink.channel = hdfschannel
a1.sinks.hbasesink.channel = hbasechannel

a1.sources.kafkasource.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.kafkasource.zookeeperConnect = zk1:2181,zk2:2181,zk3:2181
a1.sources.kafkasource.topic = nagual_topic
a1.sources.kafkasource.groupId = flume
a1.sources.kafkasource.kafka.consumer.timeout.ms = 500

 


a1.sinks.hdfssink.type = hdfs
a1.sinks.hdfssink.hdfs.path = hdfs://namenode:8020/flume/kafka_events/%y-%m-%d/%H%M
a1.sinks.hdfssink.hdfs.filePrefix = events-prefix
a1.sinks.hdfssink.hdfs.round = true
a1.sinks.hdfssink.hdfs.roundValue = 10
a1.sinks.hdfssink.hdfs.roundUnit = minute
a1.sinks.hdfssink.hdfs.fileType = SequenceFile
a1.sinks.hdfssink.hdfs.writeFormat = Writable
a1.sinks.hdfssink.hdfs.rollInterval = 60
a1.sinks.hdfssink.hdfs.rollCount = -1
a1.sinks.hdfssink.hdfs.rollSize = -1

 

a1.sinks.hbasesink.type = hbase
a1.sinks.hbasesink.table = htable_nagual_tracelog
a1.sinks.hbasesink.index_table = htable_nagual_tracelog_index
a1.sinks.hbasesink.serializer =NagualTraceLogEventSerializer
a1.sinks.hbasesink.columnFamily = rpcid
a1.sinks.hbasesink.zookeeperQuorum = zk1:2181,zk2:2181,zk3:2181

 


a1.channels.hdfschannel.type = memory
a1.channels.hdfschannel.capacity= 10000
a1.channels.hdfschannel.byteCapacityBufferPercentage = 20
a1.channels.hdfschannel.byteCapacity = 536870912

个人flume agent从kafka拉取日志之后,转换成hbase 的row put操做,中间采用了memchannel ,为何会出现以前提到的异常呢? 经过通读一遍源码, 基本找到了问题所在:

 咱们把源码拆解成如下几个主要步骤来分析:

1、flume 的启动:

 

如上图所示, 整个flume 启动的主要流程是这样的:

FLUME_HOME/bin目录中的flume-ng启动脚本启动Application , Application建立一个PollingPropertiesFileConfigurationProvider, 这个Provider的做用是启动 一个配置文件的监控线程FileWatcherRunnable ,定时监控配置文件的变动,

一旦配置文件变动,则从新获得SinkRunner, SourceRunner以及channel的配置, 包装成MaterialedConfiguration,经过google guava的eventbus 推送配置变动给Application ,Application启动一个LifeCycleSupervisor,由它来负责监控

SourceRunner ,SinkRunner,Channel的运行状况 。 SourceRunner ,SinkRunner ,Channel都继承或实现了LifeCycleAware接口,LifeCycleSupervisor经过定时检查这些组件的指望状态是否和当前状态一致, 若是不一致则调用指望状态对应的方法, 

(具体代码能够参考LifeCycleSupervisor的内部类MonitorRunnable) . 按照 Channel 、 SinkRunner 、 SourceRunner(能够理解为先接水管,再接水盆,再接水龙头)顺序进行启动,每一个组件的启动都作什么呢?

 

 以个人flume配置文件来讲明:

 (1) MemChannel

  咱们首先先介绍一下MemChannel的几个配置参数:

  

   

 capacity :控制了MemChannel中一个LinkedBlockingDeque<Event> (咱们后面简称为MemDeque)的最大event个数。

 transactionCapacity: 控制了一个MemChannel的事务(MemoryTransaction)中putList  和takeList两个 LinkedBlockingDeque 的最大长度 。

 byteCapacityBufferPercentage: 控制了MemDeque中event header 的占比, 默认是20%

 byteCapacity:控制了MemDeque的最大字节数, 默认值是应用分配到的最大堆内存(Xmx参数指定)的 80% (咱们称之为x ),这个值x乘以 1 - byteCapacityBufferPercentage * 0.01 就获得了MemDeque中event body的最大字节数。如何利用这个参数来进行流控, 咱们后面还有详细说明。

  keepalive : 控制了一个 MemChannel 事务从MemDeque中读写操做的最大阻塞时间 , 单位:秒。 

  启动之后, 创建了一个LinkedBlockingDeque ,这是一个双端队列,能够进行双向读写,而且用capacity  参数控制了它的最大长度, 另外还建立了几个信号量Semaphore ,

  queueRemaining: 标识MemDeque的初始容量,

  queueStored :  标识MemDeque写入的 event数量,也就是待处理的event 数量。

  bytesRemaing:标识MemDeque 中还能写入多少字节的flume event body; 

 

 (2) SinkRunner:  

   它的机制是启动一个所谓的PollingRunner 线程 ,经过轮询操做,调用一个 SinkProcessor来进行实际的轮询处理, 而这个SinkProcessor则调用 Sink的process  方法进行event处理, 在轮询的处理上,有一个所谓的 补偿机制( backoff)  ,就是当sink获取不到 event 的时候,  PollingRunner 线程须要等待一段backoff时间,等channel中的数据获得了补偿再来进行pollling 操做。而hbasesink 在启动的时候,则把hbase 操做相关的配置:htable, columnfamily ,hbase zk集群的配置信息准备好了。也就是说SinkRunner采用的方式是Pull .

 

(3) SourceRunner: 

 SourceRunner包含两类, 一类是对应EventDrivenSource 的 EventDrivenSourceRunner , 一个是对应PollableSource的PollableSourceRunner , 简单的说,前者是push ,后者是pull  。 

EventDrivenSource  有表明性的是thrift source  , 在本地启动java nio server之后, 从外部接收event ,交给ThriftSource 内部的ThriftSourceHandler进行处理。 

然后者PollableSourceRunner  ,则经过启动一个PollingRunner线程  ,相似SinkRunner中的轮询处理策略 ,启动Source , 在Source内部, 使用ChannelProcessor处理events , ChannelProcessor内部会走一组过滤器构建的过滤器链 ,而后经过通道选择器ChannelSelector选择好通道之后 ,启动事务 ,把一批event 写入Channel .

 

 咱们用下面这张示意图来讲明各组件启动之后的内部运做原理:

 

 

整个图有些复杂, 说明以下:

PollingSourceRunner经过线程启动定时任务 ,间隔一段时间调用kafkasource 从kafka broker 拉取日志,拉完之后,进入一个ChannelProcessor,这个通道处理器先经过一个过滤器链对event进行过滤 ,过滤之后,经过一个ChannelSelector通道选择器,选择evnet要投递的Channel , 而后启动Channel 下的一个事务 (注意,这个事务是用ThreadLocal维持的,也就是说一个线程对应了一个事务) , 事务启动之后,批量向事务MemoryTransaction的一个putList的尾部写入,putlist是一个LinkedBlockingDeque .

事务提交的时候, 把putlist中的event批量移除, 转移到MemoryChannel的一个LinkedBlockingDeque 里面来. 

而SinkRunner则启动PollingRunner , 也经过定时启动任务,调用SinkProcessor,最后调用HbaseSink的process方法,这个方法也负责启动一个事务 ,批量从MemoryChannel的LinkedBlockingDeque中拉取event , 写入takelist ,批量作完hbase 的put操做之后,作memoryTransaction的事务提交操做。事务提交的处理逻辑前面描述过。

而负责进行通道过载保护,正是在MemoryTransaction事务的提交时刻作的 ,这个过载保护的代码能够参考MemoryChannel的MemoryTransaction 内部 类的doCommit方法, 它的思路是这样的:

比较事务提交的时候,takelist和putlist的大小,若是takelist的长度比 putlist 长度要小, 则认为sink的消费能力(takelist长度标识)要比source的生产能力(putlist)要弱, 此时,须要经过一个bytesRemaining的 Semaphore来决定是否容许把putlist中的event转移到MemoryChannel的linkedBlockingDeque来, 若是容许  ,则操做 , 操做之后,putlist 和takelist 都被清理掉了。  bytesRemaining信号量(标示还有多少flume event body存储空间)和queueStored(标示有多少个event能被消费)信号量都被释放了  。

 

综上所述, Flume 的memorychannel采用了两个双端队列putlist和takelist ,分别表示source 的生产能力 和sink 的消费能力,source 和sink启动一个事务 ,source 写putlist  ,提交事务之后 ,把putlist批量移动到另外一个deque .

而sink  则负责从MemoryChannel的Deque 取event, 写入takelist(只作流控用) , 最后sink的事务提交之后,也把putlist 的event批量移动到deque  。 等于在一个事务里面用putlist 作了写入缓冲,用takelist作了流控, memorychannel中的 deque是多个事务共享的存储。

 

至此, 咱们对memorychannel 的细节已经弄清楚了,回过头来看以前出现的那个异常 , 就能知道为何了?

首先, 咱们的transactionCapacity参数没有配置,那默认就是100 ,也就是putlist和takelist 的长度只有100 ,即写入缓冲容量只有100个event .而MemoryChannel的Deque咱们配置了10000 ,容许 flume event body的最大字节数咱们配置了

536870912 * (1 - 20 * 0.01) = 400M左右 , 问题并非出在了memorychannel的双端队列容量不够用,而是下游的hbase sink由于有一个批量处理的默认值是100 ,而在这默认的100次处理中 ,每一次处理都涉及到了对象的avro反序列化 , 100次批量写

入hbase 之后才会清理MemoryTransaction的 putlist,而这个时候上游kafka source 再有数据写入putlist 就出现了前文描述的那个异常。

 

解决办法:从异常提示的几个思路,咱们一一作个思考:

, consider committing more frequently, increasing capacity or increasing thread count

1)更频繁的提交事务: 若是采用这种思路的话,好比只下降hbase sink   的批处理数量,而上游的kafka source的生产能力保持不变,能够预见的是会形成MemoryChannel中Deque堆积的event数量会愈来愈多(由于更频繁的把event 从 putlist转移到了 Memory Deque) . 这种方法只是把问题从putlist 转移到了另外一个Deque 。(要MemoryChannel的Deque更大了)。

2) 增长transactionCapacity: 即增长每个事务的写缓冲能力(putlist长度增长) ,可是调节到多少呢?若是上游的压力陡增 ,仍是会出现这个问题 。这种方法只能暂时缓解,不能完全解决问题。

3) 增长线程数量: 这里我想flume做者的思路是把sink改成多线程增长消费能力来解决。 这个我认为才是解决问题的根本,增长下游的处理能力 。

 那如何增长下游的处理能力呢,除了作flume自己的scaleout ,减小单台flume的压力外。  还有几种方法供咱们思考:

A:把hbase sink扩展为多线程 , 每个线程一个event队列。ChannelProcessor在投递的时候轮询投递到多个队列 。------考虑用Akka  ?

B: 使用Disruptor , ChannelProcessor做为生产者,SinkProcessor做为消费者 。

C: 直接换成 storm  ,利用storm集群的实时处理能力?

用了两天的时间,采用了storm ,的确极大提升了吞吐能力(20多万条消息大概在两分多钟处理完,QPS 达到了1600。 后面单独再写文章来讲明storm目前在使用过程当中踩到的坑 。

相关文章
相关标签/搜索