rabbit - producer的confirm和consumer的ack模式

本篇和你们分享的是关于rabbit的生产和消费方的一些实用的操做;正如文章标题,主要内容如producer的confirm和consumer的ack,这二者使用的模式都是用来保证数据完整性,防止数据丢失。spring

  • producer的confirm模式
  • consumer的ack模式

producer的confirm模式

首先,有这样一种业务场景1:a系统在作活动前,须要给用户的手机发送一条活动内容短信但愿用户来参加,由于用户量有点大,因此经过往短信mq中插入数据方式,让短信服务来消费mq发短信;编程

此时插入mq消息的服务为了保证给全部用户发消息,而且要在短期内插入完成(所以用到了异步插入方式(快速)),咱们就须要知道每次插入mq是否成功,若是不成功那咱们能够收集失败的信息后补发(所以confirm模式排上了用场);如图设计:springboot

 在springboot中可使用基于amqp封装的工厂类来开启confirm模式,而后经过RabbitTemplate模板来设置回调函数,以下代码:网络

 1     ///region producer生产 - confirm模式
 2 
 3     public RabbitTemplate getRabbitTemplate(RabbitTemplate.ConfirmCallback confirmCallback) {
 4         return this.getRabbitTemplate(this.connectionFactory(), confirmCallback);
 5     }
 6 
 7     public RabbitTemplate getRabbitTemplate(CachingConnectionFactory connectionFactory, RabbitTemplate.ConfirmCallback confirmCallback) {
 8         RabbitTemplate template = new RabbitTemplate(connectionFactory);
 9         //product开启confirm模式
10         connectionFactory.setPublisherConfirms(true);
11         //设置confirm回调处理
12         template.setConfirmCallback(confirmCallback);
13         return template;
14     }
15     ///endregion

这里经过RabbitTemplate.ConfirmCallback函数编程来传递咱们自定义的回调方法,以下收集confirm返回的结果信息:异步

1         RabbitUtil rabbitUtil = new RabbitUtil(this.getFirstNode().getLink());
2         RabbitTemplate template = rabbitUtil.getRabbitTemplate((a, b, c) -> {
3             System.out.println("firstNodeTpl - ConfirmCallback的Id:" + a.getId() + ";状态:" + b + ";信息:" + c);
4         });

最后再经过RabbitTemplate实例的convertAndSend方法发送mq信息,咱们可以在日志中看到以下记录的信息:函数

这里的状态true:表示send成功,false:表示send失败;一般false的时候信息c会有响应的错误提示,这里把网络断开,以下错误提示:fetch

consumer的ack模式

再来,有这样一种场景2:短信服务去消费mq队列信息时,假若服务调用的运营商发送短信接口异常了(短信运营商接口欠费),咱们此时的短信是发送失败的,用户也收不到短信,可是在默认(默认开启ack)前提下mq消息已经被消费了rabbit中没有记录了(kafka例外);想要mq消息在业务逻辑异常时还存在,那么可使用ack方式;this

在springboot中可使用基于amqp封装的工厂类关闭自动ack模式,改成手动ack方式;只有当业务代码流程走完后,最后经过代码设置ack标识,来通知rabbit消息能够丢弃了;若是设置了手动模式后,又没有提交ack标识,那么mq中的消息一直存在没法释放(每次consumer消费后,rabbit会把noack的消息重复放入队列中):spa

 1     ///region consumer监听 - 手动ack
 2     public SimpleRabbitListenerContainerFactory listenerContainerFactory() {
 3         return this.listenerContainerFactory(this.connectionFactory());
 4     }
 5 
 6     public SimpleRabbitListenerContainerFactory listenerContainerFactory(ConnectionFactory connectionFactory) {
 7         SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
 8         factory.setConnectionFactory(connectionFactory);
 9         //代码手动ack
10         factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
11         //开启消费者数量
12         factory.setConcurrentConsumers(2);
13         //每次接受数据量,默认250
14         factory.setPrefetchCount(300);
15         return factory;
16     }
17     ///endregion

经过链接工厂设置手动ack方式,而后获取mq消息后,走完正常业务逻辑,最后再手动通知ack释放消息,以下:设计

1     @RabbitListener(containerFactory = "firstNodeListener", queues = {"${shenniu.rabbits.firstNode.queue}"})
2     private void firstNodeListener(String msg, Channel channel, Message message) {
3         try {
4             long deliverTag = message.getMessageProperties().getDeliveryTag();
5             System.out.println("firstNodeListener - 消费消息 [" + deliverTag + "] - " + msg);
6             channel.basicAck(deliverTag, true);
7         } catch (Exception ex) {
8         }
9     }

这里ack主要根据mq消息的惟一编号(deliverTag)来通知;若是咱们不设置ack确认,那么消息状态会是这样以下rabbit管理后台:

相关文章
相关标签/搜索