一. 为何有消息确认机制git
在RabbitMq中,一个消息从产生到最终的消息接受,中间大体会有三个环节,首先是消息到达交换机、而后是消息经过交换机到达队列,最后消费者消费绑定的队列消息。github
可是在这个过程当中,若是出现网络或者系统的异常,就会致使消息不能被正常消费。若是不能正常消费消息,会形成两方面的问题。spring
1.1 在服务端
消息到达队列,可是没有消费者去消费,就会形成消息积压,被积压的消息会存入缓存,直到有消费者进行消费。若是一直没有消费者进行消费,那么就会直接将内存占满,影响服务器性能。缓存
1.2 消费端
一个消息一旦被消费后,那么就会从队列删除。若是说消息已经到达消费者,可是消费者处理消息以前系统出现了异常,那么就至关于这条消息丢失了,是个很大的问题。springboot
因此RabbitMq才会出现消息确认机制。对应的也是服务端客客户端两个方面解决服务器
2、 怎么使用消息确认机制
2.1 消息发送确认
发送的确认也是分为两个步骤:到交换机的确认 ConfirmCallback 和到队列的确认 ReturnCallback网络
这些确认机制默认都是不开启的,在SpringBoot 项目中,咱们能够在配置文件中开启:app
spring.rabbitmq.publisher-confirms = true
spring.rabbitmq.publisher-returns = true
或者 在配置链接工厂的时候开启:dom
@Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost("/"); //开启到交换机的确认 connectionFactory.setPublisherConfirms(true); //开启到队列的确认 connectionFactory.setPublisherReturns(true); return connectionFactory; }
在代码中实现 RabbitTemplate.ConfirmCallback 接口,若是消息被交换机正常接受,就会回调confirm 方法,参数的含义经过代码能够知晓。ide
实现 RabbitTemplate.ReturnCallback 接口,若是消息不能被发送到队列,就会调用ReturnedMessage 方法。
注意:一个是接收成功调用,一个是接收失败调用
@Component public class ASender implements RabbitTemplate.ReturnCallback,RabbitTemplate.ConfirmCallback { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @Autowired private RabbitTemplate rabbitTemplate; /** * 回调 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { logger.info(" 回调id:" + correlationData); if (ack) { logger.info("消息成功消费"); } else { logger.info("消息消费失败:" + cause); } } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { logger.info("消息内容:{}", new String(message.getBody())); logger.info("回复文本:{},回复代码:{}", replyText, replyCode); logger.info("交换器名称:{},路由键:{}", exchange, routingKey); } @PostConstruct public void init(){ rabbitTemplate.setReturnCallback(this); rabbitTemplate.setConfirmCallback(this); } public void sendMsg(String content) { CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend(EXCHANGE_C, "aa.apple.big", content, correlationId); } }
2.2 消息接受确认
消费端消息经过 ACK 确认是否被正确接收,每一个 Message 都要被确认(acknowledged),能够手动去 ACK 或自动 ACK
ACK 确认模式分为三种:
- AcknowledgeMode.NONE:自动确认
- AcknowledgeMode.AUTO:根据状况确认
- AcknowledgeMode.MANUAL:手动确认
默认是自动确认,开启手动确认的方式也是两种方式:
配置文件配置:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
另外一种是在RabbitListenerContainerFactory配置:
@Bean public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){ SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); //开启手动 ack return factory; }
在客户端接受消息:
@RabbitHandler public void processMessage2(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) { System.out.println(message); try { channel.basicAck(tag, false); // 确认消息 logger.info("消费者成功确认" + message); } catch (IOException e) { e.printStackTrace(); } }
确认 basicAck 参数解释:
- deliveryTag(惟一标识 ID):当一个消费者向 RabbitMQ 注册后,会创建起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它表明了 RabbitMQ 向该 Channel 投递的这条消息的惟一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel
- multiple:为了减小网络流量,手动确承认以被批处理,当该参数为 true 时,则能够一次性确认 delivery_tag 小于等于传入值的全部消息
部分参考: https://www.jianshu.com/p/2c5eebfd0e95