经历上一篇的基础API总结,其实RabbitMQ的基础使用,就不成问题了。可是要想稍微拔高,仍是要经历这一篇的洗礼。一直以来,我面试别人的时候,大多数面试者的简历中,都会写上熟练使用RabbitMQ,然而,我问出一个只要是消息中间件就老生常谈的话题的时候,几乎清一色的,都没有很好的说出来。这个问题就是:请介绍下,RabbitMQ如何保证消息的可靠性的。经过这一篇文章的总结,我想让本身达到对这个问题的细节覆盖全面的程度,至少一个架构师过来问我,我能有条理有逻辑的说明白,不会东一句西一句。做为RabbitMQ,这种咱们平常生产中使用频率至关高的消息中间件,我认为,对他的掌控,要更好,才能说明咱们对技术的追求,而不仅是CURD。java
这两个参数,是保证消息可靠性的第一扇门,咱们先来看看上篇文章中,消息发送的api源码:面试
/** * 发布一个消息到服务端。 * * @param mandatory 后面文章介绍 * @param immediate 后面文章介绍 */ void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
这两个参数,上一篇里面注释的,这一章咱们来解开编程
咱们看看如何获取到madatory为true的时候,消息没有被正确路由,返回给生产者的消息:api
channel.basicPublish("exchangeName", "routingKey", true, MessageProperties.PERSISTENT_TEXT_PLAIN, "test".getBytes()); channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body); System.out.println("返回的结果是:" + msg); } });
mandatory主要保护的是:交换机是否能正确匹配到消息队列,immediate主要保护的是消息队列是否有消费者。经过这两个参数,能够保证消息在整个从发送到接收过程当中,全称掌控。缓存
RabbitMQ3.0以后去掉了immediate参数的支持,官方说法是会影响性能,增长代码复杂性,建议使用TTL(消息最大生存时间)和DLX(死信队列)来代替服务器
这东西主要应对mandatory参数不想去设置,而且,这个参数设置了,会增大代码的侵入性,那咱们又如何保障消息没有匹配的队列这种状况不丢失呢,就使用这个。下面是一段使用备份交换机的代码:架构
Map<String, Object> arguments = new HashMap<>(); // alternate-exchange这个参数就是设置具体的备份交换机是谁 arguments.put("alternate-exchange", "myAe"); channel.exchangeDeclare("nomalExchange", "direct", true, false, arguments); channel.exchangeDeclare("myAe","fanout",true,false,null); channel.queueDeclare("nomalQueue",true,false,false,null); channel.queueBind("nomalQueue","nomalExchange","normalKey"); channel.queueDeclare("unroutedQueue",true,false,false,null); channel.queueBind("unroutedQueue","myAe","");
这段代码的主要示意图以下:运维
有如下几点:异步
过时时间分为消息的过时时间和队列的过时时间ide
消息过时时间设置,有两种方式:
若是两个一块儿使用的话,会取较小的那个值。而且若是消息到了过时时间以后尚未消费者进行消费的话,就会变成死信。下面咱们首先来看看如何经过队列属性的方式设置过时时间:
Map<String, Object> arguments = new HashMap<>(); // x-message-ttl经过这个参数进行设置 arguments.put("x-message-ttl",6000); channel.queueDeclare("queueName",true,false,false,arguments);
画外音:固然还能够经过Policy与HTTPAPI的方式进行设置,可是我感受这两种偏运维,这里主要想写写开发视角,我就很少写这两种设置方式了
不设置这个参数,表示队列里面的消息不会过时,设置成0,除非消息立刻被消费者消费,不然将会被丢弃,这个设置0的特性能够部分代替immediate这个参数。下面咱们来看看直接设置消息的TTL:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.deliveryMode(2);// 持久化消息 builder.expiration("60000");//设置TTL=60000ms AMQP.BasicProperties properties = builder.build(); channel.basicPublish("exchangeName", "routingKey", properties, "123".getBytes());
两种过时效果,对消息删除的契机不太同样:
队列过时表示,这个队列上面没有任何的消费者,且队列没有被从新声明过,而且在过时时间段内未调用过Basic.Get命令。RabbitMQ会确保再过时时间到达后将队列删除,但不能保证动做有多么的及时,再RabbitMQ重启以后,过时时间将会被从新计算,下面是设置队列的过时时间:
Map<String, Object> arguments = new HashMap<>(); // x-expires经过这个参数进行设置 arguments.put("x-expires",6000); channel.queueDeclare("queueName",true,false,false,arguments);
一个消息,变成死信的时候,就会被发送到一个交换机里面,这个交换机就是DLX(死信交换机),绑定到DLX的队列就是死信队列。消息变成死信有以下几个状况:
其实DLX和通常交换机没区别,就是将一个普通的队列设置一下DLX的属性,而后这个队列里面编程死信的消息就会被发送到这个交换机上面。这个特性,咱们能够为DLX绑定一个队列,而后配合TTL等于0,来弥补3.0中去除掉的immediate参数的功能。下面是一段简单设置DLX的代码:
channel.exchangeDeclare("exchange.dlx", "direct", true, false, false, null); channel.exchangeDeclare("exchange.normal", "fanout", true, false, false, null); Map<String, Object> argument = new HashMap<>(); // 设置DLX argument.put("x-dead-letter-exchange", "exchange.dlx"); // 设置DLK,就是消息变成死信以后的路由键 argument.put("x-dead-letter-routing-key", "routingkey"); // 设置队列的过时时间 argument.put("x-message-ttl", 10000); channel.queueDeclare("queue.normal", false, false, false, argument); channel.queueBind("queue.normal", "exchange.normal", ""); channel.queueDeclare("queue.dlx", true, false, false, null); channel.queueBind("queue.dlx", "exchange.dlx", "routingkey"); channel.basicPublish("exchange.normal", "rk", MessageProperties.PERSISTENT_TEXT_PLAIN, "dlx".getBytes());
核心的属性:
下面是这个死信队列的一个简单的图例:
接下来就能够引出延迟队列这个概念了,经过上面的TTL与DLX的详细解说,其实咱们彻底能够用这两个来实现延迟队列的功能。无非就是将消费者直接去消费死信队列里面的消息,而不是直接消费普通队列的消息。这样普通队列,咱们能够设置消息的TTL,而后,到了指定的过时时间,就会直接发送到DLX绑定的队列里面,这样,咱们消费者就能消费到了。这样就丁算是过了TTL毫秒,延迟收到消息。咱们彻底能够经过bindingKey来动态的指定不一样的队列,每一个队列设置不一样的TTL,每一个队列设置不一样的DLX,而后每一个DLX又是不一样的死信队列,这样,延迟消息就能够运行了。这里代码不写了,都是重复性的代码。给出延迟队列的图例:
这一部分对于学习整个RabbitMQ的高可用、消息可靠性具备相当重要的做用。在介绍生产者确认以前,咱们来看看,至今为止,咱们接触到的相关RabbitMQ实体,有哪几种持久化,与这几种持久化对应的效果:
其实咱们使用默认的属性封装的常量,已经封装了消息,咱们来看看源码:
public class MessageProperties { ...... /** Content-type "text/plain", deliveryMode 2 (persistent), priority zero */ public static final BasicProperties PERSISTENT_TEXT_PLAIN = new BasicProperties("text/plain", null, null, 2,// deliveryMode 0, null, null, null, null, null, null, null, null, null); }
可是,即便是上面提到的实体,咱们都进行了持久化,咱们仍是会有没法保证消息不会丢失的场景,下面说两个:
为了解决一些异常宕机或者其余状况致使的消息不可靠的场景,可使用如下两种技术来解决:
生产者确认又能够细分红两种:
下面咱们一个个来讲
首先说说,这种事务机制,其实会榨干RabbitMQ的全乎性能,彻底不推荐使用,不过做为一种机制,仍是要细说。与具体的事务操做相似,整个发送的事务,也是三步走:
下面就是正常事务发送消息的时序图:
下面是回滚的事务时序图:
下面是极简的一段代码:
try { channel.txSelect(); channel.basicPublish("exchange.normal", "rk", MessageProperties.PERSISTENT_TEXT_PLAIN, "dlx".getBytes()); channel.txCommit(); } catch (IOException e) { e.printStackTrace(); channel.txRollback(); }
首先咱们来看第一种确认机制的代码:
channel.confirmSelect(); channel.basicPublish("exchange.normal", "rk", MessageProperties.PERSISTENT_TEXT_PLAIN, "dlx".getBytes()); try { if(!channel.waitForConfirms()){ System.out.println("failed"); } } catch (InterruptedException e) { e.printStackTrace(); }
这种方式其实并不能增长吞吐量,由于是同一个线程进行同步确认的固然,咱们可使用一个容器,而且批量进行确认,增长吞吐量。下面是模板:
channel.confirmSelect(); int msgcount = 0; while (true) { channel.basicPublish("exchange.normal", "rk", MessageProperties.PERSISTENT_TEXT_PLAIN, "dlx".getBytes()); // 将发出去的消息存储在一个容器里面 if(++msgcount>34) { msgcount = 0; try { if (channel.waitForConfirms()) { // 将缓存清空 continue; } // 将缓存中的消息重发 } catch (InterruptedException e) { e.printStackTrace(); // 将缓存中的消息重发 } } }
固然,最佳的方式,是经过异步的方式,注册监听器,来处理这种生产者确认的方式。咱们来看看具体的代码模板
channel.confirmSelect(); TreeSet<Long> confirmSet = new TreeSet<>(); channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("nack,seqNo"+deliveryTag+", nultiple:"+multiple); if(multiple){ confirmSet.headSet(deliveryTag-1).clear(); }else{ confirmSet.remove(deliveryTag); } } @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { if(multiple){ confirmSet.headSet(deliveryTag-1).clear(); }else{ confirmSet.remove(deliveryTag); } // 这里要从新发送消息 } }); while(true){ long nextSeq = channel.getNextPublishSeqNo(); channel.basicPublish("exchange.normal", "rk", MessageProperties.PERSISTENT_TEXT_PLAIN, "dlx".getBytes()); confirmSet.add(nextSeq); }
这一部分,主要说几个概念,也是对消息的消费颇有帮助的点
在消费者这一边能够经过一个方法,来设置Qos:
/** * 设置所谓的“服务质量” * * 这个设置主要可以限制在服务端发给消费者消息的时候,最大能保持多少未确认的消息, * 在一个信道上面。所以,Qos就提供了一种基于消费者数据流控制的手段。 * @param prefetchSize 服务端发送给消费者最大消息大小 (使用八进制表示),0表示不控制 * @param prefetchCount 最大服务端发送给消费者的未确认消息数,0表示不控制 * @param global true表示这个设置要应用到此Connection上的各个消费者上面 */ void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException; void basicQos(int prefetchCount, boolean global) throws IOException; void basicQos(int prefetchCount) throws IOException;
针对性的,咱们来讲说global这个参数:
针对global为true的时候要协调多个消费者,这种状况下很是消耗性能,RabbitMQ针对性的修改了global的定义:
可见,主要是把限制范围缩小了,从Connection级别到channel级别。
咱们先来看一段QueueingConsumer代码:
QueueingConsumer consumer = new QueueingConsumer(channel); // channel.basicQos(4); channel.basicConsumer("QueueName",false,"consumer_zzh",consumer); while(true){ QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); // 对消息作业务逻辑处理 channel.basicAck(dlivery.getEnvelope().getDeliveryTag(),false); }
若是环境不是特别的“傲娇”,其实上面代码也没问题,可是要是一会儿来了很是大量的消息要消费,这个QueueingConsumer就是形成内存溢出状况,由于他内部使用了一个LinkedBlockingQueue,每次都是循环逐条的进行处理,这样,消息确定会堆积,内存占用一会儿就上去了。固然咱们可使用Qos来控制这一点。可是,这东西还会存在下面的缺陷:
因此为了不这么多问题,尽可能都要使用DefaultConsumer的方式进行消费
最后这部分,咱们收拢一下这一章中的一些点。首先咱们来看看消息中间件中消息可靠性的三个级别:
RabbitMQ支持其中的最多一次和最少一次。咱们来看看最少一次投递的时候,要考虑消息可靠性,要考虑如下几个方面:
最多一次,咱们只要生产者随意发送,消费者随意消费,不过这样很难确保消息的可靠性,不会丢失。另外在咱们的业务代码中,要确保消费者的幂等性,以防止消息的重复发送。
至此RabbitMQ的基础与高级的使用方式,已经讲解完了,下面一章节,咱们进入RabbitMQ原理级别的总结。因为是erlang写的,我本人也看不懂erlang,主要就是对核心的几个原理进行记录一下罢了,根本没有源码讲解,因此也请放松,不难,就看你努力不努力了。