咱们在项目中使用RabbitMQ时,可能会遇到这样的问题:假如在一个订单系统中,用户付款成功,此时咱们往RabbitMQ消息队列中发送一条消息,而后指望消息消费者修改订单状态,可是最终实际订单状态却并无被成功修改.遇到这种问题咱们排查的思路以下:java
1️⃣.消息是否被成功的发送到消息队列?web
2️⃣.消息是否有丢失的状况?spring
3️⃣.消息是否被成功的消费了?数据库
在生产环境中是不容许出现消息发送/消费错误的状况的,由于这可能会给企业带来巨大的损失.本文将介绍RabbitMQ如何保证消息的可靠性(生产者保证消息可靠投递,消费者保证消息可靠消费,RabbitMQ持久化).编程
1️⃣.设置交换机、队列和消息都为持久化;性能优化
2️⃣.生产者消息确认机制;服务器
3️⃣.消费者消息确认机制;app
4️⃣.死信队列.负载均衡
在生产过程当中,不免会发生服务器宕机的事情,RabbitMQ也不例外.可能因为某种特殊状况下的异常而致使RabbitMQ宕机从而重启,那么这个时候对于消息队列里的数据,包括交换机、队列以及队列中存在的消息的恢复就显得尤其重要了.RabbitMQ自己带有持久化机制,包括交换机、队列以及消息的持久化.持久化的主要机制就是将信息写入磁盘,当RabbtiMQ服务宕机重启后,从磁盘中读取存入的持久化信息,恢复数据(固然凡事都不是100%的,只能尽最大程度的保证消息不会丢失).ide
持久化: 保证服务器重启的时候消息不丢失,重点解决服务器的异常崩溃而致使的消息丢失问题.可是,将全部的消息都设置为持久化,会严重影响RabbitMQ的性能,写入硬盘的速度比写入内存的速度慢的不仅一点点.对于可靠性要求不是那么高的消息能够不持久化以提升总体的吞吐率.在选择是否要将消息进行持久化时,须要在可靠性和吞吐量之间作一个权衡.
对于某些应用场景,如大流量的订单交易系统,为了避免影响性能,咱们能够不设置持久化.可是咱们会定时扫描数据库中的未发送成功的消息,进行重试发送.
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel =connection.createChannel(); #该交换机默认没有持久化 channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
使用这种方法声明的交换机,默认不是持久化的,在服务器重启以后,交换机会消失.咱们在管理台的Exchange页签下查看交换机,能够看到使用上述方法声明的交换机,Features一列是空的,即没有任何附加属性.

咱们换用另外一种方法声明交换机:
try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection=factory.newConnection(); Channel channel = connection.createChannel(); #该交换机实现了持久化 channel.exchangeDeclare(EXCHANGE_NAME,"fanout", true); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); }
查看一下方法的说明:
/** * Actively declare a non-autodelete exchange with no extra arguments * @see com.rabbitmq.client.AMQP.Exchange.Declare * @see com.rabbitmq.client.AMQP.Exchange.DeclareOk * @param exchange the name of the exchange * @param type the exchange type * @param durable true if we are declaring a durable exchange (the exchange will survive a server restart) * @throws java.io.IOException if an error is encountered * @return a declaration-confirm method to indicate the exchange was successfully declared */ Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;
咱们能够看到第三个参数durable,若是为true时则表示要作持久化,当服务重启时,交换机依然存在.因此使用该方法声明的交换机是下面这个样子的(作测试的时候,须要先在管理台删掉原来的同名交换机).D表示durable,鼠标放在上边会显示为true.

与交换机的持久化相同,队列的持久化也是经过durable参数实现的,默认生成的随机队列不是持久化的.前面示例中声明的带有咱们自定义名字的队列都是持久化的.
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); #第个参数用于进行持久化设置 channel.queueDeclare(QUEUE_NAME, true, false, false, null);
看一下方法的定义:
/** * Declare a queue * @see com.rabbitmq.client.AMQP.Queue.Declare * @see com.rabbitmq.client.AMQP.Queue.DeclareOk * @param queue the name of the queue * @param durable true if we are declaring a durable queue (the queue will survive a server restart) * @param exclusive true if we are declaring an exclusive queue (restricted to this connection) * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use) * @param arguments other properties (construction arguments) for the queue * @return a declaration-confirm method to indicate the queue was successfully declared * @throws java.io.IOException if an error is encountered */ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
第二个参数跟交换机方法的参数同样,true表示作持久化.当RabbitMQ服务重启时,队列依然存在.
这里说一下后边的三个参数:
exclusive是排他队列:若是一个队列被声明为排他队列,那么这个队列只能被第一次声明他的链接所见,并在链接断开的时候自动删除.这里有三点须要说明:
1️⃣.同一个链接的不一样channel,是能够访问同一链接下建立的排他队列的;
2️⃣.排他队列只能被声明一次,其余链接不容许声明同名的排他队列;
3️⃣.即便排他队列是持久化的,当链接断开或者客户端退出时,排他队列依然会被删除.
autoDelete是自动删除:为true时,当没有任何消费者订阅该队列时,队列会被自动删除;
arguments:其它参数.
消息的持久化是指当消息从交换机发送到队列以后,被消费者消费以前,服务器忽然宕机重启,消息仍然存在.消息持久化的前提是队列持久化,假如队列不是持久化,那么消息的持久化毫无心义.
经过以下代码设置消息的持久化:
channel.basicPublish(EXCHANGE_NAME,"",MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
其中MessageProperties.PERSISTENT_TEXT_PLAIN
是设置持久化的参数.
咱们查看basicPublish方法的定义.
/** * Publish a message * @see com.rabbitmq.client.AMQP.Basic.Publish * @param exchange the exchange to publish the message to * @param routingKey the routing key * @param props other properties for the message - routing headers etc * @param body the message body * @throws java.io.IOException if an error is encountered */ void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
在看下BasicProperties的类型
public static class BasicProperties { private String contentType; private String contentEncoding; private Map<String,Object> headers; #deliveryMode是设置消息持久化的参数 private Integer deliveryMode; private Integer priority; private String correlationId; private String replyTo; private String expiration; private String messageId; private Date timestamp; private String type; private String userId; private String appId; private String clusterId;
其中deliveryMode是设置消息持久化的参数,等于1表示不设置持久化,等于2设置持久化.PERSISTENT_TEXT_PLAIN
是进行实例化的一个deliveryMode=2的常量对象,便于编程.
public static final BasicProperties PERSISTENT_TEXT_PLAIN = new BasicProperties("text/plain",null,null,2,0,null, null,null,null,null,null, null,null,null);
设置了队列的持久化和消息的持久化以后,当服务器宕机重启,存在队列中未发送的消息会依然存在.
持久化的消息在到达队列时就被写入到磁盘,而且若是能够,持久化的消息也会在内存中保存一份备份,这样能够提升必定的性能,只有在内存吃紧的时候才会从内存中清除.
非持久化的消息通常只保存在内存中,在内存吃紧的时候会被换入到磁盘中,以节省内存空间.
以上就是关于RabbitMQ中持久化的一些内容,可是并不会严格的100%保证信息不会丢失.
RabbitMQ的消息确认有两种:
消息发送确认: 这种是用来确认生产者将消息发送到交换机,交换机传递到队列的过程当中,消息是否成功投递.发送确认分为两步,一是确认是否到达交换机,二是确认是否到达队列.
消费接收确认: 这种是确认消费者是否成功消费了队列中的消息.
经过实现ConfirmCallBack
接口,消息发送到交换机Exchange后触发该回调.

使用该功能须要开启确认,spring-boot中配置以下:
spring.rabbitmq.publisher-confirms = true
经过实现ReturnCallback
接口,若是消息从交换机发送到对应的队列失败时触发(好比根据发送消息时指定的routingKey找不到队列时会触发该回调).

使用该功能须要开启确认,spring-boot中配置以下:
spring.rabbitmq.publisher-returns = true
AcknowledgeMode.NONE: 不确认
AcknowledgeMode.AUTO: 自动确认
AcknowledgeMode.MANUAL: 手动确认
spring-boot中配置方法:
spring.rabbitmq.listener.simple.acknowledge-mode = manual
3.2.1 成功确认
void basicAck(long deliveryTag, boolean multiple) throws IOException; #deliveryTag:该消息的index; #multiple: 是否批量. true: 将一次性ack全部小于deliveryTag的消息. #消费者成功处理后,调用 #channel.basicAck(message.getMessageProperties().getDeliveryTag(), false) #方法对消息进行确认.
3.2.2 失败确认
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException; #deliveryTag:该消息的index. #multiple:是否批量. true:将一次性拒绝全部小于deliveryTag的消息. #requeue:被拒绝的消息是否从新进入队列.
void basicReject(long deliveryTag, boolean requeue) throws IOException; #deliveryTag:该消息的index. #requeue: 被拒绝的是否从新入队列.
channel.basicNack
与 channel.basicReject
的区别在于basicNack能够批量拒绝多条消息,而basicReject一次只能拒绝一条消息.
当消息发送出去以后,咱们如何知道消息有没有正确到达exchange呢?若是在这个过程当中,消息丢失了,咱们根本不知道发生了什么,也不知道是什么缘由致使消息发送失败了.
为解决这个问题,主要有以下两种方案:
经过事务机制实现;
经过生产者消息确认机制(publisher confirm)实现.
可是使用事务机制实现会严重下降RabbitMQ的消息吞吐量,咱们采用一种轻量级的方案---生产者消息确认机制.
简而言之,就是 生产者发送的消息一旦被投递到全部匹配的队列以后,就会发送一个确认消息给生产者,这就使得生产者知晓消息已经正确到达了目的地.
若是消息和队列是持久化存储的,那么确认消息会在消息写入磁盘以后发出.
Mandatory参数:当Mandatory参数设为true时,若是目的不可达,会发送消息给生产者,生产者经过一个回调函数来获取该信息.
为了保证消息从队列可靠地到达消费者,RabbitMQ提供了消费者的消息确认机制(message acknowledgement).采用消息确认机制以后,消费者就有足够的时间来处理消息,不用担忧处理消息过程当中消费者进程挂掉后消息丢失的问题.由于RabbitMQ会一直等待并持有消息,直到消费者确认了该消息.
DLX,Dead Letter Exchange
的缩写,又被称为死信邮箱、死信交换机.DLX就是一个普通的交换机,和通常的交换机没有任何区别.
当一个消息在一个队列中变成死信(dead message)时,经过这个交换机将死信发送到死信队列中(指定好相关参数,RabbitMQ会自动发送).
1️⃣.消息被拒绝(basic.reject或basic.nack)而且requeue=false;
2️⃣.消息TTL过时;
3️⃣.队列达到最大长度(队列满了,没法再添加数据到mq中).
在定义业务队列的时候,能够考虑指定一个死信交换机,并绑定一个死信队列,当消息变成死信时,该消息就会被发送到该死信队列上,这样就方便咱们查看消息失败的缘由.
定义业务(普通)队列的时候指定参数:
@Bean public Queue helloQueue() { //将普通队列绑定到私信交换机上 Map<String, Object> args = new HashMap<>(2); args.put(DEAD_LETTER_EXCHANGE_KEY, deadExchangeName); args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey); Queue queue = new Queue(queueName, true, false, false, args); return queue; }
建立一个SpringBoot项目,在该项目中建立一个新的模块,项目结构如图:

注意:本案例是把消息发送和消息接收的实现写在了同一个项目模块中了,没有用两个模块实现.
在模块的pom.xml中添加以下依赖包.
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-autoconfigure</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
# 开启发送确认 spring.rabbitmq.publisher-confirms=true # 开启发送失败退回 spring.rabbitmq.publisher-returns=true
# 开启ACK spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring: application: name: rabbitmq-reliability rabbitmq: host: localhost port: 5672 username: guest password: guest # 开启发送确认 publisher-confirms: true # 开启发送失败退回 publisher-returns: true # 开启ACK listener: simple: acknowledge-mode: manual
@Configuration public class RabbitConfig { public final static String queueName = "hello_queue"; /** * 死信队列: */ public final static String deadQueueName = "dead_queue"; public final static String deadRoutingKey = "dead_routing_key"; public final static String deadExchangeName = "dead_exchange"; /** * 死信队列 交换机标识符 */ public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange"; /** * 死信队列交换机绑定键标识符 */ public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key"; @Bean public Queue helloQueue() { //将普通队列绑定到私信交换机上 Map<String, Object> args = new HashMap<>(2); args.put(DEAD_LETTER_QUEUE_KEY, deadExchangeName); args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey); Queue queue = new Queue(queueName, true, false, false, args); return queue; } /** * 死信队列: */ @Bean public Queue deadQueue() { Queue queue = new Queue(deadQueueName, true); return queue; } @Bean public DirectExchange deadExchange() { return new DirectExchange(deadExchangeName); } @Bean public Binding bindingDeadExchange(Queue deadQueue, DirectExchange deadExchange) { return BindingBuilder.bind(deadQueue).to(deadExchange).with(deadRoutingKey); } }

注释: hello_queue就配置了死信交换机、死信队列.
@Component public class HelloSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback{ @Autowired private RabbitTemplate rabbitTemplate; public void send(String exchange, String routingKey) { String context = "你好如今是 " + new Date(); System.out.println("send content = " + context); this.rabbitTemplate.setMandatory(true); this.rabbitTemplate.setConfirmCallback(this); this.rabbitTemplate.setReturnCallback(this); this.rabbitTemplate.convertAndSend(exchange, routingKey, context); } /** * 确认生产者是否把消息成功的发送到了交换机; * 确认后回调: * @param correlationData * @param ack * @param cause */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (!ack) { System.out.println("send ack fail, cause = " + cause); } else { System.out.println("send ack success"); } } /** * 交换机中的消息是否被成功的发送到队列. * 失败后return回调: * * @param message * @param replyCode * @param replyText * @param exchange * @param routingKey */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("send fail return-message = " + new String(message.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey); } }
@Component @RabbitListener(queues = RabbitConfig.queueName) public class HelloReceiver { @RabbitHandler public void process(String hello, Channel channel, Message message) throws IOException { try { Thread.sleep(2000); System.out.println("睡眠2s"); } catch (InterruptedException e) { e.printStackTrace(); } try { //告诉服务器收到这条消息 已经被我消费了 能够在队列删掉;不然消息服务器觉得这条消息没处理掉 后续还会在发 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); System.out.println("receiver success = " + hello); } catch (Exception e) { e.printStackTrace(); //丢弃这条消息 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false); System.out.println("receiver fail"); } } }
分为4种场景来测试.建立一个Controller,建立几个接口方法进行测试.

@RequestMapping("/send1") @ResponseBody public String send1() { helloSender.send(null, RabbitConfig.queueName); return "success"; }
@RequestMapping("/send2") @ResponseBody public String send2() { helloSender.send("fail-exchange", RabbitConfig.queueName); return "success"; }
@RequestMapping("/send3") @ResponseBody public String send3() { helloSender.send(null, "fail-queue"); return "success"; }
@RequestMapping("/send4") @ResponseBody public String send4() { helloSender.send("fail-exchange", "fail-queue"); return "success"; }
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
测试结果: 消息被正常消费,消息从队列中删除.
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
测试结果: 消息会被重复消费,一直保留在队列当中.
当执行这行代码的时候:
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
消息会被加入到死信队列中:

除了咱们上面讲的基本可靠性保证外,其实还有不少性能优化方案、可靠性保证方案:集群监控、流控、镜像队列、HAProxy+Keeplived高可靠负载均衡.