微信公众号「中间件兴趣圈」主要关注RocketMQ、Dubbo、Netty、Elasticsearch、ElasticJob、Mycat、Mybatis等主流开源中间件。java
最近收到不少RocketMQ使用者,反馈生产环境中在消息发送过程当中偶尔会出现以下4个错误信息之一:git
在进行消息中间件的选型时,若是待选中间件在功能上、性能上都能知足业务的状况下,建议把中间件的实现语言这个因素也考虑进去,毕竟选择一门用本身擅长的语言实现的中间件会更具掌控性。在出现异常的状况下,咱们能够根据本身的经验提取错误信息关键字system busy,在RocketMQ源码中直接搜索,获得抛出上述错误信息的代码以下: github
其代码入口为:org.apache.rocketmq.remoting.netty.NettyRemotingAbstract 的 processRequestCommand 方法。从图中能够看出,抛出上述错误的关键缘由是:pair.getObject1() 的 rejectRequest 方法和抛出RejectedExecutionException 异常。apache
备注:本文偏实战,源码只是做为分析的重点证据,故本文只会点出关键源码,并不会详细跟踪其整个实现流程,若是想详细了解其实现,能够查阅笔者编著的《RocketMQ技术内幕》。json
RocketMQ的网络设计很是值得咱们学习与借鉴,首先在客户端端将不一样的请求定义不一样的请求命令CODE,服务端会将客户端请求进行分类,每一个命令或每类请求命令定义一个处理器(NettyRequestProcessor),而后每个NettyRequestProcessor绑定到一个单独的线程池,进行命令处理,不一样类型的请求将使用不一样的线程池进行处理,实现线程隔离。服务器
为了方便下文的描述,咱们先简单的认识一下NettyRequestProcessor、Pair、RequestCode。其核心关键点以下: 微信
因为读者朋友提出的问题,都是发生在消息发送过程当中,故本文重点关注SendMessageProcessor#rejectRequest方法。 SendMessageProcessor#rejectRequest网络
public boolean rejectRequest() { return this.brokerController.getMessageStore().isOSPageCacheBusy() || // [@1](https://my.oschina.net/u/1198) this.brokerController.getMessageStore().isTransientStorePoolDeficient(); // @2 }
拒绝请求的条件有两个,只要其中任意一个知足,则返回true。数据结构
代码@1:Os PageCache busy,判断操做系统PageCache是否繁忙,若是忙,则返回true。想必看到这里你们确定与我同样好奇,RocketMQ是如何判断pageCache是否繁忙呢?下面会重点分析。架构
代码@2:transientStorePool是否不足。
DefaultMessageStore#isOSPageCacheBusy()
public boolean isOSPageCacheBusy() { long begin = this.getCommitLog().getBeginTimeInLock(); // [@1](https://my.oschina.net/u/1198) start long diff = this.systemClock.now() - begin; // @1 end return diff < 10000000 && diff > this.messageStoreConfig.getOsPageCacheBusyTimeOutMills(); // @2 }
代码@1:先重点解释begin、diff两个局部变量的含义:
代码@2:若是一次消息追加过程的时间超过了Broker配置文件osPageCacheBusyTimeOutMills,则认为pageCache繁忙,osPageCacheBusyTimeOutMills默认值为1000,表示1s。
DefaultMessageStore#isTransientStorePoolDeficient
public boolean isTransientStorePoolDeficient() { return remainTransientStoreBufferNumbs() == 0; } public int remainTransientStoreBufferNumbs() { return this.transientStorePool.remainBufferNumbs(); }
最终调用TransientStorePool#remainBufferNumbs方法。
public int remainBufferNumbs() { if (storeConfig.isTransientStorePoolEnable()) { return availableBuffers.size(); } return Integer.MAX_VALUE; }
若是启用transientStorePoolEnable机制,返回当前可用的ByteBuffer个数,即整个isTransientStorePoolDeficient方法的用意是是否还存在可用的ByteBuffer,若是不存在,即表示pageCache繁忙。那什么是transientStorePoolEnable机制呢?
Java NIO的内存映射机制,提供了将文件系统中的文件映射到内存机制,实现对文件的操做转换对内存地址的操做,极大的提升了IO特性,但这部份内存并非常驻内存,能够被置换到交换内存(虚拟内存),RocketMQ为了提升消息发送的性能,引入了内存锁定机制,即将最近须要操做的commitlog文件映射到内存,并提供内存锁定功能,确保这些文件始终存在内存中,该机制的控制参数就是transientStorePoolEnable。
重点关注MappedFile的ByteBuffer writeBuffer、MappedByteBuffer mappedByteBuffer这两个属性的初始化,由于这两个方法是写消息与查消息操做的直接数据结构。
两个关键点以下:
消息写入时: MappedFile#appendMessagesInner
从中可见,在消息写入时,若是writerBuffer不为空,说明开启了transientStorePoolEnable机制,则消息首先写入writerBuffer中,若是其为空,则写入mappedByteBuffer中。
消息拉取(读消息): MappedFile#selectMappedBuffer
消息读取时,是从mappedByteBuffer中读(pageCache)。
你们是否是发现了一个有趣的点,若是开启transientStorePoolEnable机制,是否是有了读写分离的效果,先写入writerBuffer中,读倒是从mappedByteBuffer中读取。
为了对transientStorePoolEnable引入意图阐述的更加明白,这里我引入Rocketmq社区贡献者胡宗棠关于此问题的看法。
一般有以下两种方式进行读写:
舒适提示:若是想与胡宗棠大神进一步沟通交流,能够关注他的github帐号:https://github.com/zongtanghu
不知道你们会不会有另一个担心,若是开启了transientStorePoolEnable,内存锁定机制,那是否是随着commitlog文件的不断增长,最终致使内存溢出?
从这里能够看出,TransientStorePool默认会初始化5个DirectByteBuffer(对外内存),并提供内存锁定功能,即这部份内存不会被置换,能够经过transientStorePoolSize参数控制。
在消息写入消息时,首先从池子中获取一个DirectByteBuffer进行消息的追加。当5个DirectByteBuffer所有写满消息后,该如何处理呢?从RocketMQ的设计中来看,同一时间,只会对一个commitlog文件进行顺序写,写完一个后,继续建立一个新的commitlog文件。故TransientStorePool的设计思想是循环利用这5个DirectByteBuffer,只须要写入到DirectByteBuffer的内容被提交到PageCache后,便可重复利用。对应的代码以下: TransientStorePool#returnBuffer
public void returnBuffer(ByteBuffer byteBuffer) { byteBuffer.position(0); byteBuffer.limit(fileSize); this.availableBuffers.offerFirst(byteBuffer); }
其调用栈以下:
从上面的分析看来,并不会随着消息的不断写入而致使内存溢出。
其抛出的源码入口点:NettyRemotingAbstract#processRequestCommand,上面的原理分析部分已经详细介绍其实现原理,总结以下。
在不开启transientStorePoolEnable机制时,若是Broker PageCache繁忙时则抛出上述错误,判断PageCache繁忙的依据就是向PageCache追加消息时,若是持有锁的时间超过1s,则会抛出该错误;在开启transientStorePoolEnable机制时,其判断依据是若是TransientStorePool中不存在可用的堆外内存时抛出该错误。
其抛出的源码入口点:NettyRemotingAbstract#processRequestCommand,其调用地方紧跟3.1,是在向线程池执行任务时,被线程池拒绝执行时抛出的,咱们能够顺便看看Broker消息处理发送的线程信息: BrokerController#registerProcessor
该线程池的队列长度默认为10000,咱们能够经过sendThreadPoolQueueCapacity来改变默认值。
其抛出的源码入口点:DefaultMessageStore#putMessage,在进行消息追加时,再一次判断PageCache是否繁忙,若是繁忙,则抛出上述错误。
其抛出源码的入口点:BrokerFastFailure#cleanExpiredRequest。该方法的调用频率为每隔10s中执行一次,不过有一个执行前提条件就是Broker端要开启快速失败,默认为开启,能够经过参数brokerFastFailureEnable来设置。该方法的实现要点是每隔10s,检测一次,若是检测到PageCache繁忙,而且发送队列中还有排队的任务,则直接再也不等待,直接抛出系统繁忙错误,使正在排队的线程快速失败,结束等待。
通过上面的原理讲解与现象分析,消息发送时抛出system busy、broker busy的缘由都是PageCache繁忙,那是否是能够经过调整上述提到的某些参数来避免抛出错误呢?.例如以下参数:
修改上述参数,都不可取,缘由是出现system busy、broker busy这个错误,其本质是系统的PageCache繁忙,通俗一点讲就是向PageCache追加消息时,单个消息发送占用的时间超过1s了,若是继续往该Broker服务器发送消息并等待,其TPS根本没法知足,哪仍是高性能的消息中间了呀。故才会采用快速失败机制,直接给消息发送者返回错误,消息发送者默认状况会重试2次,将消息发往其余Broker,保证其高可用。
下面根据我的的看法,提出以下解决办法:
在broker.config中将transientStorePoolEnable=true。
方案依据: 启用“读写”分离,消息发送时消息先追加到DirectByteBuffer(堆外内存)中,而后在异步刷盘机制下,会将DirectByteBuffer中的内容提交到PageCache,而后刷写到磁盘。消息拉取时,直接从PageCache中拉取,实现了读写分离,减轻了PageCaceh的压力,能从根本上解决该问题。
方案缺点: 会增长数据丢失的可能性,若是Broker JVM进程异常退出,提交到PageCache中的消息是不会丢失的,但存在堆外内存(DirectByteBuffer)中但还未提交到PageCache中的这部分消息,将会丢失。但一般状况下,RocketMQ进程退出的可能性不大。
方案依据:
当Broker服务器自身比较忙的时候,快速失败,而且在接下来的一段时间内会规避该Broker,这样该Broker恢复提供了时间保证,Broker自己的架构是支持分布式水平扩容的,增长Topic的队列数,下降单台Broker服务器的负载,从而避免出现PageCache。 > 舒适提示:在Broker扩容时候,能够复制集群中任意一台Broker服务下${ROCKETMQ_HOME}/store/config/topics.json到新Broker服务器指定目录,避免在新Broker服务器上为Broker建立队列,而后消息发送者、消息消费者都能动态获取Topic的路由信息。
与之扩容对应的,也能够经过对原有Broker进行升配,例如增长内存、把机械盘换成SSD,但这种状况,一般须要重启Broekr服务器,没有扩容来的方便。
本文就介绍到这里了,亲爱的读者朋友,还有更好的方案没?欢迎留言与做者互动,共同探讨。
做者简介:《RocketMQ技术内幕》做者,RocketMQ 社区布道师,维护公众号:中间件兴趣圈,可扫描以下二维码与做者进行互动。