默认状况下,exchange、queue、message 等数据都是存储在内存中的,这意味着若是 RabbitMQ 重启、关闭、宕机时全部的信息都将丢失。bash
RabbitMQ 提供了持久化来解决这个问题,持久化后,若是 RabbitMQ 发送 重启、关闭、宕机,下次起到时 RabbitMQ 会从硬盘中恢复exchange、queue、message 等数据。服务器
RabbitMQ 持久化包含3个部分post
queue 的持久化能保证自己的元数据不会因异常而丢失,可是不能保证内部的 message 不会丢失。要确保 message 不丢失,还须要将 message 也持久化性能
若是 exchange 和 queue 都是持久化的,那么它们之间的 binding 也是持久化的。ui
若是 exchange 和 queue 二者之间有一个持久化,一个非持久化,就不容许创建绑定。spa
注意:一旦肯定了 exchange 和 queue 的 durable,就不能修改了。若是非要修改,惟一的办法就是删除原来的 exchange 或 queue 后,重现建立 code
若是将全部的消息都进行持久化操做,这样会严重影响 RabbitMQ 的性能。写入磁盘的速度可比写入内存的速度要慢不少。因此须要在可靠性和吞吐量之间作权衡。队列
将 exchange、queue 和 message 都进行持久化操做后,也不能保证消息必定不会丢失,消息存入RabbitMQ 以后,还须要一段时间才能存入硬盘。RabbitMQ 并不会为每条消息都进行同步存盘,若是在这段时间,服务器宕机或者重启,消息还没来得及保存到磁盘当中,就会丢失。对于这种状况,能够引入 RabiitMQ 镜像队列机制。内存
经过代码实现 RabbitMQ 持久化路由
原生的 RabbitMQ 客户端须要完成三个步骤。
第一步,设置交换器的持久化
// 三个参数分别为 交换器名、交换器类型、是否持久化
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
复制代码
第二步,设置队列的持久化
// 参数1 queue :队列名
// 参数2 durable :是否持久化
// 参数3 exclusive :仅建立者可使用的私有队列,断开后自动删除
// 参数4 autoDelete : 当全部消费客户端链接断开后,是否自动删除队列
// 参数5 arguments
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
复制代码
第三步,设置消息的持久化
// 参数1 exchange :交换器
// 参数2 routingKey : 路由键
// 参数3 props : 消息的其余参数,其中 MessageProperties.PERSISTENT_TEXT_PLAIN 表示持久化
// 参数4 body : 消息体
channel.basicPublish("", queue_name, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
复制代码
Spring RabbitMQ 是对原生的 RabbitMQ 客户端的封装。通常状况下,咱们只须要定义 exchange 和 queue 的持久化。
配置交换机持久化
/**
* value 交换机名称
* type 交换机类型,默认 direct
* durable 是否持久化,默认 true
* autoDelete 是否自动删除,默认 false
* internal 是否为内部交换机,默认为 false
*/
@Exchange(value = "exchangeName", type = "direct", durable = "true", autoDelete = "false", internal = "false")
复制代码
配置队列持久化
/**
* value 队列名称
* durable 是否持久化
* exclusive 否为独占队列
* autoDelete 是否自动删除
*/
@Queue(value = "queryName", durable = "true", exclusive = "false", autoDelete = "false")
复制代码
一个用例
/**
* 消费消息
*/
@Component
public class ConsumerMessageListener {
/**
* 监听指定队列
*
* @param message 消息体
* @param headers 消息头
* @param channel 通道
* @return
* @RabbitListener 指定了 exchange 、key、Queue 后,若是 Rabbitmq 没有会去建立
*/
@RabbitListener(bindings = @QueueBinding(
exchange = @Exchange(value = "exchangeName", type = "direct", durable = "true", autoDelete = "false", internal = "false"),
key = "routingKeyValue",
value = @Queue(value = "queryName", durable = "true", exclusive = "false", autoDelete = "false")
))
public void listenerMessage(String message, @Headers Map<String, Object> headers, Channel channel)
throws IOException {
System.out.println(message);
System.out.println(headers);
//手动 ack
channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG), false);
}
}
复制代码
正常思路想 exchange 和 queue 的持久化应该在消息发送端配置,其实也能够配置在消息消费端,RabbitListener 回去检查 exchange 和 queue,若是不存在则建立