RabbitMQ学习笔记(4、RabbitMQ队列)

目录:

  • 消息路由失败了会怎样
  • 备份交换器
  • TTL与DLX
  • 如何实现延迟队列
  • RabbitMQ的RPC实现
  • 持久化
  • 事务
  • 发送方确认机制

消息路由失败了会怎样:

在RabbitMQ中,若是消息路由失败了,通常会有两种状况。要么是把消息回退给客户端处理,要么就把消息丢弃。java

处理逻辑是根据basicPublish方法的mandatoryimmediate两个参数来控制。git

一、mandatory:当mandatory=true时,若是交换器没法根据自身类型和路由键匹配到符合条件的队列,便会调用Basic.Return命令将消息会推给生产者;当mandatory=false时,不知足条件则丢弃此条消息。github

1 channel.addReturnListener(new ReturnListener() { 2     public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, 3                              AMQP.BasicProperties properties, byte[] body) throws IOException { 4         // 具体处理逻辑
5  } 6 });

二、immediate:当immediate=true时,交换器将消息路由到队列后,发现此队列上不存在任何消费者,那么这条消息将不会放入到队列中。当路由键匹配的全部队列都没有消费者时,改消息将会经过Basic.Return返回给生产者。redis

备份交换器:

备份交换器能够将未被路由到的消息存储在RabbitMQ中,在须要它的时候再去使用。api

 1 public class AlternateProduct {  2 
 3     private static final String EXCHANGE_NAME = "alternate.exchange";  4     private static final String EXCHANGE_BAK_NAME = "alternate-bak.exchange";  5     
 6     private static final String QUEUE_NAME = "alternate.queue";  7     private static final String QUEUE_BAK_NAME = "alternate-bak.queue";  8 
 9     private static final String ROUTING_KEY_NAME = "alternate.routing.key"; 10 
11     public static void main(String[] args) throws IOException, TimeoutException { 12         Connection connection = RabbitMqUtils.getConnection(); 13         Channel channel = connection.createChannel(); 14 
15         channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false, false, false, getExchangeDeclareArgs()); 16         // fanout类型,放款路由限制
17         channel.exchangeDeclare(EXCHANGE_BAK_NAME, BuiltinExchangeType.FANOUT, false, false, false, null); 18 
19         channel.queueDeclare(QUEUE_NAME, false, false, false, null); 20         channel.queueDeclare(QUEUE_BAK_NAME, false, false, false, null); 21 
22  channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY_NAME); 23         // 由于交换器QUEUE_BAK_NAME设置fanout类型,因此能够没必要关心路由键,故随便写可能将消息路由到对应的队列中
24         channel.queueBind(QUEUE_BAK_NAME, EXCHANGE_BAK_NAME, "123"); 25 
26         // 发消息时路由键设置一个不存在的"",让其路由不到,从而把消息发到备份队列中
27         channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, 28                 "alternate".getBytes()); 29 
30  RabbitMqUtils.close(connection, channel); 31  } 32 
33     private static Map<String, Object> getExchangeDeclareArgs() { 34         Map<String, Object> result = new HashMap<String, Object>(1); 35         result.put("alternate-exchange", EXCHANGE_BAK_NAME); 36         return result; 37  } 38 }

关于备份交换器的注意点:服务器

一、若是备份交换器不存在,客户端和RabbitMQ客户端都不会出现异常,可是消息会丢失。dom

二、若是备份交换器没有绑定任何队列,客户端和RabbitMQ客户端都不会出现异常,可是消息会丢失。异步

三、若是备份交换器没有匹配到任何队列,客户端和RabbitMQ客户端都不会出现异常,可是消息会丢失。async

四、若是备份交换器和mandatory一块儿使用,且备份交换器有效,此时mandatory将无效。分布式

TTL与DLX:

一、TTL:过时时间,有队列过时时间消息过时时间

队列过时时间

经过设置队列的过时时间,来使队列中因此的消息都具备过时时间。

消息过时时间

设置消息的BasicProperties props属性值来控制消息的过时时间。

1 AMQP.BasicProperties.Builder publishBuilder = new AMQP.BasicProperties.Builder(); 2 // expiration单位ms
3 publishBuilder.expiration("10000"); 4 
5 channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY_NAME, publishBuilder.build(), 6         "ttl".getBytes());

对于第一种TTL来讲,队列一但过时就会删除调;但对于第二种TTL来讲,队列过时不会立刻删除,而是等队列要被消费时再判断是否要删除。

那为何会不同呢,咱们都知道mq对性能的要求是很是高的,若是第二种ddl的方式也要及时删除的话势必要扫描整个队列,这样的话,若队列长度较大是性能便会很是的差。

而第一种为何能够作到及时删除呢,咱们知道队列具备先进先出的特性,因此先入队的确定要比后入队的要先过时,因此只要删除头部的就好啦。

而第二种的消息过时时间都是不固定的,考虑到MQ的性能,因此采用了上述的方式。

二、DLX:死信交换器,全称Dead Letter Exchange

变为死信队列的有如下几种状况:

  • 消息被拒,且requeue=false
  • 队列过时或队列达到最大长度

注意:DLX也是一个正常的交换器,和通常队列没有区别,它能在任何的队列上被指定。

如何实现延迟队列:

本模块讲述RabbitMQ,仅提供RabbitMQ的实现,大佬们有兴趣能够实现其它几种方式。

延迟队列是指将消息发送到队列后,等待一段时间后再进行消费。场景:饿了么外卖下单后,超过15分钟订单失效。

延迟队列场景的时间方式有四种:

一、DB轮询:经过job或其它逻辑将订单表的必要字段查出(如:orderId、createTime、status),当订单超过xx时间,将状态置为失效。

)优势:实现简单、无技术难点、异常恢复、支持分布式/集群环境

)缺点:影响DB性能、时效性查、效率低

二、JDK DelayQueue:java api提供的延迟队列的实现,经过poll()、take()方法获取超时任务

)优势:实现简单、性能较好

)缺点:异常恢复困难、分布式/集群实现困难(基于JVM内存)

三、Redis sortedSet:经过zset类型的score来实现

)优势:解耦、异常恢复、扩展性强、支持分布式/集群环境

)缺点:增长了redis维护成本、占用带宽

四、RabbitMQ TTL + DLX:使用RabbitMQ的过时时间和死信队列实现

 实现:delay-message

)优势:解耦、异常恢复、扩展性强、支持分布式/集群环境

)缺点:增长了RabbitMQ维护成本、占用带宽

RabbitMQ的RPC实现:

RabbitMQ也能够实现RPC,客户端发送消息,服务端接收消息。

replayTo:设置回调队列,用于客户端响应服务端的回调消息。

correlationId:RPC请求和响应的关联id。

 1 public class RpcServer {  2 
 3     private static final String QUEUE_NAME = "rpc.queue";  4 
 5     public static void main(String[] args) throws IOException, TimeoutException {  6         Connection connection = RabbitMqUtils.getRpcConnection();  7         final Channel channel = connection.createChannel();  8         // 建立请求处理队列,用于服务端接收客户端RPC请求
 9         channel.queueDeclare(QUEUE_NAME, true, false, false, null); 10 
11         System.out.println("等待RPC请求..."); 12 
13         // 服务端监听客户端发送的RPC请求
14         channel.basicConsume(QUEUE_NAME, new DefaultConsumer(channel) { 15  @Override 16             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) 17                     throws IOException { 18                 String correlationId = properties.getCorrelationId(); 19                 String message = ""; 20 
21                 try { 22                     message = new String(body); 23                     System.err.println(format("service recv message:{0}, corrId:{1}", message, correlationId)); 24                 } catch (Exception e) { 25  e.printStackTrace(); 26                 } finally { 27                     AMQP.BasicProperties props = new AMQP.BasicProperties.Builder() 28  .correlationId(correlationId) 29  .build(); 30 
31                     // 使用默认exchange,容许经过routingKey指定message将被发送给哪一个queue
32                     channel.basicPublish("", properties.getReplyTo(), props, (message + "--is done.").getBytes("UTF-8")); 33                     channel.basicAck(envelope.getDeliveryTag(), false); 34  } 35  } 36  }); 37  } 38 }
 1 public class RpcClient {  2 
 3     private static final String QUEUE_NAME = "rpc.queue";  4 
 5     public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {  6         final Connection connection = RabbitMqUtils.getConnection();  7         Channel channel = connection.createChannel();  8 
 9         // 随机建立corrId
10         final String collId = UUID.randomUUID().toString(); 11         // 客户端建立匿名队列,用于响应服务端请求
12         String callbackQueueName = channel.queueDeclare().getQueue(); 13 
14         // 客户端发送消息;使用默认exchange(exchange=""),容许经过routingKey指定message将被发送给哪一个queue
15         channel.basicPublish("", QUEUE_NAME, getBasicPublishProperties(collId, callbackQueueName), 16                 "hello world".getBytes()); 17         // 客户端接收服务端响应的消息
18         channel.basicConsume(callbackQueueName, new DefaultConsumer(channel) { 19  @Override 20             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 21                 if (collId.equals(properties.getCorrelationId())) { 22                     System.out.println(format("client recv message:{0}, corrId:{1}", new String(body), collId)); 23                 } else { 24                     System.out.println("不是本次请求的消息"); 25  } 26  } 27  }); 28 
29         TimeUnit.SECONDS.sleep(1); 30 
31  RabbitMqUtils.close(connection, channel); 32  } 33 
34     private static AMQP.BasicProperties getBasicPublishProperties(String corrId, String callbackQueueName) { 35         return new AMQP.BasicProperties().builder() 36  .correlationId(corrId) 37  .replyTo(callbackQueueName).build(); 38  } 39 }

持久化:

在RabbitMQ中交换器、队列、消息都设置为持久化就能保持消息不丢失了嘛?

固然不,状况以下:

一、当autoAck设置为true的时候,消费者接收到消息后还没来得及处理就宕机了。

解决:autoAck设为false,消费者处理完消息后再通知服务端删除消息。

二、再RabbitMQ持久化到磁盘中的这段时间,RabbitMQ服务器宕机了。

解决:服务端确认机制、镜像队列(后面章节会描述)。

事务:

一、开启事务:channel.txSelect()

二、提交事务:channel.txCommit()

三、回滚事务:channel.txRollback()

事务和db的事务很类似,不细说。

发送方确认机制:

AMQP协议提供了事务机制来保证消息能真正成功的到达RabbitMQ,但事务机制会严重的影响到RabbitMQ的吞吐量,因此RabbitMQ引入了一种轻量的方式,发送方确认机制。

客户端使用方式:

一、将信道设置发送方确认方式:channel.confirmSelect()。

二、确认消息是否发送成功

)boolean waitForConfirms() throws InterruptedException;

)boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException;

)void waitForConfirmsOrDie() throws IOException, InterruptedException;

)void waitForConfirmsOrDie(long timeout) throws IOException, InterruptedException, TimeoutException;

发送方确认消息成功的三种方式:

 1 public class PublisherConfirmProduct {  2 
 3     private static final String EXCHANGE_NAME = "demo.exchange";  4     private static final String ROUTING_KEY = "demo.routingkey";  5     private static final String QUEUE_NAME = "demo.queue";  6     private static final String MESSAGE = "Hello World!";  7 
 8     /**
 9  * 单条确认  10      */
 11     public static void commonConfirm() throws Exception {  12         Connection connection = RabbitMqUtils.getConnection();  13         Channel channel = initChannel(connection);  14 
 15  channel.confirmSelect();  16         for (int i = 0; i < 100; i++) {  17  channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, MESSAGE.getBytes());  18             if (channel.waitForConfirms()) {  19                 // 逐条确认是否发送成功
 20                 System.out.println("send success!");  21  }  22  }  23 
 24  RabbitMqUtils.close(connection, channel);  25  }  26 
 27     /**
 28  * 批量确认  29      */
 30     public static void batchConfirm() throws Exception {  31         Connection connection = RabbitMqUtils.getConnection();  32         Channel channel = initChannel(connection);  33 
 34  channel.confirmSelect();  35         for (int i = 0; i < 100; i++) {  36  channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, MESSAGE.getBytes());  37  }  38 
 39         // 批量确认是否发送成功,若是某一次确认失败这一批都要从新发送
 40         if (channel.waitForConfirms()) {  41             System.out.println("send success!");  42  }  43 
 44  RabbitMqUtils.close(connection, channel);  45  }  46 
 47     /**
 48  * 异步确认  49      */
 50     public static void asyncConfirm() throws Exception {  51         Connection connection = RabbitMqUtils.getConnection();  52         Channel channel = initChannel(connection);  53         channel.basicQos(1);  54 
 55  channel.confirmSelect();  56 
 57         // 定义一个未确认消息集合
 58         final SortedSet<Long> unConfirmSet = Collections.synchronizedNavigableSet(new TreeSet<>());  59         for (int i = 0; i < 100; i++) {  60  channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, MESSAGE.getBytes());  61  unConfirmSet.add(channel.getNextPublishSeqNo());  62  }  63 
 64         channel.addConfirmListener(new ConfirmListener() {  65  @Override  66             public void handleNack(long deliveryTag, boolean multiple) throws IOException {  67                 System.err.println(format("拒绝消息 deliveryTag:{0}, multiple:{1}", deliveryTag, multiple));  68  }  69 
 70  @Override  71             public void handleAck(long deliveryTag, boolean multiple) throws IOException {  72                 System.err.println(format("确认消息 deliveryTag:{0}, multiple:{1}", deliveryTag, multiple));  73                 if (multiple) {  74                     // multiple为true,则deliveryTag以前的全部消息所有被确认
 75                     unConfirmSet.headSet(deliveryTag + 1).clear();  76                 } else {  77                     // 不然只确认一条消息
 78  unConfirmSet.remove(deliveryTag);  79  }  80  }  81  });  82 
 83         TimeUnit.SECONDS.sleep(5);  84  System.out.println(unConfirmSet.size());  85 
 86  RabbitMqUtils.close(connection, channel);  87  }  88 
 89     private static Channel initChannel(Connection connection) throws IOException {  90         Channel channel = connection.createChannel();  91         channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, null);  92         channel.queueDeclare(QUEUE_NAME, true, false, false, null);  93  channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);  94         return channel;  95  }  96 
 97     public static void main(String[] args) throws Exception {  98 // commonConfirm();  99 // batchConfirm();
100  asyncConfirm(); 101  } 102 }
相关文章
相关标签/搜索