RabbitMQ系列之消息确认机制

  1. 上一篇介绍了 RabbitMQ 中的一些基本概念,并经过 SpringBoot 工程整合 RabbitMQ,作了一个小 demo;
  2. 那么 RabbitMQ 是怎么知道消息到底有没有被消费者消费,生产者是怎么知道本身发送的消息时真的已经发送到 RabbitMQ 中了呢?
  3. 本篇经过实例演示去介绍一下 RabbitMQ 的消息确认机制,阅读完本篇内容上面的这些疑问就迎刃而解,并且也有助于后边咱们理解 RabbitMQ 的消息为何会出现重复消费的问题。
  4. 文中实例会在文末提供下载地址。

一. 为何要有消息确认

  1. 因为网络可能以不可预知的方式出现故障,且检测故障可能须要耗费一些时间;
  2. 所以不能保证发送的消息可以到达对等方或由它成功地处理。

二. 消息确认流程

RabbitMQ 的消息确认机制以下:
RabbitMQ消息确认.pngjava

从图中咱们能够看出:git

  • 生产者发送消息到 RabbitMQ Server 后,RabbitMQ Server 须要对生产者进行消息 Confirm 确认;
  • 消费者消费消息后须要对 RabbitMQ Server 进行消息 ACK 确认。

这两个机制都是收到 TCP 协议的启发,它们对于数据安全相当重要。
下面就分别从生产者、消费者两个方面结合实例来认识消息确认机制。github

备注:web

  1. 在 RabbitMQ 中 有两种事务机制来确保消息的安全送达,分别是事务机制和确认机制;
  2. 事务机制须要每一个消息或一组消息发布、提交的通道设置为事务性的,所以会很是耗费性能,下降了 Rabbitmq 的消息吞吐量;
  3. 所以咱们在实际生产中一般采用确认机制,下面的实例演示就采用确认机制来进行编码。

三. 生产者确认

1. 消息投递和消息确认链路

咱们先来看一下RabbitMQ 消息投递和接收的一个完整链路以下:
RabbitMQ消息推送到接收.pngspring

2. 消息投递可靠性保证

消息投递的链路用文字表示:
producer->rabbitmq broker cluster->exchange->queue->consumersegmentfault

因为:安全

  1. 生产者向 RabbitMQ Server 发出的消息可能会在发送途中丢失或者须要通过必定的延迟后才能成功发送到 RabbitMQ Server;
  2. 所以,须要 RabbitMQ 告诉生产者,生产者才能知道本身发布的消息是否已经送达。

在编码时咱们能够用两个选项用来控制消息投递的可靠性:服务器

  • 消息从 producer 到 RabbitMQ broker cluster 成功,则会返回一个 confirmCallback
  • 消息从 exchange 到 queue 投递失败,则会返回一个 returnCallback

咱们能够利用这两个 callback 接口来控制消息的一致性和处理一部分的异常状况。微信

3. 开启 confirm 和 return 确认

server.port=10420

spring.rabbitmq.host=127.0.0.1

spring.rabbitmq.username=guest

spring.rabbitmq.password=guest

# 开启发送确认

spring.rabbitmq.publisher-confirms=true

# 开启发送失败退回(消息有没有找到合适的队列)

spring.rabbitmq.publisher-returns=true

4. 使用 callback 接口来确保消息投递状态

在 RabbitConfig 配置类里,定义 RabbitTemplate Bean,使用 callback 接口:网络

/**

 * RabbitMQ配置

 *

 * @author lyf

 * @公众号 全栈在路上

 * @GitHub https://github.com/liuyongfei1

 * @date 2020-05-17 17:20

**/

@Slf4j

@Configuration

public class RabbitConfig {

@Autowired

CachingConnectionFactory cachingConnectionFactory;

@Bean

RabbitTemplate rabbitTemplate() {

RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);

// 消息只要被 rabbitmq broker 接收到就会执行 confirmCallback
// 若是是 cluster 模式,须要全部 broker 接收到才会调用 confirmCallback
// 被 broker 接收到只能表示 message 已经到达服务器,并不能保证消息必定会被投递到目标 queue 里

rabbitTemplate.setConfirmCallback((data, ack, cause) -> {

String msgId = data.getId();

if (ack) {

log.info(msgId + ": 消息发送成功");

} else {

log.info(msgId + ": 消息发送失败");

}

});

// confirm 模式只能保证消息到达 broker,不能保证消息准确投递到目标 queue 里。
// 在有些业务场景下,咱们须要保证消息必定要投递到目标 queue 里,此时就须要用到 return 退回模式
// 这样若是未能投递到目标 queue 里将调用 returnCallback,能够记录下详细到投递数据,按期的巡检或者自动纠错都须要这些数据
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {

log.info(MessageFormat.format("消息发送失败,ReturnCallback:{0},{1},{2},{3},{4},{5}", message, replyCode,

replyText, exchange, routingKey));

// TODO 作消息发送失败时的处理逻辑

});

return rabbitTemplate;

}

/**

 * 声明队列

 * 参数说明:

 * durable 是否持久化,默认是false(持久化队列则数据会被存储在磁盘上,当消息代理重启时数据不会丢失;暂存队列只对当前链接有效)

 * exclusive 默认是false,只能被当前建立的链接使用,并且当链接关闭后队列即被删除。此参考优先级高于durable

 * autoDelete 默认是false,是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。

 * 通常设置一下队列的持久化就好,其他两个就是默认false

 *

 * @return Queue

**/

@Bean

Queue myQueue() {

return new Queue(QueueConstants.QUEUE\_NAME, true);

}

// 设置交换机,类型为 direct

@Bean

DirectExchange myExchange() {

return new DirectExchange(QueueConstants.QUEUE\_EXCHANGE\_NAME, true, false);

}

// 绑定:将交换机和队列绑定,并设置路由匹配键

@Bean

Binding queueBinding() {

return BindingBuilder.bind(myQueue()).to(myExchange()).with(QueueConstants.QUEUE\_ROUTING\_KEY\_NAME);

}

5. 消息生产端

在 ProducerController 里,主要干了如下几件事:

  • 提供了一个 Rest接口sendDirectMessage,经过请求该接口,能够实现生产者发送消息的功能;
  • 在该接口内部使用了 CorrelationData,该对象内部只有一个 id 属性,用来表示消息的惟一性;
  • 使用 rabbitTemplate.convertAndSend 像 RabbitMQ 发送消息(这里使用的rabbitTemplate 就是在 RabbitConfig 里被重写的 RabbitTemplate)。
/**

 * 消息生产端

 * @公众号 全栈在路上

 * @GitHub https://github.com/liuyongfei1

 * @author lyf

 * @date 2020-05-17 18:30

**/

@RestController

public class ProducerController {

/\*\*

\* RabbitTemplate提供了发送/接收消息的方法

\*/

@Autowired

RabbitTemplate rabbitTemplate;

/**

 * 生产消息

 *

 * @Author Liuyongfei

 * @Date 上午12:12 2020/5/20

 * @param test

 * @param test2

 * @return java.lang.String

**/

@GetMapping("/sendDirectMessage")

public String sendDirectMessage(String test,Integer test2) {

// 生成消息的惟一id

String msgId = UUID.randomUUID().toString();

String messageData = "hello,this is rabbitmq demo message";

String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));

// 定义要发送的消息对象

Map<String,Object\> messageObj = new HashMap<>();

messageObj.put("msgId",msgId);

messageObj.put("messageData",messageData);

messageObj.put("createTime",createTime);

rabbitTemplate.convertAndSend(QueueConstants.QUEUE\_EXCHANGE\_NAME,QueueConstants.QUEUE\_ROUTING\_KEY\_NAME,

messageObj,new CorrelationData(msgId));

return "message send ok";

}

}

6. 生产消息

  1. 保存代码,在 RabbitConfig 里的 setConfirmCallback方法内部打上断点;
  2. 重启服务后,使用 PostMan 请求生产消息接口:http://你的域名:10420/sendDirectMessage,生产消息,并将消息发送给 RabbitMQ:
    postman.png
  3. 而后打开 RabbitMQ 管理界面,找到对应的队列,会发现:
    rabbitmq-web1.png
    关于 Read,Total 的状态表明什么意思,能够翻看上一篇文章。
  4. 在 IDEA 里,服务在启动后直接停在断点处:
    confirmBack.png
    也就说明咱们生产的消息已经成功的到达了 RabbitMQ Server 里。
  5. 继续执行断点调试的绿色箭头,发现 setReturnCallback 方法里的断点没有执行到,也就说明了咱们生产的消息已经被交换机顺利的投递到队列里去了

总结

至此,生产者消息确认结束,且经过运行的实例,咱们可以得出结论:本次生产的消息已经正确无误的投递到了队列中去。

四. 消费者确认

消费者确认指的就是 RabbitMQ 须要确认消息到底有没有被收到,来肯定要不要将该条消息从队列中删除掉。这就须要消费者来告诉 RabbitMQ,有如下两种方式:

1. 自动应答

消费者在消费消息的时候,若是设定应答模式为自动,则消费者收到消息后,消息就会当即被 RabbitMQ 从 队列中删除掉。
所以,在实际开发者,咱们基本上是将消费应答模式设置为手动确认更为稳当一些。

2. 手动应答

消费者在收到消息后:

  • 能够在既定的正常状况下进行确认(告诉 RabbitMQ,我已经消费过该消息了,你能够删除该条数据了);
  • 能够在既定的异常状况下不进行确认(RabbitMQ 会继续保留该条数据),这样下一次能够继续消费该条数据。

3. 开启手动应答

server.port=10421

spring.rabbitmq.host=127.0.0.1

spring.rabbitmq.username=guest

spring.rabbitmq.password=guest

# 开启 ACK(消费者接收到消息时手动确认)

spring.rabbitmq.listener.simple.acknowledge-mode=manual

4. 消息消费者

ConsumerController 里主要干了如下几件事儿:

  1. 使用 @RabbitListener 来监听队列;
  2. 从消息头里拿到消息的惟一表示 deliveryTag
  3. 使用 channel.basicAck 来确认消息已经消费;
  4. 若是有异常,使用 channel.basicNack 把消费失败的消息从新放入到队列中去。
/**

 * 消息消费端

 * @公众号 全栈在路上

 * @GitHub https://github.com/liuyongfei1

 * @author Liuyongfei

 * @date 2020-05-21 18:00

**/

@Component

public class ConsumerController {

@RabbitListener(queues = {QueueConstants.QUEUE\_NAME})

public void handler(Message message, Channel channel) throws IOException {

System.out.println("收到消息:" + message.toString());

MessageHeaders headers = message.getHeaders();

Long tag = (Long) headers.get(AmqpHeaders.DELIVERY\_TAG);

try {

// 手动确认消息已消费

channel.basicAck(tag,false);

} catch (IOException e) {

// 把消费失败的消息从新放入到队列

channel.basicNack(tag, false, true);

e.printStackTrace();

}

}

}

5. 消费消息

  1. 重启消费端服务,停在断点处:

    consumer1.png

  2. 查看 RabbitMQ 管理界面会发现 队列的 Ready 和 Total 仍然是 1,说明咱们的手动应答设置生效:
  3. 点击 Debug 的绿色箭头继续像下执行,查看 RabbitMQ 管理界面:

    rabbitmq-web3.png

  4. 几秒后再次查看 RabbitMQ 管理界面:
    rabbitmq-web4.png
    会发现:Ready 变为0,Unacked 为 0,Total 为 0。 说明该条数据已经被成功消费。

总结

至此,消费者消息确认结束。你们能够在 ConsumerController 里添加一些测试代码来触发异常,体验一下 channel.basicNack 的做用。这里我就再也不一一测试。

五. demo下载地址

  • https://github.com/liuyongfei1/blog-demo
  • 在本篇实例中,我将消息生产端和消费端部署为两个单独的服务,你们克隆完毕后请切换到 feature/rabbitmq-confirm 分支进行启动测试。
  • 欢迎你们关注扫描二维码或 添加微信公众号:全栈在路上
    微信公众号二维码.jpg
相关文章
相关标签/搜索