RabbitMQ 的消息确认机制以下:java
从图中咱们能够看出:git
这两个机制都是收到 TCP 协议的启发,它们对于数据安全相当重要。
下面就分别从生产者、消费者两个方面结合实例来认识消息确认机制。github
备注:web
咱们先来看一下RabbitMQ 消息投递和接收的一个完整链路以下:spring
消息投递的链路用文字表示:producer->rabbitmq broker cluster->exchange->queue->consumer
segmentfault
因为:安全
在编码时咱们能够用两个选项用来控制消息投递的可靠性:服务器
confirmCallback
;returnCallback
咱们能够利用这两个 callback 接口来控制消息的一致性和处理一部分的异常状况。微信
server.port=10420 spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.username=guest spring.rabbitmq.password=guest # 开启发送确认 spring.rabbitmq.publisher-confirms=true # 开启发送失败退回(消息有没有找到合适的队列) spring.rabbitmq.publisher-returns=true
在 RabbitConfig 配置类里,定义 RabbitTemplate Bean,使用 callback 接口:网络
/** * RabbitMQ配置 * * @author lyf * @公众号 全栈在路上 * @GitHub https://github.com/liuyongfei1 * @date 2020-05-17 17:20 **/ @Slf4j @Configuration public class RabbitConfig { @Autowired CachingConnectionFactory cachingConnectionFactory; @Bean RabbitTemplate rabbitTemplate() { RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory); // 消息只要被 rabbitmq broker 接收到就会执行 confirmCallback // 若是是 cluster 模式,须要全部 broker 接收到才会调用 confirmCallback // 被 broker 接收到只能表示 message 已经到达服务器,并不能保证消息必定会被投递到目标 queue 里 rabbitTemplate.setConfirmCallback((data, ack, cause) -> { String msgId = data.getId(); if (ack) { log.info(msgId + ": 消息发送成功"); } else { log.info(msgId + ": 消息发送失败"); } }); // confirm 模式只能保证消息到达 broker,不能保证消息准确投递到目标 queue 里。 // 在有些业务场景下,咱们须要保证消息必定要投递到目标 queue 里,此时就须要用到 return 退回模式 // 这样若是未能投递到目标 queue 里将调用 returnCallback,能够记录下详细到投递数据,按期的巡检或者自动纠错都须要这些数据 rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { log.info(MessageFormat.format("消息发送失败,ReturnCallback:{0},{1},{2},{3},{4},{5}", message, replyCode, replyText, exchange, routingKey)); // TODO 作消息发送失败时的处理逻辑 }); return rabbitTemplate; } /** * 声明队列 * 参数说明: * durable 是否持久化,默认是false(持久化队列则数据会被存储在磁盘上,当消息代理重启时数据不会丢失;暂存队列只对当前链接有效) * exclusive 默认是false,只能被当前建立的链接使用,并且当链接关闭后队列即被删除。此参考优先级高于durable * autoDelete 默认是false,是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 * 通常设置一下队列的持久化就好,其他两个就是默认false * * @return Queue **/ @Bean Queue myQueue() { return new Queue(QueueConstants.QUEUE\_NAME, true); } // 设置交换机,类型为 direct @Bean DirectExchange myExchange() { return new DirectExchange(QueueConstants.QUEUE\_EXCHANGE\_NAME, true, false); } // 绑定:将交换机和队列绑定,并设置路由匹配键 @Bean Binding queueBinding() { return BindingBuilder.bind(myQueue()).to(myExchange()).with(QueueConstants.QUEUE\_ROUTING\_KEY\_NAME); }
在 ProducerController 里,主要干了如下几件事:
sendDirectMessage
,经过请求该接口,能够实现生产者发送消息的功能;CorrelationData
,该对象内部只有一个 id 属性,用来表示消息的惟一性;/** * 消息生产端 * @公众号 全栈在路上 * @GitHub https://github.com/liuyongfei1 * @author lyf * @date 2020-05-17 18:30 **/ @RestController public class ProducerController { /\*\* \* RabbitTemplate提供了发送/接收消息的方法 \*/ @Autowired RabbitTemplate rabbitTemplate; /** * 生产消息 * * @Author Liuyongfei * @Date 上午12:12 2020/5/20 * @param test * @param test2 * @return java.lang.String **/ @GetMapping("/sendDirectMessage") public String sendDirectMessage(String test,Integer test2) { // 生成消息的惟一id String msgId = UUID.randomUUID().toString(); String messageData = "hello,this is rabbitmq demo message"; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); // 定义要发送的消息对象 Map<String,Object\> messageObj = new HashMap<>(); messageObj.put("msgId",msgId); messageObj.put("messageData",messageData); messageObj.put("createTime",createTime); rabbitTemplate.convertAndSend(QueueConstants.QUEUE\_EXCHANGE\_NAME,QueueConstants.QUEUE\_ROUTING\_KEY\_NAME, messageObj,new CorrelationData(msgId)); return "message send ok"; } }
setConfirmCallback
方法内部打上断点;至此,生产者消息确认结束,且经过运行的实例,咱们可以得出结论:本次生产的消息已经正确无误的投递到了队列中去。
消费者确认指的就是 RabbitMQ 须要确认消息到底有没有被收到,来肯定要不要将该条消息从队列中删除掉。这就须要消费者来告诉 RabbitMQ,有如下两种方式:
消费者在消费消息的时候,若是设定应答模式为自动,则消费者收到消息后,消息就会当即被 RabbitMQ 从 队列中删除掉。
所以,在实际开发者,咱们基本上是将消费应答模式设置为手动确认更为稳当一些。
消费者在收到消息后:
server.port=10421 spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.username=guest spring.rabbitmq.password=guest # 开启 ACK(消费者接收到消息时手动确认) spring.rabbitmq.listener.simple.acknowledge-mode=manual
ConsumerController 里主要干了如下几件事儿:
@RabbitListener
来监听队列;deliveryTag
;channel.basicAck
来确认消息已经消费;channel.basicNack
把消费失败的消息从新放入到队列中去。/** * 消息消费端 * @公众号 全栈在路上 * @GitHub https://github.com/liuyongfei1 * @author Liuyongfei * @date 2020-05-21 18:00 **/ @Component public class ConsumerController { @RabbitListener(queues = {QueueConstants.QUEUE\_NAME}) public void handler(Message message, Channel channel) throws IOException { System.out.println("收到消息:" + message.toString()); MessageHeaders headers = message.getHeaders(); Long tag = (Long) headers.get(AmqpHeaders.DELIVERY\_TAG); try { // 手动确认消息已消费 channel.basicAck(tag,false); } catch (IOException e) { // 把消费失败的消息从新放入到队列 channel.basicNack(tag, false, true); e.printStackTrace(); } } }
至此,消费者消息确认结束。你们能够在 ConsumerController 里添加一些测试代码来触发异常,体验一下 channel.basicNack
的做用。这里我就再也不一一测试。