MQ发布确认的三种策略| RabbitMQ系列(六)

这是我参与8月更文挑战的第11天,活动详情查看:8月更文挑战java


相关文章

RabbitMQ系列汇总:RabbitMQ系列编程


前言

  • 生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,全部在该信道上面发布的消息都将会被指派一个惟一的 ID(从 1 开始),一旦消息被投递到全部匹配的队列以后。安全

  • broker 就会发送一个确认给生产者(包含消息的惟一 ID),这就使得生产者知道消息已经正确到达目的队列了,若是消息和队列是可持久化的,那么确认消息会在将消息写入磁盘以后发出。markdown

  • broker 回传 给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也能够设置 basic.ack 的 multiple 域,表示到这个序列号以前的全部消息都已经获得了处理。并发

  • confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就能够在等信 道返回确认的同时继续发送下一条消息,当消息最终获得确认以后。dom

  • 生产者应用即可以经过回调方法来处理该确认消息,若是 RabbitMQ 由于自身内部错误致使消息丢失,就会发送一条 nack 消息,生产者应用程序一样能够在回调方法中处理该 nack 消息。异步

  • 开启发布确认方式函数

    • 发布确认默认是没有开启的,若是要开启须要调用方法 confirmSelect,每当你要想使用发布 确认,都须要在 channel 上调用该方法高并发

    • //开启发布确认
      channel.confirmSelect();
      复制代码

1、单个确认发布

  • 这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息以后只有它 被确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long)这个方法只有在消息被确认 的时候才返回,若是在指定时间范围内这个消息没有被确认那么它将抛出异常。post

  • 生产者

    • /** * 这是一个测试的生产者 *@author DingYongJun *@date 2021/8/1 */
      public class DyProducerTest_dingyuefabu {
      
          //设置执行次数
          public static final int MESSAGE_COUNT = 888;
          /** * 这里为了方便,咱们使用main函数来测试 * 纯属看你我的选择 * @param args */
          public static void main(String[] args) throws Exception {
              //单个发布确认执行
              publishMessageIndividually();
          }
      
          /** * 单个发布确认 */
          public static void publishMessageIndividually() throws Exception {
              Channel channel = RabbitMqUtils.getChannel();
              String queueName = UUID.randomUUID().toString();
              channel.queueDeclare(queueName, false, false, false, null);
              //开启发布确认
              channel.confirmSelect();
              long begin = System.currentTimeMillis();
              for (int i = 0; i < MESSAGE_COUNT; i++) {
                  String message = i + "";
                  channel.basicPublish("", queueName, null, message.getBytes());
                  //服务端返回 false 或超时时间内未返回,生产者能够消息重发
                  boolean flag = channel.waitForConfirms();
                  if(flag){
                      System.out.println("消息发送成功");
                  }
              }
              long end = System.currentTimeMillis();
              System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) +
                      "ms");
          }
      }
      复制代码
  • 执行结果

    • image-20210803162231439.png
    • image-20210803162251045.png
  • 这种确认方式有一个最大的缺点就是:发布速度特别的慢,由于若是没有确认发布的消息就会 阻塞全部后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。固然对于某 些应用程序来讲这可能已经足够了。

  • 固然,如今跟你说慢,你莫得感知,下面几种综合起来对比你就会发现他的效率有多低了!

2、批量确认发布

  • 与单个等待确认消息相比,先发布一批消息而后一块儿确承认以极大地 提升吞吐量。

  • 生产者

    • /** * 批量发布确认 */
      public static void publishMessageBatch() throws Exception {
              Channel channel = RabbitMqUtils.getChannel();
              //队列名使用uuid来获取不重复的值,不须要本身再进行命名了。
              String queueName = UUID.randomUUID().toString();
              channel.queueDeclare(queueName, false, false, false, null);
              //开启发布确认
              channel.confirmSelect();
              //批量确认消息大小
              int batchSize = 88;
              //未确认消息个数
              int outstandingMessageCount = 0;
              long begin = System.currentTimeMillis();
              for (int i = 0; i < MESSAGE_COUNT; i++) {
                  String message = i + "";
                  channel.basicPublish("", queueName, null, message.getBytes());
                  outstandingMessageCount++;
                  if (outstandingMessageCount == batchSize) {
                      channel.waitForConfirms();//确认代码
                      outstandingMessageCount = 0;
                  }
      
              }
              //为了确保还有剩余没有确认消息 再次确认
              if (outstandingMessageCount > 0) {
                  channel.waitForConfirms();
              }
              long end = System.currentTimeMillis();
              System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) +
                      "ms");
          }
      复制代码
  • 执行结果

    • image-20210803163106931.png
    • image-20210803163121282.png
  • 缺点:当发生故障致使发布出现问题时,不知道是哪一个消息出现问题了,咱们必须将整个批处理保存在内存中,以记录重要的信息然后从新发布消息。

  • 固然这种方案仍然是同步的,也同样阻塞消息的发布。

3、异步确认发布

  • 异步确认虽然编程逻辑比上两个要复杂,可是性价比最高,不管是可靠性仍是效率都没得说, 他是利用回调函数来达到消息可靠性传递的,这个中间件也是经过函数回调来保证是否投递成功, 下面就让咱们来详细讲解异步确认是怎么实现的。

  • 生产者

    • /** * 异步发布确认 */
          public static void publishMessageAsync() throws Exception {
              try (Channel channel = RabbitMqUtils.getChannel()) {
                  String queueName = UUID.randomUUID().toString();
                  channel.queueDeclare(queueName, false, false, false, null);
                  //开启发布确认
                  channel.confirmSelect();
                  /** * 线程安全有序的一个哈希表,适用于高并发的状况 * 1.轻松的将序号与消息进行关联 * 2.轻松批量删除条目 只要给到序列号 * 3.支持并发访问 */
                  ConcurrentSkipListMap<Long, String> outstandingConfirms = new
                          ConcurrentSkipListMap<>();
                  /** * 确认收到消息的一个回调 * 1.消息序列号 * 2.true 能够确认小于等于当前序列号的消息 * false 确认当前序列号消息 */
                  ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
                      if (multiple) {
                          //返回的是小于等于当前序列号的未确认消息 是一个 map
                          ConcurrentNavigableMap<Long, String> confirmed =
                                  outstandingConfirms.headMap(sequenceNumber, true);
                          //清除该部分未确认消息
                          confirmed.clear();
                      }else{
                          //只清除当前序列号的消息
                          outstandingConfirms.remove(sequenceNumber);
                      }
                  };
                  ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
                      String message = outstandingConfirms.get(sequenceNumber);
                      System.out.println("发布的消息"+message+"未被确认,序列号"+sequenceNumber);
                  };
                  /** * 添加一个异步确认的监听器 * 1.确认收到消息的回调 * 2.未收到消息的回调 */
                  channel.addConfirmListener(ackCallback, null);
                  long begin = System.currentTimeMillis();
                  for (int i = 0; i < MESSAGE_COUNT; i++) {
                      String message = "消息" + i;
                      /** * channel.getNextPublishSeqNo()获取下一个消息的序列号 * 经过序列号与消息体进行一个关联 * 所有都是未确认的消息体 */
                      outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
                      channel.basicPublish("", queueName, null, message.getBytes());
                  }
                  long end = System.currentTimeMillis();
                  System.out.println("发布" + MESSAGE_COUNT + "个异步确认消息,耗时" + (end - begin) +
                          "ms");
              }
          }
      复制代码
  • 执行结果

    • image-20210803163759179.png
    • image-20210803163826281.png
  • 很容易看出,这种方式速度快的飞起呀!

  • 如何处理未确认的消息?

    • 最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列, 好比说用 ConcurrentLinkedQueue 这个队列在 confirm callbacks 与发布线程之间进行消息的传递。

4、总结

  • 单独发布消息

    • 耗时:21210ms
    • 同步等待确认,简单,但吞吐量很是有限。
  • 批量发布消息

    • 耗时:525ms
    • 批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是那条 消息出现了问题。
  • 异步处理

    • 耗时:45ms
    • 最佳性能和资源使用,在出现错误的状况下能够很好地控制,可是实现起来稍微难些

路漫漫其修远兮,吾必将上下求索~

若是你认为i博主写的不错!写做不易,请点赞、关注、评论给博主一个鼓励吧~hahah

相关文章
相关标签/搜索