前面的两篇文章分布式系统消息中间件——RabbitMQ的使用基础篇与分布式系统消息中间件——RabbitMQ的使用进阶篇,咱们简单介绍了消息中间件与RabbitMQ的一些基本概念、基础用法以及经常使用的几个特性。但若是咱们想更好的去结合咱们的业务场景使用好RabbitMQ,咱们还须要思考一些问题。好比:什么时候去建立队列,RabbitMQ的持久化,如何保证消息到达RabbitMQ,以及消费者如何确认消息......html
从前面的文章咱们知道,RabbitMQ能够选择在生产者建立队列,也能够在消费者端建立队列,也能够提早建立好队列,而生产者消费者直接使用便可。java
RabbitMQ的消息存储在队列中,交换器的使用并不真正耗费服务器的性能,而队列会。如在实际业务应用中,须要对所建立的队列的流量、内存占用及网卡占用有一个清晰的认知,预估其平均值和峰值,以便在固定硬件资源的状况下可以进行合理有效的分配。数据库
按照RabbitMQ官方建议,生产者和消费者都应该尝试建立(这里指声明操做)队列。这虽然是一个很好的建议,可是在我看来这个时间上没有最好的方案,只有最适合的方案。咱们每每须要结合业务、资源等方面在各类方案里面选择一个最适合咱们的方案。缓存
若是业务自己在架构设计之初己经充分地预估了队列的使用状况,彻底能够在业务程序上线以前在服务器上建立好(好比经过页面管理、RabbitMQ命令或者更好的是从配置中心下发),这样业务程序也能够免去声明的过程,直接使用便可。预先建立好资源还有一个好处是,能够确保交换器和队列之间正确地绑定匹配。不少时候,因为人为因素、代码缺陷等,发送消息的交换器并无绑定任何队列,那么消息将会丢失:或者交换器绑定了某个队列,可是发送消息时的路由键没法与现存的队列匹配,那么消息也会丢失。固然能够配合mandatory参数或者备份交换器(关于mandatory参数的使用详细可参考个人上一篇文章) 来提升程序的健壮性。与此同时,预估好队列的使用状况很是重要,若是在后期运行过程当中超过预约的阈值,能够根据实际状况对当前集群进行扩容或者将相应的队列迁移到其余集群。迁移的过程也能够对业务程序彻底透明。此种方法也更有利于开发和运维分工,便于相应资源的管理。若是集群资源充足,而即将使用的队列所占用的资源又在可控的范围以内,为了增长业务程序的灵活性,也彻底能够在业务程序中声明队列。至因而使用预先分配建立资源的静态方式仍是动态的建立方式,须要从业务逻辑自己、公司运维体系和公司硬件资源等方面考虑。服务器
做为一个内存中间件,在保证了速度的状况下,不可避免存在如内存数据库一样的问题,即丢失问题。持久化能够提升RabbitMQ 的可靠性,以防在异常状况(重启、关闭、宕机等)下的数据丢失。RabbitMQ的持久化分为三个部分:交换器的持久化、队列的持久化和消息的持久化。网络
交换器的持久化是经过在声明队列是将durable 参数置为true 实现的(该参数默认为false)。若是交换器不设置持久化,那么在RabbitMQ 服务重启以后,相关的交换器元数据会丢失,不过消息不会丢失,只是不能将消息发送到这个交换器中了。对一个长期使用的交换器来讲,建议将其置为持久化的。架构
队列的持久化是经过在声明队列时将durable 参数置为true 实现的(该参数默认为false),若是队列不设置持久化,那么在RabbitMQ 服务重启以后,相关队列的元数据会丢失,此时数据也会丢失。正所谓"皮之不存,毛将焉附",队列都没有了,消息又能存在哪里呢?并发
队列的持久化能保证其自己的元数据不会因异常状况而丢失,可是并不能保证内部所存储的消息不会丢失。要确保消息不会丢失,须要将其设置为持久化。经过将消息的投递模式(BasicProperties中的DeliveryMode属性)设置为2便可实现消息的持久化。运维
所以,消息若是要想在Rabbit重启、关闭、宕机时可以恢复,须要作到如下三点:异步
注意:RabbitMQ 确保持久化消息能从服务器重启中恢复的方式是将它们写入磁盘上的一个持久化日志文件中。当发布一条持久化消息到持久化交换器时,Rabbit会在日志提交到日志文件后才发送响应(开启生产者确认机制)。以后,若是消息到了非持久化队列,它会自动从日志文件中删除,而且没法在服务器重启后恢复。所以单单只设置队列持久化,重启以后消息会丢失;单单只设置消息的持久化,重启以后队列消失,继而消息也丢失。单单设置消息持久化而不设置队列的持久化是毫无心义的。当从持久化队列中消费了消息后(而且确认后),RabbitMQ会在持久化日志中把这条消息标记为等待垃圾收集。而在消费持久化消息以前,若RabbitMQ服务器重启,会自动重建交换器、队列以及绑定,重播持久化日志文件中的消息到合适的队列或者交换器上(取决于宕机时,消息处在路由的哪一个环节)。
为了保障消息不会丢失,也许咱们能够简单粗暴的将全部的消息标记为持久化,但这样咱们会付出性能的代价。写入磁盘的速度比写入内存的速度慢得不仅一点点。对于可靠性不是那么高的消息能够不采用持久化处理以提升总体的吞吐量。在选择是否要将消息持久化时,须要在可靠性和吐吞量之间作一个权衡。
将交换器、队列、消息都设置了持久化以后就能百分之百保证数据不丢失了吗?
关于第一个问题,能够经过消费者确认机制来解决。而第二个问题能够经过生产者确认机制来解决,也可使用镜像队列机制(镜像队列机制,将在运维篇总结)。生产者确认消费者确认请往下看。加群探讨学习:874811168 免费获取java视频资料一份
上文咱们知道,在使用RabbitMQ的时候,能够经过消息持久化操做来解决由于服务器的异常崩溃而致使的消息丢失,除此以外,咱们还会遇到一个问题,当消息的生产者将消息发送出去以后,消息到底有没有正确地到达服务器呢?若是不进行特殊配置,默认状况下发送消息的操做是不会返回任何信息给生产者的,也就是默认状况下生产者是不知道消息有没有正确地到达服务器。若是在消息到达服务器以前己经丢失,持久化操做也解决不了这个问题,由于消息根本没有到达服务器,何谈持久化?
RabbitMQ针对这个问题,提供了两种解决方式:
RabbitMQ 客户端中与事务机制相关的方法有三个:channel.TxSelect(用于将当前信道设置为事务模式);channel.TxCommit(用于提交事务),channel.TxRollback(用于回滚事务)。在经过channel.TxSelect方法开启事务以后,咱们即可以发布消息给RabbitMQ了,若是事务提交成功,则消息必定到达了RabbitMQ 中,若是在事务提交执行以前因为RabbitMQ异常崩溃或者其余缘由抛出异常,这个时候咱们即可以将其捕获,进而经过执行channel.TxRollback方法来实现事务回滚。示例代码以下所示:
channel.TxSelect();//将信道设置为事务模式 try { //do something var message = Encoding.UTF8.GetBytes("TestMsg"); channel.BasicPublish("normalExchange", "NormalRoutingKey", true, null, message); //do something channel.TxCommit();//提交事务 } catch (Exception ex) { //log(ex); channel.TxRollback(); }
事务确实可以解决消息发送方和RabbitMQ之间消息确认的问题,只有消息成功被RabbitMQ接收,事务才能提交成功,不然即可在捕获异常以后进行事务回滚,与此同时能够进行消息重发。可是使用事务一样会带来一些问题。
前面介绍了RabbitMQ可能会遇到的一个问题,即消息发送方(生产者〉并不知道消息是否真正地到达了RabbitMQ。随后了解到在AMQP协议层面提供了事务机制来解决这个问题,可是采用事务机制实现会严重下降RabbitMQ的消息吞吐量,这里就引入了一种轻量级的方式一发送方确认(publisher confirm)机制。生产者将信道设置成confirm确认)模式,一旦信道进入confirm模式,全部在该信道上面发布的消息都会被指派一个惟一的ID( 从1开始),一旦消息被投递到全部匹配的队列以后,RabbitMQ就会发送一个确认(BasicAck) 给生产者(包含消息的惟一ID),这就使得生产者知晓消息已经正确到达了目的地了。若是消息和队列是可持久化的,那么确认消息会在消息写入磁盘以后发出。
发送方确认模式,示例代码以下:
//示例1--同步等待 channel.ConfirmSelect();//开启确认模式 var message = Encoding.UTF8.GetBytes("TestMsg"); channel.ExchangeDeclare("normalExchange", "direct", true, false, null); channel.QueueDeclare("normalQueue", true, false, false, null); channel.QueueBind("normalQueue", "normalExchange", "NormalRoutingKey"); channel.BasicPublish("normalExchange", "NormalRoutingKey", true, null, message); //var result=channel.WaitForConfirmsOrDie(Timeout); //WaitForConfirmsOrDie 使用WaitForConfirmsOrDie 在Rabbit发送Nack命令或超时时会抛出一个异常 var result = channel.WaitForConfirms();//等待该信道全部未确认的消息结果 if(!result){ //send message failed; }
//示例2--异步通知 channel.ConfirmSelect();//开启确认模式 var message = Encoding.UTF8.GetBytes("TestMsg"); channel.ExchangeDeclare("normalExchange", "direct", true, false, null); channel.QueueDeclare("normalQueue", true, false, false, null); channel.QueueBind("normalQueue", "normalExchange", "NormalRoutingKey"); channel.BasicPublish("normalExchange", "NormalRoutingKey", true, null, message); channel.BasicAcks += (model, ea) => { //消息被投递到全部匹配的队列以后,RabbitMQ就会发送一个确认(Basic.Ack)给生产者(包含消息的惟一ID) //ea.Multiple为True表明 ea.DeliveryTag编号以前的消息均已被确认。 //do something; }; channel.BasicNacks += (model, ea) => { //若是RabbitMQ 由于自身内部错误致使消息丢失,就会发送一条nack(BasicNack) 命令 //do something; };
关于生产者确认机制一样会有一些问题,broker不能保证消息会被confirm,只知道将会进行confirm。这样若是broker与生产者之间的链接断开,致使生产者不能收到确认消息,可能会重复进行发布。总之,生产者确认模式给客户端提供了一种较为轻量级的方式,可以跟踪哪些消息被broker处理,哪些可能由于broker宕掉或者网络失败的状况而从新发布。
注意:事务机制和publisher confirm机制二者是互斥的,不能共存。若是企图将已开启事务模式的信道再设置为publisher confmn模式, RabbitMQ会报错,或者若是企图将已开启publisher confirm模式的信道设置为事务模式, RabbitMQ也会报错。在性能上来看,而到底应该选择事务机制仍是Confirm机制,则须要结合咱们的业务场景。
为了保证消息从队列可靠地达到消费者,RabbitMQ提供了消息确认机制(message acknowledgement)。消费者在订阅队列时,能够指定noAck参数,当noAck等于false时,RabbitMQ会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移去消息(实质上是先打上删除标记,以后再删除)。当noAck等于true时,RabbitMQ会自动把发送出去的消息置为确认,而后从内存(或者磁盘)中删除,而无论消费者是否真正地消费到了这些消息。
采用消息确认机制后,只要设置noAck参数为false,消费者就有足够的时间处理消息(任务),不用担忧处理消息过程当中消费者进程挂掉后消息丢失的问题,由于RabbitMQ会一直等待持有消息直到消费者显式调用BasicAck命令为止。
当noAck参数置为false,对于RabbitMQ服务端而言,队列中的消息分红了两个部分:一部分是等待投递给消费者的消息:一部分是己经投递给消费者,可是尚未收到消费者确认信号的消息。若是RabbitMQ 一直没有收到消费者的确认信号,而且消费此消息的消费者己经断开链接,则RabbitMQ会安排该消息从新进入队列,等待投递给下一个消费者,固然也有可能仍是原来的那个消费者。
RabbitMQ不会为未确认的消息设置过时时间,它判断此消息是否须要从新投递给消费者的惟一依据是消费该消息的消费者链接是否己经断开,这么设计的缘由是RabbitMQ 容许消费者消费一条消息的时间能够好久好久。加群探讨学习:874811168 免费获取java视频资料一份
关于RabbitMQ消费者确认机制示例代码以下:
//推模式 EventingBasicConsumer consumer = new EventingBasicConsumer(channel); //定义消费者回调事件 consumer.Received += (model, ea) => { //do someting; //channel.BasicReject(ea.DeliveryTag, requeue: true);//拒绝 //requeue参数为true会从新将这条消息存入队列,以即可以发送给下一个订阅的消费者 channel.BasicAck(ea.DeliveryTag, multiple: false);//确认 //若:multiple参数为true,则确认DeliverTag这个编号以前的消息 }; channel.BasicConsume(queue: "queueName", noAck: false, consumer: consumer); //拉模式 BasicGetResult result = channel.BasicGet("queueName", noAck: false); //确认 channel.BasicAck(result.DeliveryTag, multiple: false);
如上,消费者在消费消息的同时,Rabbit会同步给予消费者一个DeliveryTag,这个DeliveryTag就像咱们数据库中的主键,消费者在消费完毕后拿着这个DeliveryTag去Rabbit确认或拒绝这个消息。
void BasicAck(ulong deliveryTag, bool multiple); void BasicReject(ulong deliveryTag, bool requeue); void BasicNack(ulong deliveryTag, bool multiple, bool requeue);
说明:将channel.BasicReject 或者channel.BasicNack中的requeue设置为false ,能够启用"死信队列"的功能。(关于死信队列请看个人上一篇文章 http://www.javashuo.com/article/p-suqlfqpk-kw.html)。
上述requeue,都会将消息从新存入队列发送给下一个消费者(也有多是其它消费者)。关于requeue还有下面一种用法。能够选择是否补发给当前的consumer。
//补发消息 true退回到queue中 /false只补发给当前的consumer channel.BasicRecover(true);
注意:RabbitMQ仅仅经过Consumer的链接中断来确认该Message并无被正确处理。也就是说,RabbitMQ给了Consumer足够长的时间来作数据处理。若是忘记了ack,那么后果很严重。当Consumer退出时,Message会从新分发。而后RabbitMQ会占用愈来愈多的内存,因为RabbitMQ会长时间运行,这个“内存泄漏”是致命的。
当RabbitMQ 队列拥有多个消费者时,队列收到的消息将以轮询(round-robin)的分发方式发送给消费者。每条消息只会发送给订阅列表里的一个消费者。这种方式很是适合扩展,并且它是专门为并发程序设计的。若是如今负载加剧,那么只须要建立更多的消费者来消费处理消息便可。
不少时候轮询的分发机制也不是那么优雅。默认状况下,若是有n个消费者,那么RabbitMQ会将第m条消息分发给第m%n (取余的方式)个消费者, RabbitMQ 无论消费者是否消费并己经确认了消息。试想一下,若是某些消费者任务繁重,来不及消费那么多的消息,而某些其余消费者因为某些缘由(好比业务逻辑简单、机器性能卓越等)很快地处理完了所分配到的消息,进而进程空闲,这样就会形成总体应用吞吐量的降低。那么该如何处理这种状况呢?这里就要用到channel.BasicQos(int prefetchCount)这个方法,channel.BasicQos方法容许限制信道上的消费者所能保持的最大未确认消息的数量。
举例说明,在订阅消费队列以前,消费端程序调用了channel.BasicQos(5),以后订阅了某个队列进行消费。RabbitMQ 会保存一个消费者的列表,每发送一条消息都会为对应的消费者计数,若是达到了所设定的上限,那么RabbitMQ 就不会向这个消费者再发送任何消息。直到消费者确认了某条消息以后, RabbitMQ 将相应的计数减1,以后消费者能够继续接收消息,直到再次到达计数上限。
注意:Basic.Qos 的使用对于拉模式的消费方式无效.
void BasicQos(uint prefetchSize, ushort prefetchCount, bool global);
global参数 | AMQP 0-9-1 | RabbitMQ |
---|---|---|
false | 信道上全部的消费者都须要听从prefetchCount 的限信道上新的消费者须要听从prefetchCount 的限定值 | 信道上新的消费者须要听从prefetchCount 的限定值 |
true | 当前通讯链路( Connection) 上全部的消费者都需信道上全部的消费者都须要听从prefetchCount的限定值 | 信道上全部的消费者须要听从prefetchCount 的限定值 |
注意:
//伪代码 Consumer consumer1 = ...; Consumer consumer2 = ...; channel.BasicQos(10) ; channel.BasicConsume("my-queue1" , false , consumer1); channel.BasicConsume("my-queue2" , false , consumer2); //两个消费者各自的能接收到的未确认消息的上限都为10 。
//伪代码 Channel channel = ...; Consumer consumerl = ...; Consumer consumer2 = ...; channel.BasicQos(3 , false); channel.BasicQos(5 , true); channel.BasicConsume("queuel" , false , consumerl) ; channel.BasicConsume("queue2" , false , consumer2) ; //这里每一个消费者最多只能收到3个未确认的消息,两个消费者能收到的未确认的消息个数之和的上限为5
消息的顺序性是指消费者消费到的消息和发送者发布的消息的顺序是一致的。举个例子,不考虑消息重复的状况,若是生产者发布的消息分别为msgl、msg二、msg3,那么消费者必然也是按照msgl、msg二、msg3的顺序进行消费的。
目前不少资料显示RabbitMQ的消息可以保障顺序性,这是不正确的,或者说这个观点有很大的局限性。在不使用任何RabbitMQ的高级特性,也没有消息丢失、网络故障之类异常的状况发生,而且只有一个消费者的状况下,最好也只有一个生产者的状况下能够保证消息的顺序性。若是有多个生产者同时发送消息,没法肯定消息到达Broker 的先后顺序,也就没法验证消息的顺序性。
那么哪些状况下RabbitMQ 的消息顺序性会被打破呢?下面介绍几种常见的情形。
若是生产者使用了事务机制,在发送消息以后遇到异常进行了事务回滚,那么须要从新补偿发送这条消息,若是补偿发送是在另外一个线程实现的,那么消息在生产者这个源头就出现了错序。一样,若是启用publisher confirm时,在发生超时、中断,又或者是收到RabbitMQ的BasicNack命令时,那么一样须要补偿发送,结果与事务机制同样会错序。或者这种说法有些牵强,咱们能够执拗地认为消息的顺序性保障是从存入队列以后开始的,而不是在发迭的时候开始的。
考虑另外一种情形,若是生产者发送的消息设置了不一样的超时时间,井且也设置了死信队列,总体上来讲至关于一个延迟队列,那么消费者在消费这个延迟队列的时候,消息的顺序必然不会和生产者发送消息的顺序一致。
若是消息设置了优先级,那么消费者消费到的消息也必然不是顺序性的。
若是一个队列按照先后顺序分有msg1, msg二、msg三、msg4这4 个消息,同时有ConsumerA和ConsumerB 这两个消费者同时订阅了这个队列。队列中的消息轮询分发到各个消费者之中,ConsumerA 中的消息为msg1和msg3,ConsumerB中的消息为msg二、msg4。ConsumerA收到消息msg1以后并不想处理而调用了BasicNack/BasicReject将消息拒绝,与此同时将requeue设置为true,这样这条消息就能够从新存入队列中。消息msg1以后被发送到了ConsumerB中,此时ConsumerB己经消费了msg二、msg4,以后再消费msg1.这样消息顺序性也就错乱了。
包括但不只限于以上几种情形会使RabbitMQ 消息错序。若是要保证消息的顺序性,须要业务方使用的时候作进一步的处理。如在消息体内添加全局有序标识等。加群探讨学习:874811168 免费获取java视频资料一份
消息可靠传输通常是业务系统接入消息中间件时首要考虑的问题,通常消息中间件的消息
传输保障分为三个层级。
RabbitMQ 支持其中的"最多一次"和"最少一次"。其中"最少一次"投递实现须要考虑如下这个几个方面的内容:
"最多一次"的方式就无须考虑以上那些方面,生产者随意发送,消费者随意消费,不过这样很难确保消息不会重复消费。
"刚好一次"是RabbitMQ目前没法保障的(目前我也不知道哪一个中间件可以保证)。消费者在消费完一条消息以后向RabbitMQ 发送确认BasicAck命令,此时因为网络断开或者其余缘由形成RabbitMQ并无收到这个确认命令,那么RabbitMQ不会将此条消息标记删除。在从新创建链接以后,消费者仍是会消费到这一条消息,这就形成了重复消费。再考虑一种状况,生产者在使用publisher confirm机制的时候,发送完一条消息等待RabbitMQ 返回确认通知,此时网络断开,生产者捕获到异常状况,为了确保消息可靠性选择从新发送,这样RabbitMQ中就有两条一样的消息,在消费的时候,消费者就会重复消费。而解决重复消费能够经过消费者幂等等方式来解决。
本篇文章,咱们思考了使用RabbitMQ过程当中须要注意的几个问题,而前两篇文章对RabbitMQ的概念以及如何使用作了简单的介绍,相信通过这些介绍已经对RabbitMQ有了基本的了解。但这些远远不够,想要更好的利用好RabbitMQ还须要结合咱们的业务场景来更多的去使用它(切记不要为了使用技术而使用技术!)。关于RabbitMQ的运维篇,会在之后的文章中继续给你们分享。