1.上一篇介绍了在 SpringBoot 中怎么使用 RabbitMQ 来实现 RPC 功能,分享了可能踩到的坑及解决办法;
2.本篇主要介绍消息可能会存在丢失的场景及解决思路,基本上涵盖了可能会遇到的全部的场景。
持久化能够提升 RabbitMQ 的可靠性,以防止在异常状况(好比:重启、关机、宕机等)下的数据丢失。 RabbitMQ 持久化分为三部分:交换机的持久化、队列的持久化、消息的持久化。java
交换机持久化是指将交换机的属性数据存储在磁盘上,当 MQ 的服务器发生意外或关闭以后,在重启 RabbitMQ 时不须要从新手动或执行代码去建立交换机了,交换机会自动被建立,至关于一直存在。
在建立交换机的时候将durable
参数设置为true
便可。 好比,我声明一个类型为 direct 的交换机:spring
/** * 设置交换机,类型为 direct * @return DirectExchange */ @Bean DirectExchange myExchange() { return new DirectExchange(QueueConstants.QUEUE\_EXCHANGE\_NAME, true, false); }
durable
参数设置为true
,则交换机的元数据会被存储在磁盘上,对于一个长期使用的交换机来讲,建议将其设置为持久化。若是不将队列设置为持久化,那么在 RabbitMQ 服务重启以后,相关队列的元数据会丢失,数据也会丢失。队列都没有了,消息也找不到地方存储了。
一样,在建立队列的时候将durable
参数设置为true
便可。数据库
/** * 建立队列 */ @Bean public Queue myQueue() { return new Queue(QueueConstants.RPC_QUEUE1); }
RabbitMQ 的消息是依附于队列存在的,因此要想消息持久化,那么前提是队列也必须设置持久化。segmentfault
在建立消息的时候,添加一个持久化消息的属性(将delivery_mode
设置为 2)。服务器
在 SpringBoot 中使用 rabbitTemplate 发送的消息默认就是持久化的,由于默认已经设置为 delivery_mode = 2
,下面咱们经过查看源码来验证一下。微信
1> sendAndReceive
生产者发送消息的时候会使用 rabbitTemplate 的 sendAndReceive 接口来发送消息:网络
@Nullable public Message sendAndReceive(String exchange, String routingKey, Message message) throws AmqpException { return this.sendAndReceive(exchange, routingKey, message, (CorrelationData)null); }
2> Message
第三个参数 Message 有一个 MessageProperties 属性源码分析
打开 Message.class:性能
public class Message implements Serializable { private static final long serialVersionUID = -7177590352110605597L; private static final String ENCODING = Charset.defaultCharset().name(); private static final Set<String> whiteListPatterns = new LinkedHashSet(Arrays.asList("java.util.*", "java.lang.*")); private final MessageProperties messageProperties; private final byte[] body;
3> MessageProperties
打开 MessageProperties.class :this
static { DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT; DEFAULT_PRIORITY = 0; }
MessageDeliveryMode.class:
public enum MessageDeliveryMode { NON_PERSISTENT, PERSISTENT; private MessageDeliveryMode() { } public static int toInt(MessageDeliveryMode mode) { switch(mode) { case NON_PERSISTENT: return 1; case PERSISTENT: return 2; default: return -1; } }
经过源码查看在 SpringBoot 中使用 rabbimqTemplate 发送的消息默认就是持久化的消息。
1.不知道生产者发送的消息到底是否已经到达 RabbitMQ Server; > 2.不知道生产者发送的消息是否已经成功的分配到队列中去。
开启消息发送确认,经过 ConfirmCallback 接口 和 ReturnCallback 接口 来保障。
备注
具体的操做方式能够参考 RabbitMQ系列之消息确认机制 这篇文章。
消费者收到消息还没来得及处理服务就宕机了。
消费端开启消息确认(ACK),将消息设置为手动确认:
# 开启 ACK(消费者接收到消息时手动确认) spring.rabbitmq.listener.simple.acknowledge-mode=manual
这样虽然服务宕机,可是在重启以后,消费者仍然会消费到该条数据。
备注
具体的操做方式能够参考 RabbitMQ 系列之消息确认机制 这篇文章。
持久化的消息成功存入 RabbitMQ 以后,若是在存入磁盘的这个过程当中 RabbitMQ 服务节点宕机、异常重启等,消息还没来得及存入磁盘。
可使用 RabbitMQ 的镜集群像模式进行部署,若是主节点在这个特殊的时间段内挂掉了,会自动切换到从节点,这样就保证了高可用性,除非整个集群都挂掉。
备注
了解更多关于 RabbitMQ 镜像集群模式能够参考 RabbitMQ系列之部署模式 这篇文章。
好比设置为持久化的消息,在保存到磁盘的过程当中,当前队列节点挂了,存储节点的磁盘也挂了。
欢迎你们关注个人微信公众号阅读更多文章: