RabbitMQ 可靠投递

RabbitMQ 可靠投递

标签: RabbitMQ shovel-plugin ConfirmCallback RabbitMQ消息投递node


  • 背景
  • confirmCallback 确认模式
  • returnCallback 未投递到 queue 退回模式
  • shovel-plugin 跨机房可靠投递

背景

在使用 RabbitMQ 的时候,做为消息发送方但愿杜绝任何消息丢失或者投递失败场景。RabbitMQ 为咱们提供了两个选项用来控制消息的投递可靠性模式。服务器

rabbitmq 整个消息投递的路径为:
producer->rabbitmq broker cluster->exchange->queue->consumer网络

messageproducerrabbitmq broker cluster 则会返回一个 confirmCallback
messageexchange->queue 投递失败则会返回一个 returnCallback 。咱们将利用这两个 callback 控制消息的最终一致性和部分纠错能力。架构

confirmCallback 确认模式

在建立 connectionFactory 的时候设置 PublisherConfirms(true) 选项,开启 confirmcallbacktcp

CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setPublisherConfirms(true);//开启confirm模式
RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
rabbitTemplate.setConfirmCallback((data, ack, cause) -> {
        if (!ack) {
               log.error("消息发送失败!" + cause + data.toString());
        } else {
            log.info("消息发送成功,消息ID:" + (data != null ? data.getId() : null));
        }
    });

咱们来看下 ConfirmCallback 接口。插件

public interface ConfirmCallback {

        /**
         * Confirmation callback.
         * @param correlationData correlation data for the callback.
         * @param ack true for ack, false for nack
         * @param cause An optional cause, for nack, when available, otherwise null.
         */
        void confirm(CorrelationData correlationData, boolean ack, String cause);

    }

重点是 CorrelationData 对象,每一个发送的消息都须要配备一个 CorrelationData 相关数据对象,CorrelationData 对象内部只有一个 id 属性,用来表示当前消息惟一性。code

发送的时候建立一个 CorrelationData 对象。orm

User user = new User();
user.setID(1010101L);
user.setUserName("plen");

rabbitTemplate.convertAndSend(exchange, routing, user,
        message -> {
        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
            return message;
        },
new CorrelationData(user.getID().toString()));

这里将 user ID 设置为当前消息 CorrelationData id 。固然这里是纯粹 demo,真实场景是须要作业务无关消息 ID 生成,同时要记录下这个 id 用来纠错和对帐。server

消息只要被 rabbitmq broker 接收到就会执行 confirmCallback,若是是 cluster 模式,须要全部 broker 接收到才会调用 confirmCallback对象

broker 接收到只能表示 message 已经到达服务器,并不能保证消息必定会被投递到目标 queue 里。因此须要用到接下来的 returnCallback

returnCallback 未投递到queue退回模式

confrim 模式只能保证消息到达 broker,不能保证消息准确投递到目标 queue 里。在有些业务场景下,咱们须要保证消息必定要投递到目标 queue 里,此时就须要用到 return 退回模式。

一样建立 ConnectionFactory 到时候须要设置 PublisherReturns(true) 选项。

CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setPublisherReturns(true);//开启return模式
rabbitTemplate.setMandatory(true);//开启强制委托模式

rabbitTemplate.setReturnCallback((message, replyCode, replyText,
                    exchange, routingKey) ->
    log.info(MessageFormat.format("消息发送ReturnCallback:{0},{1},{2},{3},{4},{5}", message, replyCode, replyText, exchange, routingKey)));

这样若是未能投递到目标 queue 里将调用 returnCallback ,能够记录下详细到投递数据,按期的巡检或者自动纠错都须要这些数据。

shovel-plugin 跨机房可靠投递

RabbitMQ 在跨机房集成提供了一个不错的插件 shovel 。使用 shovel-plugin 插件很是方便,shovel 能够接受机房之间的网络断开、机器下线等不稳定因素。

这里有两个 broker

10.211.55.3 rabbit_node1
10.211.55.4 rabbit_node2

咱们但愿将发送给 rabbit_node1 plen.queue 的消息传输到 rabbit_node2 plen.queue 中。咱们先开启 rabbit_node1shovel-plugin

先看下当前 RabbitMQ 版本是否安装了 shovel-plugin,若是有的话直接开启。

rabbitmq-plugins  list
rabbitmq-plugins  enable rabbitmq_shovel
rabbitmq-plugins  enable rabbitmq_shovel_management

而后就能够在 Admin 面板里看到这个设置选项,怎么设置这里就不介绍了。主要就是配置下 amqp 协议地址,amqp://user:password@server-name/my-vhost

若是配置没有问题的话,应该是这样的一个状态,说明已经顺利链接到 rabbit_node2 broker


咱们来看下 rabbit_node1rabbit_node2Connections 面板。
rabbit_node1(10.211.55.3):

rabbit_node2(10.211.55.4):

RabbitMQ shovel-plugin 插件在 rabbit_node1 broker 建立了两个 tcp 链接,端口 39544 链接是用来消费 plen.queue 里的消息,端口 55706 链接是用来推送消息给 rabbit_node2

咱们来看下 rabbit_node1 tcp 链接状态:

tcp6       0      0 10.211.55.3:5672        10.211.55.3:39544       ESTABLISHED
tcp        0      0 10.211.55.3:55706       10.211.55.4:5672        ESTABLISHED

rabbit_node2 tcp 链接状态:

tcp6       0      0 10.211.55.4:5672        10.211.55.3:55706       ESTABLISHED

为了验证 shovel-plugin 稳定性,咱们将 rabbit_node2 下线。

而后再发送消息,发现消息会如今 rabbit_node1 plen.queue 里待着,一旦 shovel-plugin 链接恢复将消费 rabbit_node1 plen.queue 消息,而后投递给 rabbit_node2 plen.queue

做者:王清培 (沪江集团资深JAVA架构师)

相关文章
相关标签/搜索