RabbitMQ消息的存储机制以及队列的结构

消息的存储机制

不论是持久化的消息仍是非持久化的消息均可以被写入到磁盘。持久化的消息在到达队列时就被写入到磁盘,而且若是能够,持久化的消息也会在内存中保存一个备份,这样就能够提升必定的性能,当内存吃紧的时候会从内存中清除。非持久化的消息通常只保存在内存中,在内存吃紧的时候会被换入到磁盘中,以节省内存空间。这两种类型的消息的落盘处理都在RabbitMQ的“持久层”中完成。性能

持久层是一个逻辑上的概念,实际包含两个部分:队列索引(rabbit_queue_index)和消息存储(rabbit_msg_store)。rabbit_queue_index负责维护队列中落盘消息的信息,包括消息的存储地点、是否已经交互给消费者、是否已经被消费者ack等。每一个队列都有与之对应的一个rabbit_queue_index。rabbit_msg_store以键值对的形式存储消息,它被全部队列共享,在每一个节点中有且只有一个。从技术层面上来讲,rabbit_msg_store具体还能够分为msg_store_persistent和msg_store_transient,msg_store_persistent负责持久化消息的持久化,重启后消息不会丢失;msg_store_transient负责非持久化消息的持久化,重启后消息会丢失。一般状况下,习惯性的将msg_store_persistent和msg_store_transient当作rabbit_msg_store这样一个总体。fetch

消息(包括消息体、属性和headers)能够直接存储在rabbit_queue_index中,也能够被保存在rabbit_msg_store中。默认在$RABBITMQ_HOME/var/lib/mnesia/rabbit@$HOSTNAME/ 路径下包含queues、msg_store_persistent、msg_store_transient这三个文件夹下,其分别存储对应的信息。优化

最佳的配备是较小的消息存储在rabbit_queue_index中而较大的消息存储在rabbit_msg_store中。这个消息的大小的界定能够经过queue_index_embed_msgs_below来配置,默认大小为4096B。注意这里的消息大小是指消息体、属性及headers总体的大小。当一个消息小于设定的大小阈值时就能够存储在rabbit_queue_index中,这样就能够获得性能上的优化。3d

rabbit_queue_index中以顺序(文件名以0开始累加)的段文件来进行存储,后缀为“.idx”,每一个段文件中包含固定的SEGMENT_ENTRY_COUNT条记录,SEGMENT_ENTRY_COUNT默认值为16384.每一个rabbit_queue_index从磁盘中读取消息的时候至少要在内存中维护一个段文件,因此设置queue_index_embed_msgs_below 值的时候要格外的当心谨慎,一点点增大可能会引发内存爆炸式的增加。blog

通过rabbit_msg_store处理的全部消息都会以追加的方式写入到文件中,当一个文件的大小超过指定的限制(file_size_limit)后,关闭这个文件再建立一个新的文件以供新的消息写入。文件名(文件后缀是“.rdq”)从0开始进行累加,所以文件名最小的文件也是最老的文件。在进行消息的存储时,RabbitMQ会在ETS(Erlang Term Storage)表中记录消息在文件中的映射(Index)和文件的相关信息(FileSummary)。索引

在读取消息的时候,先根据消息的ID(msg_id)找到对应存储的文件,若是文件存在而且未被锁住,则直接打开文件,从指定位置读取消息的内容。若是文件不存在或者被锁住了,则发送请求由rabbit_msg_store进行处理。接口

消息的删除只是从ETS表中删除指定消息的相关信息,同时更新消息对应的存储文件的相关信息。执行消息删除操做时,并不当即对在文件中的消息进行删除,也就是说消息依然在文件中,仅仅是标记为垃圾数据而已。当一个文件中都是垃圾数据时能够将这个文件删除。当检测到先后两个文件中的有效数据能够合并在一个文件中,而且全部的垃圾数据的大小和全部文件(至少有3个文件存在的状况下)的数据大小的比值超过设置的阈值GARBAGE_FACTORION(默认值为0.5)时才会触发垃圾回收将两个文件合并。队列

执行合并的两个文件必定是逻辑上相邻的两个文件。以下图所示,执行合并时首先锁定两个文件,并先对前面文件中的有效数据进行整理,再将后面的文件的有效数据写入到前面的文件,同时更新消息在ETS表中的记录,最后删除后面的文件。ip

队列的结构

一般队列由rabbit_amqqueue_process和backing_queue这两部分组成,rabbit_amqqueue_process负责协议相关的消息处理,即接收生产者发布的消息、向消费者交付消息、处理消息的确认(包括生产端的confirm和消费端的ack等)backing_queue是消息存储的具体形式和引擎,并向rabbit_amqqueue_process提供相关的接口以供调用。若是消息投递的目的队列是空的,而且有消费者订阅了这个队列,那么该消息会直接发送给消费者,不会通过队列这一步。而当消息没法直接投递给消费者时,须要暂时将消息存入队列,以便从新投递。消息存入队列后,不是固定不变的,它会随着系统的负载在队列中不断的流动,消息的状态会不断的发生变化。RabbitMQ中的队列消息可能会处于如下4种状态:内存

  ❤ alpha:消息内容(包括消息体、属性和headers)和消息索引都存储在内存中;

  ❤ beta:消息内容保存在磁盘中,消息索引保存在内存中;

  ❤ gamma:消息内容保存在磁盘中,消息索引在磁盘和内存中都有;

  ❤ delta:消息内容和索引都在磁盘中;

对于持久化的消息,消息内容和消息索引都必须保存在磁盘上,才会处于上述状态中的一种。而gamma状态的消息是只有持久化的消息才会有的状态。

RabbitMQ在运行时会根据统计的消息传送速度按期计算一个当前内存中可以保存的最大的消息数量(target_ram_count),若是alpha状态的消息数量大于此值时,就会引发消息的状态转换,多余的消息可能会转换到beta状态、gamma状态或者delta状态。区分这4中状态的做用主要是知足不一样的内存和CPU需求。alpha状态最消耗内存,但不多消耗CPU。delta状态基本不消耗内存,可是须要更多的CPU和磁盘的I/O操做。delta状态须要执行两次I/O操做才能读取到消息,一次是读消息索引(从rabbit_queue_index中),一次是读消息内容(从rabbit_msg_store中);beta和gamma状态都只须要一次I/O操做就能够读取到消息(从rabbit_msg_store中)。

对于普通的没有设置优先级和镜像的队列来讲,backing_queue的默认实现是rabbit_variable_queue,其内部经过5个子队列Q一、Q二、Delta、Q3和Q4来体现消息的各个状态。整个队列包括rabbit_amqqueue_process和backing_queue的各个子队列,队列的结构能够参考下图:

 

其中Q一、Q4只包含alpha状态的消息,Q2和Q3包含beta和gamma状态的消息,Delta只包含delta状态的消息。通常状况下,消息按照Q1>Q2>Delta>Q3>Q4这样的顺序步骤进行流动,但并非每一条消息都必定会经历全部的状态,这个取决于当前系统的负载情况。从Q1到Q4基本经历内存到磁盘,再由磁盘到内存这样的一个过程,如此能够在负载很高的状况下,可以经过将一部分消息由磁盘保存来节省内存空间,而在负载下降的时候,这部分消息又渐渐地回到内存被消费者获取,使得整个队列有很好的弹性。

消费者获取消息也会引发消息的状态转换,当消费者获取消息时,首先会从Q4中获取消息,若是获取成功则返回。若是Q4为空,则尝试从Q3中获取消息,系统首先会判断Q3是否为空,若是为空则返回队列为空,即此时队列中没有消息。若是Q3不为空,则取出Q3中的消息,进而再判断此时Q3和Delta中的长度,若是都为空,则能够认为Q二、Delta、Q三、Q4所有为空,此时将Q1中的消息直接转移至Q4,下次直接从Q4中获取消息,在将消息从Delta转移到Q3的过程当中,是按照索引分段读取的,首先读取某一段,而后判断读取消息的个数与Delta中的消息的个数是否相等,若是相等,则能够判断此时Delta中无消息,则直接将刚读取到的消息一并放入到Q3中;若是不相等,仅将这次读取到的消息转移到Q3。

在系统负载较高时,已接收到的消息若是不能很快被消费掉,这些消息就会进入到很深的队列中去,这样会增长处理每一个消息的平均开销。由于要花更多的时间和资源处理“堆积”的消息,如此用来处理新流入的消息的能力就会下降,使得后流入的消息又被积压到很深的队列中继续增大每一个消息的平均开销,继而状况变得愈来愈恶化,使得系统的处理能力大大下降。

应对这个问题有3种措施:

  (1)增长prefetch_count的值,即一次发送多条消息给消费者,加快消息被消费的速度。

  (2)采用multiple ack,下降处理ack带来的开销;

  (3)流量控制;

参考:《RabbitMQ实战指南》 朱忠华 编著;

相关文章
相关标签/搜索