RabbitMQ详解

介绍RabbitMQ前,有必须先了解一下AMQP协议。AMQP协议是一个高级抽象层消息通讯协议,RabbitMQ是AMQP协议的实现。它主要包括如下组件:正则表达式

 

1. Server(broker): 接受客户端链接,实现AMQP消息队列和路由功能的进程。安全

2. Virtual Host:实际上是一个虚拟概念,相似于权限控制组,一个Virtual Host里面能够有若干个Exchange和Queue,可是权限控制的最小粒度是Virtual Host服务器

3.Exchange:接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三种,不一样类型的Exchange路由的行为是不同的。异步

4.Message Queue:消息队列,用于存储还未被消费者消费的消息。函数

5.Message: 由Header和Body组成,Header是由生产者添加的各类属性的集合,包括Message是否被持久化、由哪一个Message Queue接受、优先级是多少等。而Body是真正须要传输的APP数据。性能

6.Binding:Binding联系了Exchange与Message Queue。Exchange在与多个Message Queue发生Binding后会生成一张路由表,路由表中存储着Message Queue所需消息的限制条件即Binding Key。当Exchange收到Message时会解析其Header获得Routing Key,Exchange根据Routing Key与Exchange Type将Message路由到Message Queue。Binding Key由Consumer在Binding Exchange与Message Queue时指定,而Routing Key由Producer发送Message时指定,二者的匹配方式由Exchange Type决定。 操作系统

7.Connection:链接,对于RabbitMQ而言,其实就是一个位于客户端和Broker之间的TCP链接。线程

8.Channel:信道,仅仅建立了客户端到Broker之间的链接后,客户端仍是不能发送消息的。须要为每个Connection建立Channel,AMQP协议规定只有经过Channel才能执行AMQP的命令。一个Connection能够包含多个Channel。之因此须要Channel,是由于TCP链接的创建和释放都是十分昂贵的,若是一个客户端每个线程都须要与Broker交互,若是每个线程都创建一个TCP链接,暂且不考虑TCP链接是否浪费,就算操做系统也没法承受每秒创建如此多的TCP链接。RabbitMQ建议客户端线程之间不要共用Channel,至少要保证共用Channel的线程发送消息必须是串行的,可是建议尽可能共用Connection。设计

9.Command:AMQP的命令,客户端经过Command完成与AMQP服务器的交互来实现自身的逻辑。例如在RabbitMQ中,客户端能够经过publish命令发送消息,txSelect开启一个事务,txCommit提交一个事务。

在了解了AMQP模型之后,须要简单介绍一下AMQP的协议栈,AMQP协议自己包括三层:

 

1.       Modle Layer,位于协议最高层,主要定义了一些供客户端调用的命令,客户端能够利用这些命令实现本身的业务逻辑,例如,客户端能够经过queue.declare声明一个队列,利用consume命令获取一个队列中的消息。

2.       Session Layer,主要负责将客户端的命令发送给服务器,在将服务器端的应答返回给客户端,主要为客户端与服务器之间通讯提供可靠性、同步机制和错误处理。

3.       Transport Layer,主要传输二进制数据流,提供帧的处理、信道复用、错误检测和数据表示。

 

从AMQP协议能够看出,MessageQueue、Exchange和Binding构成了AMQP协议的核心,下面咱们就围绕这三个主要组件    从应用使用的角度全面的介绍如何利用Rabbit MQ构建消息队列以及使用过程当中的注意事项。

 

 

1. 声明MessageQueue

      在Rabbit MQ中,不管是生产者发送消息仍是消费者接受消息,都首先须要声明一个MessageQueue。这就存在一个问题,是生产者声明仍是消费者声明呢?要解决这个问题,首先须要明确:

a)消费者是没法订阅或者获取不存在的MessageQueue中信息。

b)消息被Exchange接受之后,若是没有匹配的Queue,则会被丢弃。

在明白了上述两点之后,就容易理解若是是消费者去声明Queue,就有可能会出如今声明Queue以前,生产者已发送的消息被丢弃的隐患。若是应用可以经过消息重发的机制容许消息丢失,则使用此方案没有任何问题。可是若是不能接受该方案,这就须要不管是生产者仍是消费者,在发送或者接受消息前,都须要去尝试创建消息队列。这里有一点须要明确,若是客户端尝试创建一个已经存在的消息队列,Rabbit MQ不会作任何事情,并返回客户端创建成功的。

       若是一个消费者在一个信道中正在监听某一个队列的消息,Rabbit MQ是不容许该消费者在同一个channel去声明其余队列的。Rabbit MQ中,能够经过queue.declare命令声明一个队列,能够设置该队列如下属性:

a) Exclusive:排他队列,若是一个队列被声明为排他队列,该队列仅对首次声明它的链接可见,并在链接断开时自动删除。这里须要注意三点:其一,排他队列是基于链接可见的,同一链接的不一样信道是能够同时访问同一个链接建立的排他队列的。其二,“首次”,若是一个链接已经声明了一个排他队列,其余链接是不容许创建同名的排他队列的,这个与普通队列不一样。其三,即便该队列是持久化的,一旦链接关闭或者客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。

b)   Auto-delete:自动删除,若是该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。

 c)   Durable:持久化,这个会在后面做为专门一个章节讨论。

d)  其余选项,例如若是用户仅仅想查询某一个队列是否已存在,若是不存在,不想创建该队列,仍然能够调用queue.declare,只不过须要将参数passive设为true,传给queue.declare,若是该队列已存在,则会返回true;若是不存在,则会返回Error,可是不会建立新的队列。

2. 生产者发送消息

        在AMQP模型中,Exchange是接受生产者消息并将消息路由到消息队列的关键组件。ExchangeType和Binding决定了消息的路由规则。因此生产者想要发送消息,首先必需要声明一个Exchange和该Exchange对应的Binding。能够经过 ExchangeDeclare和BindingDeclare完成。在Rabbit MQ中,声明一个Exchange须要三个参数:ExchangeName,ExchangeType和Durable。ExchangeName是该Exchange的名字,该属性在建立Binding和生产者经过publish推送消息时须要指定。ExchangeType,指Exchange的类型,在RabbitMQ中,有三种类型的Exchange:direct ,fanout和topic,不一样的Exchange会表现出不一样路由行为。Durable是该Exchange的持久化属性,这个会在消息持久化章节讨论。声明一个Binding须要提供一个QueueName,ExchangeName和BindingKey。下面咱们就分析一下不一样的ExchangeType表现出的不一样路由规则。

        生产者在发送消息时,都须要指定一个RoutingKey和Exchange,Exchange在接到该RoutingKey之后,会判断该ExchangeType:

                         a) 若是是Direct类型,则会将消息中的RoutingKey与该Exchange关联的全部Binding中的BindingKey进行比较,若是相等,则发送到该Binding对应的Queue中。

 
                  b)  若是是 Fanout 类型,则会将消息发送给全部与该 Exchange 定义过 Binding 的全部 Queues 中去,实际上是一种广播行为。
         
 

        c)若是是Topic类型,则会按照正则表达式,对RoutingKey与BindingKey进行匹配,若是匹配成功,则发送到对应的Queue中。

             

3. 消费者订阅消息    

    在RabbitMQ中消费者有2种方式获取队列中的消息:

       a)  一种是经过basic.consume命令,订阅某一个队列中的消息,channel会自动在处理完上一条消息以后,接收下一条消息。(同一个channel消息处理是串行的)。除非关闭channel或者取消订阅,不然客户端将会一直接收队列的消息。

             b)  另一种方式是经过basic.get命令主动获取队列中的消息,可是绝对不能够经过循环调用basic.get来代替basic.consume,这是由于basic.get RabbitMQ在实际执行的时候,是首先consume某一个队列,而后检索第一条消息,而后再取消订阅。若是是高吞吐率的消费者,最好仍是建议使用basic.consume。

      若是有多个消费者同时订阅同一个队列的话,RabbitMQ是采用循环的方式分发消息的,每一条消息只能被一个订阅者接收。例如,有队列Queue,其中ClientA和ClientB都Consume了该队列,MessageA到达队列后,被分派到ClientA,ClientA回复服务器收到响应,服务器删除MessageA;再有一条消息MessageB抵达队列,服务器根据“循环推送”原则,将消息会发给ClientB,而后收到ClientB的确认后,删除MessageB;等到再下一条消息时,服务器会再将消息发送给ClientA。

       这里咱们能够看出,消费者再接到消息之后,都须要给服务器发送一条确认命令,这个便可以在handleDelivery里显示的调用basic.ack实现,也能够在Consume某个队列的时候,设置autoACK属性为true实现。这个ACK仅仅是通知服务器能够安全的删除该消息,而不是通知生产者,与RPC不一样。 若是消费者在接到消息之后还没来得及返回ACK就断开了链接,消息服务器会重传该消息给下一个订阅者,若是没有订阅者就会存储该消息。

        既然RabbitMQ提供了ACK某一个消息的命令,固然也提供了Reject某一个消息的命令。当客户端发生错误,调用basic.reject命令拒绝某一个消息时,能够设置一个requeue的属性,若是为true,则消息服务器会重传该消息给下一个订阅者;若是为false,则会直接删除该消息。固然,也能够经过ack,让消息服务器直接删除该消息而且不会重传。

4. 持久化:

        Rabbit MQ默认是不持久队列、Exchange、Binding以及队列中的消息的,这意味着一旦消息服务器重启,全部已声明的队列,Exchange,Binding以及队列中的消息都会丢失。经过设置Exchange和MessageQueue的durable属性为true,可使得队列和Exchange持久化,可是这还不能使得队列中的消息持久化,这须要生产者在发送消息的时候,将delivery mode设置为2,只有这3个所有设置完成后,才能保证服务器重启不会对现有的队列形成影响。这里须要注意的是,只有durable为true的Exchange和durable为ture的Queues才能绑定,不然在绑定时,RabbitMQ都会抛错的。持久化会对RabbitMQ的性能形成比较大的影响,可能会降低10倍不止。

5. 事务:

     对事务的支持是AMQP协议的一个重要特性。假设当生产者将一个持久化消息发送给服务器时,由于consume命令自己没有任何Response返回,因此即便服务器崩溃,没有持久化该消息,生产者也没法获知该消息已经丢失。若是此时使用事务,即经过txSelect()开启一个事务,而后发送消息给服务器,而后经过txCommit()提交该事务,便可以保证,若是txCommit()提交了,则该消息必定会持久化,若是txCommit()还未提交即服务器崩溃,则该消息不会服务器就收。固然Rabbit MQ也提供了txRollback()命令用于回滚某一个事务。

6. Confirm机制:

      使用事务当然能够保证只有提交的事务,才会被服务器执行。可是这样同时也将客户端与消息服务器同步起来,这背离了消息队列解耦的本质。Rabbit MQ提供了一个更加轻量级的机制来保证生产者能够感知服务器消息是否已被路由到正确的队列中——Confirm。若是设置channel为confirm状态,则经过该channel发送的消息都会被分配一个惟一的ID,而后一旦该消息被正确的路由到匹配的队列中后,服务器会返回给生产者一个Confirm,该Confirm包含该消息的ID,这样生产者就会知道该消息已被正确分发。对于持久化消息,只有该消息被持久化后,才会返回Confirm。Confirm机制的最大优势在于异步,生产者在发送消息之后,便可继续执行其余任务。而服务器返回Confirm后,会触发生产者的回调函数,生产者在回调函数中处理Confirm信息。若是消息服务器发生异常,致使该消息丢失,会返回给生产者一个nack,表示消息已经丢失,这样生产者就能够经过重发消息,保证消息不丢失。Confirm机制在性能上要比事务优越不少。可是Confirm机制,没法进行回滚,就是一旦服务器崩溃,生产者没法获得Confirm信息,生产者其实自己也不知道该消息吃否已经被持久化,只有继续重发来保证消息不丢失,可是若是原先已经持久化的消息,并不会被回滚,这样队列中就会存在两条相同的消息,系统须要支持去重。

消息持久化是 RabbitMQ 最为人津津乐道的特性之一, RabbitMQ 可以在付出最小的性能代价的基础上实现消息的持久化,最大的奥秘就在于 RabbitMQ 多层消息队列的设计上。下面,本文就从 MessageQueue 的设计和消息在 MessageQueue 的生命周期两个方面全面介绍  RabbitMQ 的消息队列。

   RabbitMQ彻底实现了AMQP协议,相似于一个邮箱服务。Exchange负责根据ExchangeType和RoutingKey将消息投递到对应的消息队列中,消息队列负责在消费者获取消息前暂存消息。在RabbitMQ中,MessageQueue主要由两部分组成,一个为AMQQueue,主要负责实现AMQP协议的逻辑功能。另一个是用来存储消息的BackingQueue,本文重点关注的是BackingQueue的设计。

   

    在RabbitMQ中BackingQueue又由5个子队列组成:Q一、Q二、Delta、Q3和Q4。RabbitMQ中的消息一旦进入队列,不是固定不变的,它会随着系统的负载在队列中不断流动,消息的状态不断发生变化。RabbitMQ中的消息一共有5种状态:

   a)Alpha:消息的内容和消息索引都保存在内存中;

   b)Beta:消息内容保存在磁盘上,消息索引保存在内存中;

   c)Gamma:消息内容保存在磁盘上,消息索引在磁盘和内存都有;

   d)Delta:消息内容和索引都在磁盘上;

   注意:对于持久化的消息,消息内容和消息索引都必须先保存到磁盘上,才会处于上述状态中的一种,而Gamma状态的消息只有持久化的消息才会有该状态。

 

      BackingQueue 中的 5 个子队列中的消息状态, Q1 和 Q4 对应的是 Alpha 状态, Q2 和 Q3 是 Beta 状态, Delta 对应的是 Delta 状态。上述就是 RabbitMQ 的多层队列结构的设计,咱们能够看出从 Q1 到 Q4 ,基本经历的是由 RAM 到 DISK,再到 RAM 的设计。这样的设计的好处就是当队列负载很高的状况下,可以经过将一部分消息由磁盘保存来节省内存空间,当负载下降的时候,这部分消息又渐渐回到内存,被消费者获取,使得整个队列有很好的弹性。下面咱们就来看一下,整个消息队列的工做流程。

     引发消息流动主要有两方面的因素:其一是消费者获取消息;其二是因为内存不足,引发消息的换出到磁盘上( Q1-.>Q2 、 Q2->Delta 、 Q3->Delta 、 Q4->Q3 )。 RabbitMQ 在系统运行时会根据消息传输的速度计算一个当前内存中可以保存的最大消息数量( Target_RAM_Count ),当内存中的消息数量大于该值时,就会引发消息的流动。进入队列的消息,通常会按着 Q1->Q2->Delta->Q3->Q4 的顺序进行流动,可是并非每条消息都必定会经历全部的状态,这个取决于当时系统的负载情况。
       当消费者获取消息时,首先会从 Q4 队列中获取消息,若是 Q4 获取成功,则返回,若是 Q4 为空,则尝试从 Q3 获取消息;首先,系统会判断 Q3 队列是否为空,若是为空,则直接返回队列为空,即此时队列中无消息(后续会论证)。若是不为空,则取出 Q3 的消息,而后判断此时 Q3 和 Delta 队列的长度,若是都为空,则可认为 Q2 、 Delta 、 Q3 和 Q4 所有为空 (后续说明 ) ,此时将 Q1 中消息直接转移到 Q4 中,下次直接从 Q4 中获取消息。若是 Q3 为空, Delta 不空,则将 Delta 中的消息转移到 Q3 中;若是 Q3 非空,则直接下次从 Q3 中获取消息。在将 Delta 转移到 Q3 的过程当中, RabbitMQ 是按照索引分段读取的,首先读取某一段,直到读到的消息非空为止,而后判断读取的消息个数与 Delta 中的消息个数是否相等,若是相等,则判定此时 Delta 中已无消息,则直接将 Q2 和刚读到的消息一并放入 Q3 中。若是不相等,则仅将这次读到的消息转移到Q3 中。这就是消费者引发的消息流动过程。
    

     下面咱们分析一下因为内存不足引发的消息换出。消息换出的条件是内存中保存的消息数量 + 等待 ACK 的消息的数量 >Target_RAM_Count 。当条件触发时,系统首先会判断若是当前进入等待 ACK 的消息的速度大于进入队列的消息的速度时,会先处理等待 ACK 的消息。步骤基本上 Q1->Q2 或者 Q3 移动,取决于 Delta 队列是否为空。 Q4->Q3 移动, Q2 和Q3 向 Delta 移动。

    最后,咱们来分析一下前面遗留的两个问题,一个是为何 Q3 队列为空便可认定整个队列为空。试想若是 Q3 为空,Delta 不空,则在 Q3 取出最后一条消息时, Delta 上的消息就会被转移到 Q3 上,与 Q3 空矛盾。若是 Q2 不空,则在 Q3 取出最后一条消息,若是 Delta 为空时,会将 Q2 的消息并入 Q3 ,与 Q3 为空矛盾。若是 Q1 不空,则在 Q3 取出最后一条消息,若是 Delta 和 Q3 均为空时,则将 Q1 的消息转移到 Q4 中,与 Q4 为空矛盾。这也解释了另一个问题,即为何 Q3 和Delta 为空, Q2 就为空。
    上述就是整个消息在 RabbitMQ 队列中流动过程。从上述流程能够看出,消息若是可以被尽早消费掉,就不须要经历持久化的过程,由于这样会加系统的开销。若是消息被消费的速度过慢, RabbitMQ 经过换出内存的方式,防止内存溢出。
相关文章
相关标签/搜索