RabbitMQ消息可靠性分析

Introduction

有不少人问过我这么一类问题:**RabbitMQ如何确保消息可靠?**不少时候,笔者的回答都是:说来话长的事情何来长话短说。的确,要确保消息可靠不仅是单单几句就可以叙述明白的,包括Kafka也是如此。可靠并非一个绝对的概念,曾经有人也留言说过相似所有磁盘损毁也会致使消息丢失,笔者戏答:还有机房被炸了也会致使消息丢失。可靠性是一个相对的概念,在条件合理的范围内系统所能确保的多少个9的可靠性。一切尽量的趋于完美而没法企及于完美。 咱们能够尽量的确保RabbitMQ的消息可靠。在详细论述RabbitMQ的消息可靠性以前,咱们先来回顾下消息在RabbitMQ中的经由之路。数据库

如图所示,从AMQP协议层面上来讲:编程

消息先从生产者Producer出发到达交换器Exchange; 交换器Exchange根据路由规则将消息转发对应的队列Queue之上; 消息在队列Queue上进行存储; 消费者Consumer订阅队列Queue并进行消费。 咱们对于消息可靠性的分析也从这四个阶段来一一探讨。缓存

Phase 1

消息从生产者发出到达交换器Exchange,在这个过程当中能够发生各类状况,生产者客户端发送出去以后能够发生网络丢包、网络故障等形成消息丢失。通常状况下若是不采起措施,生产者没法感知消息是否已经正确无误的发送到交换器中。若是消息在传输到Exchange的过程当中发生失败而可让生产者感知的话,生产者能够进行进一步的处理动做,好比从新投递相关消息以确保消息的可靠性。网络

为此AMQP协议在创建之初就考虑到这种状况而提供了事务机制。RabbitMQ客户端中与事务机制相关的方法有三个:异步

  • channel.txSelect、
  • channel.txCommit
  • channel.txRollback。

channel.txSelect用于将当前的信道设置成事务模式,channel.txCommit用于提交事务,而channel.txRollback用于事务回滚。在经过channel.txSelect方法开启事务以后,咱们即可以发布消息给RabbitMQ了,若是事务提交成功,则消息必定到达了RabbitMQ中,若是在事务提交执行以前因为RabbitMQ异常崩溃或者其余缘由抛出异常,这个时候咱们即可以将其捕获,进而经过执行channel.txRollback方法来实现事务回滚。注意这里的RabbitMQ中的事务机制与大多数数据库中的事务概念并不相同,须要注意区分。分布式

事务确实可以解决消息发送方和RabbitMQ之间消息确认的问题,只有消息成功被RabbitMQ接收,事务才能提交成功,不然咱们即可在捕获异常以后进行事务回滚,与此同时能够进行消息重发。可是使用事务机制的话会“吸干”RabbitMQ的性能,那么有没有更好的方法既能保证消息发送方确认消息已经正确送达,又能基本上不带来性能上的损失呢?从AMQP协议层面来看并无更好的办法,可是RabbitMQ提供了一个改进方案,即发送方确认机制(publisher confirm)。性能

生产者将信道设置成confirm(确认)模式,一旦信道进入confirm模式,全部在该信道上面发布的消息都会被指派一个惟一的ID(从1开始),一旦消息被投递到全部匹配的队列以后,RabbitMQ就会发送一个确认(Basic.Ack)给生产者(包含消息的惟一ID),这就使得生产者知晓消息已经正确到达了目的地了。RabbitMQ回传给生产者的确认消息中的deliveryTag包含了确认消息的序号,此外RabbitMQ也能够设置channel.basicAck方法中的multiple参数,表示到这个序号以前的全部消息都已经获得了处理。操作系统

事务机制在一条消息发送以后会使发送端阻塞,以等待RabbitMQ的回应,以后才能继续发送下一条消息。相比之下,发送方确认机制最大的好处在于它是异步的,一旦发布一条消息,生产者应用程序就能够在等信道返回确认的同时继续发送下一条消息,当消息最终获得确认以后,生产者应用即可以经过回调方法来处理该确认消息,若是RabbitMQ由于自身内部错误致使消息丢失,就会发送一条nack(Basic.Nack)命令,生产者应用程序一样能够在回调方法中处理该nack命令。设计

生产者经过调用channel.confirmSelect方法(即Confirm.Select命令)将信道设置为confirm模式,以后RabbitMQ会返回 Confirm.Select-Ok命令表示赞成生产者将当前信道设置为confirm模式。全部被发送的后续消息都被ack或者nack一次,不会出现一条消息即被ack又被nack的状况。而且RabbitMQ也并无对消息被confirm的快慢作任何保证。code

事务机制和publisher confirm机制二者是互斥的,不能共存。若是企图将已开启事务模式的信道再设置为publisher confirm模式,RabbitMQ会报错:{amqp_error, precondition_failed, “cannot switch from tx to confirm mode”, ‘confirm.select’},或者若是企图将已开启publisher confirm模式的信道在设置为事务模式的话,RabbitMQ也会报错:{amqp_error, precondition_failed, “cannot switch from confirm to tx mode”, ‘tx.select’ }。

事务机制和publisher confirm机制确保的是消息可以正确的发送至RabbitMQ,这里的“发送至RabbitMQ”的含义是指消息被正确的发往至RabbitMQ的交换器,若是此交换器没有匹配的队列的话,那么消息也将会丢失。因此在使用这两种机制的时候要确保所涉及的交换器可以有匹配的队列。更进一步的讲,发送方要配合mandatory参数或者备份交换器一块儿使用来提升消息传输的可靠性。

Phase 2

mandatory和immediate是channel.basicPublish方法中的两个参数,它们都有当消息传递过程当中不可达目的地时将消息返回给生产者的功能。而RabbitMQ提供的备份交换器(Alternate Exchange)能够将未能被交换器路由的消息(没有绑定队列或者没有匹配的绑定)存储起来,而不用返回给客户端。 RabbitMQ 3.0版本开始去掉了对于immediate参数的支持,对此RabbitMQ官方解释是:immediate参数会影响镜像队列的性能,增长代码复杂性,建议采用TTL和DLX的方法替代。因此本文只简单介绍mandatory和备份交换器。 当mandatory参数设为true时,交换器没法根据自身的类型和路由键找到一个符合条件的队列的话,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者。当mandatory参数设置为false时,出现上述情形的话,消息直接被丢弃。 那么生产者如何获取到没有被正确路由到合适队列的消息呢?这时候能够经过调用channel.addReturnListener来添加ReturnListener监听器实现。使用mandatory参数的关键代码以下所示:

channel.basicPublish(EXCHANGE_NAME, "", true, MessageProperties.PERSISTENT_TEXT_PLAIN, "mandatory test".getBytes());
channel.addReturnListener(new ReturnListener() {
    public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP
            .BasicProperties basicProperties, byte[] body) throws IOException {
        String message = new String(body);
        System.out.println("Basic.Return返回的结果是:" + message);
    }
});

上面代码中生产者没有成功的将消息路由到队列,此时RabbitMQ会经过Basic.Return返回“mandatory test”这条消息,以后生产者客户端经过ReturnListener监听到了这个事件,上面代码的最后输出应该是“Basic.Return返回的结果是:mandatory test”。

生产者能够经过ReturnListener中返回的消息来从新投递或者其它方案来提升消息的可靠性。 备份交换器,英文名称Alternate Exchange,简称AE,或者更直白的能够称之为“备胎交换器”。生产者在发送消息的时候若是不设置mandatory参数,那么消息在未被路由的状况下将会丢失,若是设置了mandatory参数,那么须要添加ReturnListener的编程逻辑,生产者的代码将变得复杂化。若是你不想复杂化生产者的编程逻辑,又不想消息丢失,那么可使用备份交换器,这样能够将未被路由的消息存储在RabbitMQ中,再在须要的时候去处理这些消息。 能够经过在声明交换器(调用channel.exchangeDeclare方法)的时候添加alternate-exchange参数来实现,也能够经过策略的方式实现。若是二者同时使用的话,前者的优先级更高,会覆盖掉Policy的设置。

参考下图,若是此时咱们发送一条消息到normalExchange上,当路由键等于“normalKey”的时候,消息能正确路由到normalQueue这个队列中。若是路由键设为其余值,好比“errorKey”,即消息不能被正确的路由到与normalExchange绑定的任何队列上,此时就会发送给myAe,进而发送到unroutedQueue这个队列。

备份交换器其实和普通的交换器没有太大的区别,为了方便使用,建议设置为fanout类型,如若读者想设置为direct或者topic的类型也没有什么不妥。须要注意的是消息被从新发送到备份交换器时的路由键和从生产者发出的路由键是同样的。备份交换器的实质就是原有交换器的一个“备胎”,全部没法正确路由的消息都发往这个备份交换器中,能够为全部的交换器设置同一个AE,不过这里须要提早确保的是AE已经正确的绑定了队列,最好类型也是fanout的。若是备份交换器和mandatory参数一块儿使用,那么mandatory参数无效。

Phase 3

mandatory或者AE可让消息在路由到队列以前获得极大的可靠性保障,可是消息存入队列以后的可靠性又如何保证?

首先是持久化。持久化能够提升队列的可靠性,以防在异常状况(重启、关闭、宕机等)下的数据丢失。队列的持久化是经过在声明队列时将durable参数置为true实现的,若是队列不设置持久化,那么在RabbitMQ服务重启以后,相关队列的元数据将会丢失,此时数据也会丢失。正所谓“皮之不存,毛将焉附”,队列都没有了,消息又能存在哪里呢?队列的持久化能保证其自己的元数据不会因异常状况而丢失,可是并不能保证内部所存储的消息不会丢失。要确保消息不会丢失,须要将其设置为持久化。经过将消息的投递模式(BasicProperties中的deliveryMode属性)设置为2便可实现消息的持久化。

设置了队列和消息的持久化,当RabbitMQ服务重启以后,消息依旧存在。单单只设置队列持久化,重启以后消息会丢失;单单只设置消息的持久化,重启以后队列消失,既而消息也丢失。单单设置消息持久化而不设置队列的持久化显得毫无心义。

在持久化的消息正确存入RabbitMQ以后,还须要有一段时间(虽然很短,可是不可忽视)才能存入磁盘之中。RabbitMQ并不会为每条消息都作同步存盘(调用内核的fsync6方法)的处理,可能仅仅保存到操做系统缓存之中而不是物理磁盘之中。若是在这段时间内RabbitMQ服务节点发生了宕机、重启等异常状况,消息保存还没来得及落盘,那么这些消息将会丢失。

若是在Phase1中采用了事务机制或者publisher confirm机制的话,服务端的返回是在消息落盘以后执行的,这样能够进一步的提升了消息的可靠性。可是即使如此也没法避免单机故障且没法修复(好比磁盘损毁)而引发的消息丢失,这里就须要引入镜像队列。镜像队列至关于配置了副本,绝大多数分布式的东西都有多副本的概念来确保HA。在镜像队列中,若是主节点(master)在此特殊时间内挂掉,能够自动切换到从节点(slave),这样有效的保证了高可用性,除非整个集群都挂掉。虽然这样也不能彻底的保证RabbitMQ消息不丢失(好比机房被炸。。。),可是配置了镜像队列要比没有配置镜像队列的可靠性要高不少,在实际生产环境中的关键业务队列通常都会设置镜像队列。

Phase 4

进一步的从消费者的角度来讲,若是在消费者接收到相关消息以后,还没来得及处理就宕机了,这样也算数据丢失。

为了保证消息从队列可靠地达到消费者,RabbitMQ提供了消息确认机制(message acknowledgement)。消费者在订阅队列时,能够指定autoAck参数,当autoAck等于false时,RabbitMQ会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移去消息(实质上是先打上删除标记,以后再删除)。当autoAck等于true时,RabbitMQ会自动把发送出去的消息置为确认,而后从内存(或者磁盘)中删除,而无论消费者是否真正的消费到了这些消息。

采用消息确认机制后,只要设置autoAck参数为false,消费者就有足够的时间处理消息(任务),不用担忧处理消息过程当中消费者进程挂掉后消息丢失的问题,由于RabbitMQ会一直等待持有消息直到消费者显式调用Basic.Ack命令为止。

当autoAck参数置为false,对于RabbitMQ服务端而言,队列中的消息分红了两个部分:一部分是等待投递给消费者的消息;一部分是已经投递给消费者,可是尚未收到消费者确认信号的消息。若是RabbitMQ一直没有收到消费者的确认信号,而且消费此消息的消费者已经断开链接,则RabbitMQ会安排该消息从新进入队列,等待投递给下一个消费者,固然也有可能仍是原来的那个消费者。

RabbitMQ不会为未确认的消息设置过时时间,它判断此消息是否须要从新投递给消费者的惟一依据是消费该消息的消费者链接是否已经断开,这么设计的缘由是RabbitMQ容许消费者消费一条消息的时间能够好久好久。

若是消息消费失败,也能够调用Basic.Reject或者Basic.Nack来拒绝当前消息而不是确认,若是只是简单的拒绝那么消息会丢失,须要将相应的requeue参数设置为true,那么RabbitMQ会从新将这条消息存入队列,以即可以发送给下一个订阅的消费者。若是requeue参数设置为false的话,RabbitMQ当即会把消息从队列中移除,而不会把它发送给新的消费者。

还有一种状况须要考虑:requeue的消息是存入队列头部的,便可以快速的又被发送给消费,若是此时消费者又不能正确的消费而又requeue的话就会进入一个无尽的循环之中。对于这种状况,笔者的建议是在出现没法正确消费的消息时不要采用requeue的方式来确保消息可靠性,而是从新投递到新的队列中,好比设定的死信队列中,以此能够避免前面所说的死循环而又能够确保相应的消息不丢失。对于死信队列中的消息能够用另外的方式来消费分析,以便找出问题的根本。

相关文章
相关标签/搜索