标签: RabbitMQ shovel-plugin ConfirmCallback RabbitMQ消息投递node
在使用 RabbitMQ 的时候,做为消息发送方但愿杜绝任何消息丢失或者投递失败场景。RabbitMQ 为咱们提供了两个选项用来控制消息的投递可靠性模式。服务器
rabbitmq 整个消息投递的路径为:
producer->rabbitmq broker cluster->exchange->queue->consumer网络
message 从 producer 到 rabbitmq broker cluster 则会返回一个 confirmCallback 。
message 从 exchange->queue 投递失败则会返回一个 returnCallback 。咱们将利用这两个 callback 控制消息的最终一致性和部分纠错能力。架构
在建立 connectionFactory 的时候设置 PublisherConfirms(true) 选项,开启 confirmcallback 。tcp
CachingConnectionFactory factory = new CachingConnectionFactory(); factory.setPublisherConfirms(true);//开启confirm模式
RabbitTemplate rabbitTemplate = new RabbitTemplate(factory); rabbitTemplate.setConfirmCallback((data, ack, cause) -> { if (!ack) { log.error("消息发送失败!" + cause + data.toString()); } else { log.info("消息发送成功,消息ID:" + (data != null ? data.getId() : null)); } });
咱们来看下 ConfirmCallback 接口。插件
public interface ConfirmCallback { /** * Confirmation callback. * @param correlationData correlation data for the callback. * @param ack true for ack, false for nack * @param cause An optional cause, for nack, when available, otherwise null. */ void confirm(CorrelationData correlationData, boolean ack, String cause); }
重点是 CorrelationData 对象,每一个发送的消息都须要配备一个 CorrelationData 相关数据对象,CorrelationData 对象内部只有一个 id 属性,用来表示当前消息惟一性。code
发送的时候建立一个 CorrelationData 对象。orm
User user = new User(); user.setID(1010101L); user.setUserName("plen"); rabbitTemplate.convertAndSend(exchange, routing, user, message -> { message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT); return message; }, new CorrelationData(user.getID().toString()));
这里将 user ID 设置为当前消息 CorrelationData id 。固然这里是纯粹 demo,真实场景是须要作业务无关消息 ID 生成,同时要记录下这个 id 用来纠错和对帐。server
消息只要被 rabbitmq broker 接收到就会执行 confirmCallback,若是是 cluster 模式,须要全部 broker 接收到才会调用 confirmCallback。对象
被 broker 接收到只能表示 message 已经到达服务器,并不能保证消息必定会被投递到目标 queue 里。因此须要用到接下来的 returnCallback 。
confrim 模式只能保证消息到达 broker,不能保证消息准确投递到目标 queue 里。在有些业务场景下,咱们须要保证消息必定要投递到目标 queue 里,此时就须要用到 return 退回模式。
一样建立 ConnectionFactory 到时候须要设置 PublisherReturns(true) 选项。
CachingConnectionFactory factory = new CachingConnectionFactory(); factory.setPublisherReturns(true);//开启return模式
rabbitTemplate.setMandatory(true);//开启强制委托模式 rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> log.info(MessageFormat.format("消息发送ReturnCallback:{0},{1},{2},{3},{4},{5}", message, replyCode, replyText, exchange, routingKey)));
这样若是未能投递到目标 queue 里将调用 returnCallback ,能够记录下详细到投递数据,按期的巡检或者自动纠错都须要这些数据。
RabbitMQ 在跨机房集成提供了一个不错的插件 shovel 。使用 shovel-plugin 插件很是方便,shovel 能够接受机房之间的网络断开、机器下线等不稳定因素。
这里有两个 broker :
10.211.55.3 rabbit_node1
10.211.55.4 rabbit_node2
咱们但愿将发送给 rabbit_node1 plen.queue 的消息传输到 rabbit_node2 plen.queue 中。咱们先开启 rabbit_node1 的 shovel-plugin。
先看下当前 RabbitMQ 版本是否安装了 shovel-plugin,若是有的话直接开启。
rabbitmq-plugins list rabbitmq-plugins enable rabbitmq_shovel rabbitmq-plugins enable rabbitmq_shovel_management
而后就能够在 Admin 面板里看到这个设置选项,怎么设置这里就不介绍了。主要就是配置下 amqp 协议地址,amqp://user:password@server-name/my-vhost 。
若是配置没有问题的话,应该是这样的一个状态,说明已经顺利链接到 rabbit_node2 broker 。
咱们来看下 rabbit_node1 和 rabbit_node2 的 Connections 面板。
rabbit_node1(10.211.55.3):
rabbit_node2(10.211.55.4):
RabbitMQ shovel-plugin 插件在 rabbit_node1 broker 建立了两个 tcp 链接,端口 39544 链接是用来消费 plen.queue 里的消息,端口 55706 链接是用来推送消息给 rabbit_node2 。
咱们来看下 rabbit_node1 tcp 链接状态:
tcp6 0 0 10.211.55.3:5672 10.211.55.3:39544 ESTABLISHED tcp 0 0 10.211.55.3:55706 10.211.55.4:5672 ESTABLISHED
rabbit_node2 tcp 链接状态:
tcp6 0 0 10.211.55.4:5672 10.211.55.3:55706 ESTABLISHED
为了验证 shovel-plugin 稳定性,咱们将 rabbit_node2 下线。
而后再发送消息,发现消息会如今 rabbit_node1 plen.queue 里待着,一旦 shovel-plugin 链接恢复将消费 rabbit_node1 plen.queue 消息,而后投递给 rabbit_node2 plen.queue 。
做者:王清培 (沪江集团资深JAVA架构师)