RabbitMQ 持久化

默认状况下,exchange、queue、message 等数据都是存储在内存中的,这意味着若是 RabbitMQ 重启、关闭、宕机时全部的信息都将丢失。bash

RabbitMQ 提供了持久化来解决这个问题,持久化后,若是 RabbitMQ 发送 重启、关闭、宕机,下次起到时 RabbitMQ 会从硬盘中恢复exchange、queue、message 等数据。服务器

持久化

RabbitMQ 持久化包含3个部分post

  • exchange 持久化,在声明时指定 durable 为 true
  • queue 持久化,在声明时指定 durable 为 true
  • message 持久化,在投递时指定 delivery_mode=2(1是非持久化)

queue 的持久化能保证自己的元数据不会因异常而丢失,可是不能保证内部的 message 不会丢失。要确保 message 不丢失,还须要将 message 也持久化性能

若是 exchange 和 queue 都是持久化的,那么它们之间的 binding 也是持久化的。ui

若是 exchange 和 queue 二者之间有一个持久化,一个非持久化,就不容许创建绑定。spa

注意:一旦肯定了 exchange 和 queue 的 durable,就不能修改了。若是非要修改,惟一的办法就是删除原来的 exchange 或 queue 后,重现建立 code

拓展

若是将全部的消息都进行持久化操做,这样会严重影响 RabbitMQ 的性能。写入磁盘的速度可比写入内存的速度要慢不少。因此须要在可靠性和吞吐量之间作权衡。队列

将 exchange、queue 和 message 都进行持久化操做后,也不能保证消息必定不会丢失,消息存入RabbitMQ 以后,还须要一段时间才能存入硬盘。RabbitMQ 并不会为每条消息都进行同步存盘,若是在这段时间,服务器宕机或者重启,消息还没来得及保存到磁盘当中,就会丢失。对于这种状况,能够引入 RabiitMQ 镜像队列机制。内存

代码实现

经过代码实现 RabbitMQ 持久化路由

原生的实现方式

原生的 RabbitMQ 客户端须要完成三个步骤。

第一步,设置交换器的持久化

// 三个参数分别为 交换器名、交换器类型、是否持久化
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);  
复制代码

第二步,设置队列的持久化

// 参数1 queue :队列名  
// 参数2 durable :是否持久化  
// 参数3 exclusive :仅建立者可使用的私有队列,断开后自动删除  
// 参数4 autoDelete : 当全部消费客户端链接断开后,是否自动删除队列  
// 参数5 arguments  
channel.queueDeclare(QUEUE_NAME, true, false, false, null);  
复制代码

第三步,设置消息的持久化

// 参数1 exchange :交换器  
// 参数2 routingKey : 路由键  
// 参数3 props : 消息的其余参数,其中 MessageProperties.PERSISTENT_TEXT_PLAIN 表示持久化  
// 参数4 body : 消息体  
channel.basicPublish("", queue_name, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());  
复制代码

Spring RabbitMQ 的实现方式

Spring RabbitMQ 是对原生的 RabbitMQ 客户端的封装。通常状况下,咱们只须要定义 exchange 和 queue 的持久化。

配置交换机持久化

/**
 * value      交换机名称
 * type       交换机类型,默认 direct
 * durable    是否持久化,默认 true
 * autoDelete 是否自动删除,默认 false
 * internal   是否为内部交换机,默认为 false
 */
@Exchange(value = "exchangeName", type = "direct", durable = "true", autoDelete = "false", internal = "false")
复制代码

配置队列持久化

/**
 * value      队列名称
 * durable    是否持久化
 * exclusive  否为独占队列
 * autoDelete 是否自动删除
 */
@Queue(value = "queryName", durable = "true", exclusive = "false", autoDelete = "false")
复制代码

一个用例

/**
 * 消费消息
 */
@Component
public class ConsumerMessageListener {
    /**
     * 监听指定队列
     *
     * @param message 消息体
     * @param headers 消息头
     * @param channel 通道
     * @return
     * @RabbitListener 指定了 exchange 、key、Queue 后,若是 Rabbitmq 没有会去建立
     */
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = "exchangeName", type = "direct", durable = "true", autoDelete = "false", internal = "false"),
            key = "routingKeyValue",
            value = @Queue(value = "queryName", durable = "true", exclusive = "false", autoDelete = "false")
    ))
    public void listenerMessage(String message, @Headers Map<String, Object> headers, Channel channel)
            throws IOException {
        System.out.println(message);
        System.out.println(headers);
        //手动 ack
        channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG), false);
    }
}
复制代码

正常思路想 exchange 和 queue 的持久化应该在消息发送端配置,其实也能够配置在消息消费端,RabbitListener 回去检查 exchange 和 queue,若是不存在则建立

相关文章

RabbitMQ 之消息的可靠性投递

RabbitMQ 之消息的可靠性消费

相关文章
相关标签/搜索