这篇文章主要讲 RabbitMQ 中 消费者 ack 以及 生产者 confirms。html
如上图,生产者把消息发送到 RabbitMQ,而后 RabbitMQ 再把消息投递到消费者。git
生产者和 RabbitMQ,以及 RabbitMQ 和消费者都是经过 TCP 链接,可是他们之间是经过信道(Channel)传递数据的。多个线程共享一个链接,可是每一个线程拥有独自的信道。github
问题:怎么保证 RabbitMQ 投递的消息被成功投递到了消费者?redis
RabbitMQ 投递的消息,刚投递一半,产生了网络抖动,就有可能到不了消费者。spring
解决办法:浏览器
RabbitMQ 对消费者说:“若是你成功接收到了消息,给我说确认收到了,否则我就当你没有收到,我还会从新投递”网络
在 RabbitMQ 中,有两种 acknowledgement 模式。app
这也称做发后即忘模式。ide
在这种模式下,RabbitMQ 投递了消息,在投递成功以前,若是消费者的 TCP 链接 或者 channel 关闭了,这条消息就会丢失。spring-boot
会有丢失消息问题。
在这种模式下,RabbitMQ 投递了消息,在投递成功以前,若是消费者的 TCP 链接 或者 channel 关闭了,致使这条消息没有被 acked,RabbitMQ 会自动把当前消息从新入队,再次投递。
会有重复投递消息的问题,因此消费者得准备好处理重复消息的问题,就是所谓的:幂等性。
为了启用 手动 ack 模式,消费者须要实现 ChannelAwareMessageListener
接口。
@Component public class Consumer implements ChannelAwareMessageListener { @Autowired private MessageConverter messageConverter; @Override public void onMessage(Message message, Channel channel) throws Exception { MessageProperties messageProperties = message.getMessageProperties(); // 表明投递的标识符,惟一标识了当前信道上的投递,经过 deliveryTag ,消费者就能够告诉 RabbitMQ 确认收到了当前消息,见下面的方法 long deliveryTag = messageProperties.getDeliveryTag(); // 若是是重复投递的消息,redelivered 为 true Boolean redelivered = messageProperties.getRedelivered(); // 获取生产者发送的原始消息 Object originalMessage = messageConverter.fromMessage(message); Console.log("consume message = {} , deliveryTag = {} , redelivered = {}" , originalMessage, deliveryTag, redelivered); // 表明消费者确认收到当前消息,第二个参数表示一次是否 ack 多条消息 channel.basicAck(deliveryTag, false); // 表明消费者拒绝一条或者多条消息,第二个参数表示一次是否拒绝多条消息,第三个参数表示是否把当前消息从新入队 // channel.basicNack(deliveryTag, false, false); // 表明消费者拒绝当前消息,第二个参数表示是否把当前消息从新入队 // channel.basicReject(deliveryTag,false); } }
channel.basicAck
表明消费者确认收到当前消息,语义上表示消费者成功处理了当前消息。
channel.basicNack
表明消费者拒绝一条或者多条消息。basicNack 算是 basicReject 的一个扩展,由于 basicReject 不能一次拒绝多条消息。
channel.basicReject
表明消费者拒绝这条消息,语义上表示消费者没有处理当前消息。
对于 basicNack 和 basicReject ,若是参数 boolean requeue
传入 false
,消息仍是会从队列里面删除。这三个方法只是语义上的不一样。
deliveryTag
deliveryTag 是 64 bit long 值,从 1 开始,不停的递增 1。不一样的 channel 有独立的 deliveryTag。好比有两个消费者,你会发现,都是从 1 开始递增,互不影响。
因为上面建立的消费者,没有指明监听那个队列,因此还须要建立一个 MessageListenerContainer
。
@Bean public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, ChannelAwareMessageListener listener) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); // 指定消费者 container.setMessageListener(listener); // 指定监听的队列 container.setQueueNames(QUEUE_NAME); // 设置消费者的 ack 模式为手动确认模式 container.setAcknowledgeMode(AcknowledgeMode.MANUAL); container.setPrefetchCount(300); return container; }
这样就开启了消费者手动 ack 模式。
若是开启了消费者手动 ack 模式,可是又没有调用手动确认方法(好比:channel.basicAck),那问题就大了,RabbitMQ 会在当前 channel 上一直阻塞,等待消费者 ack。
问题:怎么保证生产者发送的消息被 RabbitMQ 成功接收?
生产者发送的消息,刚发送一半,产生了网络抖动,就有可能到不了 RabbitMQ。
解决办法:
生产者对 RabbitMQ 说:“若是你成功接收到了消息,给我说确认收到了,否则我就当你没有收到”
/** * 自定义消息元数据 */ @NoArgsConstructor @Data public class RabbitMetaMessage implements Serializable{ /** * 是不是 returnCallback */ private boolean returnCallback; /** * 承载原始消息数据数据 */ private Object payload; public RabbitMetaMessage(Object payload) { this.payload = payload; } }
先把消息存储到 redis,再发送到 rabbitmq
@RestController public class ProducerController { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private RedisTemplate redisTemplate; @Autowired private DefaultKeyGenerator keyGenerator; @GetMapping("/sendMessage") public Object sendMessage() { new Thread(() -> { HashOperations hashOperations = redisTemplate.opsForHash(); for (int i = 0; i < 1; i++) { String id = keyGenerator.generateKey() + ""; String value = "message " + i; RabbitMetaMessage rabbitMetaMessage = new RabbitMetaMessage(value); // 先把消息存储到 redis hashOperations.put(RedisConfig.RETRY_KEY, id, rabbitMetaMessage); Console.log("send message = {}", value); // 再发送到 rabbitmq rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, RabbitMQConfig.ROUTING_KEY, value, (message) -> { message.getMessageProperties().setMessageId(id); return message; }, new CorrelationData(id)); } }).start(); return "ok"; } }
@Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory("192.168.238.132", 5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); // 设置 生产者 confirms connectionFactory.setPublisherConfirms(true); // 设置 生产者 Returns connectionFactory.setPublisherReturns(true); return connectionFactory; }
@Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); // 必须设置为 true,否则当 发送到交换器成功,可是没有匹配的队列,不会触发 ReturnCallback 回调 // 并且 ReturnCallback 比 ConfirmCallback 先回调,意思就是 ReturnCallback 执行完了才会执行 ConfirmCallback rabbitTemplate.setMandatory(true); // 设置 ConfirmCallback 回调 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { Console.log("ConfirmCallback , correlationData = {} , ack = {} , cause = {} ", correlationData, ack, cause); // 若是发送到交换器都没有成功(好比说删除了交换器),ack 返回值为 false // 若是发送到交换器成功,可是没有匹配的队列(好比说取消了绑定),ack 返回值为仍是 true (这是一个坑,须要注意) if (ack) { String messageId = correlationData.getId(); RabbitMetaMessage rabbitMetaMessage = (RabbitMetaMessage) redisTemplate.opsForHash().get(RedisConfig.RETRY_KEY, messageId); Console.log("rabbitMetaMessage = {}", rabbitMetaMessage); if (!rabbitMetaMessage.isReturnCallback()) { // 到这一步才能彻底保证消息成功发送到了 rabbitmq // 删除 redis 里面的消息 redisTemplate.opsForHash().delete(RedisConfig.RETRY_KEY, messageId); } } }); // 设置 ReturnCallback 回调 // 若是发送到交换器成功,可是没有匹配的队列,就会触发这个回调 rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { Console.log("ReturnCallback unroutable messages, message = {} , replyCode = {} , replyText = {} , exchange = {} , routingKey = {} ", message, replyCode, replyText, exchange, routingKey); // 从 redis 取出消息,设置 returnCallback 设置为 true String messageId = message.getMessageProperties().getMessageId(); RabbitMetaMessage rabbitMetaMessage = (RabbitMetaMessage) redisTemplate.opsForHash().get(RedisConfig.RETRY_KEY, messageId); rabbitMetaMessage.setReturnCallback(true); redisTemplate.opsForHash().put(RedisConfig.RETRY_KEY, messageId, rabbitMetaMessage); }); return rabbitTemplate; }
必须 rabbitTemplate.setMandatory(true)
,否则当 发送到交换器成功,可是没有匹配的队列,不会触发 ReturnCallback 回调。并且 ReturnCallback 比 ConfirmCallback 先回调。
如何模拟 发送到交换器成功,可是没有匹配的队列,先把项目启动,而后再把队列解绑,再发送消息,就会触发 ReturnCallback 回调,并且发现消息也丢失了,没有到任何队列。
这样就解绑了。
运行项目,而后打开浏览器,输入 http://localhost:9999/sendMessage
控制台打出以下日志
这样就触发了 ReturnCallback 回调 ,从 redis 取出消息,设置 returnCallback 设置为 true。你会发现 ConfirmCallback 的 ack 返回值仍是 true。
这里有个须要注意的地方,若是发送到交换器成功,可是没有匹配的队列(好比说取消了绑定),ack 返回值为仍是 true (这是一个坑,须要注意,就像上面那种状况!!!)。因此不能单靠这个来判断消息真的发送成功了。这个时候会先触发 ReturnCallback 回调,咱们把 returnCallback 设置为 true,因此还得判断 returnCallback 是否为 true,若是为 ture,表示消息发送不成功,false 才能彻底保证消息成功发送到了 rabbitmq。
如何模拟 ack 返回值为 false,先把项目启动,而后再把交换器删除,就会发现 ConfirmCallback 的 ack 为 false。
运行项目,而后打开浏览器,输入 http://localhost:9999/sendMessage
控制台打出以下日志
你会发现 ConfirmCallback 的 ack 返回值才是 false。
不能单单依靠 ConfirmCallback 的 ack 返回值为 true,就判定当前消息发送成功了。
Consumer Acknowledgements and Publisher Confirms
因为本人知识和能力有限,文中若有没说清楚的地方,但愿你们能在评论区指出,以帮助我将博文写得更好。