RabbitMQ是一个流行的开源消息队列系统,是AMQP(高级消息队列协议)标准的实现,由以高性能、健壮、可伸缩性出名的Erlang语言开发,并继承了这些优势。业界有较多项目使用RabbitMQ,包括OpenStack、spring、Logstash等。html
腾讯云在开发云消息队列系统(CMQ)时,对RabbitMQ进行了大量的学习和优化,包括瓶颈分析、内存管理、参数调优等。下文结合Erlang和RabbitMQ架构来分析实践中遇到的问题,并探讨相应的优化方案。java
图1 AMQP模型git
AMQP是一个异步消息传递所使用的应用层协议规范,AMQP客户端可以无视消息来源任意发送和接受消息,Broker提供消息的路由、队列等功能。Broker主要由Exchange和Queue组成:Exchange负责接收消息、转发消息到绑定的队列;Queue存储消息,提供持久化、队列等功能。AMQP客户端经过Channel与Broker通讯,Channel是多路复用链接中的一条独立的双向数据流通道。github
RabbitMQ Server实现了AMQP模型中Broker部分,将Channel和Queue设计成了Erlang进程,并用Channel进程的运算实现Exchange的功能。spring
图2 RabbitMQ进程模型bash
图2中,tcp_acceptor进程接收客户端链接,建立rabbit_reader、rabbit_writer、rabbit_channel进程。rabbit_reader接收客户端链接,解析AMQP帧;rabbit_writer向客户端返回数据;rabbit_channel解析AMQP方法,对消息进行路由,而后发给相应队列进程。rabbit_amqqueue_process是队列进程,在RabbitMQ启动(恢复durable类型队列)或建立队列时建立。rabbit_msg_store是负责消息持久化的进程。架构
在整个系统中,存在一个tcp_accepter进程,一个rabbit_msg_store进程,有多少个队列就有多少个rabbit_amqqueue_process进程,每一个客户端链接对应一个rabbit_reader和rabbit_writer进程。异步
RabbitMQ能够对内存和磁盘使用量设置阈值,当达到阈值后,生产者将被阻塞(block),直到对应项恢复正常。除了这两个阈值,RabbitMQ在正常状况下还用流控(Flow Control)机制来确保稳定性。tcp
Erlang进程之间并不共享内存(binaries类型除外),而是经过消息传递来通讯,每一个进程都有本身的进程邮箱。Erlang默认没有对进程邮箱大小设限制,因此当有大量消息持续发往某个进程时,会致使该进程邮箱过大,最终内存溢出并崩溃。性能
在RabbitMQ中,若是生产者持续高速发送,而消费者消费速度较低时,若是没有流控,很快就会使内部进程邮箱大小达到内存阈值,阻塞生产者(得益于block机制,并不会崩溃)。而后RabbitMQ会进行page操做,将内存中的数据持久化到磁盘中。
为了解决该问题,RabbitMQ使用了一种基于信用证的流控机制。消息处理进程有一个信用组{InitialCredit,MoreCreditAfter},默认值为{200, 50}。消息发送者进程A向接收者进程B发消息,每发一条消息,Credit数量减1,直到为0,A被block住;对于接收者B,每接收MoreCreditAfter条消息,会向A发送一条消息,给予A MoreCreditAfter个Credit,当A的Credit>0时,A能够继续向B发送消息。
图3 RabbitMQ生产消息传输路径
能够看出基于信用证的流控最终将消息发送进程的发送速度限制在消息处理进程的处理速度内。RabbitMQ中与流控有关的进程构成了一个有向无环图。
如上所述,消息的存储和队列功能是在amqqueue进程中实现。为了高效处理入队和出队的消息、避免没必要要的磁盘IO,amqqueue进程为消息设计了4种状态和5个内部队列。
4种状态包括:alpha,消息的内容和索引都在内存中;beta,消息的内容在磁盘,索引在内存;gamma,消息的内容在磁盘,索引在磁盘和内存中都有;delta,消息的内容和索引都在磁盘。对于持久化消息,RabbitMQ先将消息的内容和索引保存在磁盘中,而后才处于上面的某种状态(即只可能处于alpha、gamma、delta三种状态之一)。
5个内部队列包括:q一、q二、delta、q三、q4。q1和q4队列中只有alpha状态的消息;q2和q3包含beta和gamma状态的消息;delta队列是消息按序存盘后的一种逻辑队列,只有delta状态的消息。因此delta队列并不在内存中,其余4个队列则是由erlang queue模块实现。
图4 内部队列消息传递顺序
消息从q1入队,q4出队,在内部队列中传递的过程通常是经q1顺序到q4。实际执行并不是必然如此:开始时全部队列都为空,消息直接进入q4(没有消息堆积时);内存紧张时将q4队尾部分消息转入q3,进而再由q3转入delta,此时新来的消息将存入q1(有消息堆积时)。
Paging就是在内存紧张时触发的,paging将大量alpha状态的消息转换为beta和gamma;若是内存依然紧张,继续将beta和gamma状态转换为delta状态。Paging是一个持续过程,涉及到大量消息的多种状态转换,因此Paging的开销较大,严重影响系统性能。
在生产者、消费者均正常状况下,RabbitMQ压测性能很是稳定,保持在一个恒定的速度。当消费者异常或不消费时,RabbitMQ则表现极不稳定。
图5 消息持久化、无消费场景
测试场景以下,exchange和队列都是持久化的,消息也是持久化的、固定为1K,而且无消费者。如上图所示,在达到内存paging阈值后,生产速率下降,并持续较长时间。内存使用状况代表,在内存中的消息数目只有18M内容,其余消息已经page到磁盘中,然而进程内存仍占用2G。Erlang内存使用代表,Queues占用了2G,Binaries占用了2.1G。
该状况说明在消息从内存page到磁盘后(即从q二、q3队列转到delta后),系统中产生了大量的垃圾(garbage),而Erlang VM没有进行及时的垃圾回收(GC)。这致使RabbitMQ错误的计算了内存使用量,并持续调用paging流程,直到Erlang VM隐式垃圾回收。
RabbitMQ内存使用量的计算是在memory_monitor进程内执行的,该进程周期性计算系统内存使用量。同时amqqueue进程会周期性拉取内存使用量,当内存达到paging阈值时,触发amqqueue进程进行paging。paging发生后,amqqueue进程每收到一条新消息都会对内部队列进行page(每次page都会计算出必定数目的消息存盘)。
该过程可行的优化方案是:在amqqueue进程将大部分消息paging到磁盘后,显式调用GC,同时将memory_monitor周期设为0.5s、amqqueue拉取周期设为1s,这样就可以达到秒级恢复;去掉对每条消息执行paging的操做,用amqqueue周期性拉取内存使用量的操做来触发page,这样可以更快将消息paging到磁盘,并且保持这个周期内生产速度不降低。
具体修改可查看:
https://github.com/rabbitmq/rabbitmq-server/compare/stable...javaforfun:stable
图6 paging时主动垃圾回收
从修改后效果能够看出,三次paging都很快结束,前两次paging相邻较近是由于两个镜像节点分别执行了paging。
该问题已反馈至RabbitMQ社区:
从图5中还能够发现,在22:01时生产速度有一个明显的降低(此时未发生paging)。经过流控分析,链路被block在amqqueue进程;经观察发现节点内存使用降低了,说明该节点执行了GC。Erlang GC是按进程级别的标记-清扫模式,会将当前进程暂停,直至GC结束。因为在RabbitMQ中,一个队列只有一个amqqueue进程,该进程又会处理大量的消息,产生大量的垃圾。这就致使该进程GC较慢,进而流控block上游更长时间。
查看RabbitMQ代码发现,amqqueue进程的gen_server模型在正常的逻辑中调用了hibernate,该操做可能致使两次没必要要的GC。优化掉hibernate对系统稳定性有一些帮助。
对流控可能比较好的优化方案是:用多个amqqueue进程来实现一个队列,这样能够下降rabbit_channel被单个amqqueue进程block的几率,同时在单队列的场景下也能更好利用多核的特性。不过该方案对RabbitMQ现有的架构改动很大,难度也很大。
RabbitMQ可优化的参数分为两个部分,Erlang部分和RabbitMQ自身。
IO_THREAD_POOL_SIZE:CPU大于或等于16核时,将Erlang异步线程池数目设为100左右,提升文件IO性能。
hipe_compile:开启Erlang HiPE编译选项(至关于Erlang的jit技术),可以提升性能20%-50%。在Erlang R17后HiPE已经至关稳定,RabbitMQ官方也建议开启此选项。
queue_index_embed_msgs_below:RabbitMQ 3.5版本引入了将小消息直接存入队列索引(queue_index)的优化,消息持久化直接在amqqueue进程中处理,再也不经过msg_store进程。因为消息在5个内部队列中是有序的,因此再也不须要额外的位置索引(msg_store_index)。该优化提升了系统性能10%左右。
vm_memory_high_watermark:用于配置内存阈值,建议小于0.5,由于Erlang GC在最坏状况下会消耗一倍的内存。
vm_memory_high_watermark_paging_ratio:用于配置paging阈值,该值为1时,直接触发内存满阈值,block生产者。
queue_index_max_journal_entries:journal文件是queue_index为避免过多磁盘寻址添加的一层缓冲(内存文件)。对于生产消费正常的状况,消息生产和消费的记录在journal文件中一致,则不用再保存;对于无消费者状况,该文件增长了一次多余的IO操做。
RabbitMQ在2007年发布第一个版本时,只有5000行Erlang代码,到如今已经加入了很是多的特性,但基本架构没有变。从多核的角度看,流控机制和单amqqueue进程之间存在一些冲突,对消费者异常这种场景,还须要从整个架构方面作更多优化。
除了上述内容,RabbitMQ在Cluster、HA、可靠交付、扩展支持等方面也作了大量的工做,这些都值得深刻的学习。