背景git
何为延迟队列?github
顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列。而通常的队列,消息一旦入队了以后就会被消费者立刻消费。spring
场景一:在订单系统中,一个用户下单以后一般有30分钟的时间进行支付,若是30分钟以内没有支付成功,那么这个订单将进行一场处理。这是就可使用延时队列将订单信息发送到延时队列。springboot
场景二:用户但愿经过手机远程遥控家里的智能设备在指定的时间进行工做。这时候就能够将用户指令发送到延时队列,当指令设定的时间到了再将指令推送到只能设备。网络
延迟队列能作什么?ide
延迟队列多用于须要延迟工做的场景。最多见的是如下两种场景:spring-boot
一、延迟消费。好比:post
二、延迟重试。好比消费者从队列里消费消息时失败了,可是想要延迟一段时间后自动重试。学习
若是不使用延迟队列,那么咱们只能经过一个轮询扫描程序去完成。这种方案既不优雅,也不方便作成统一的服务便于开发人员使用。可是使用延迟队列的话,咱们就能够垂手可得地完成。测试
如何实现?
别急,在下文中,咱们将详细介绍如何利用Spring Boot加RabbitMQ来实现延迟队列。
本文出现的示例代码都已push到Github仓库中:https://github.com/Lovelcp/blog-demos/tree/master/spring-boot-rabbitmq-delay-queue
实现思路
在介绍具体的实现思路以前,咱们先来介绍一下RabbitMQ的两个特性,一个是Time-To-Live Extensions,另外一个是Dead Letter Exchanges。
Time-To-Live Extensions
RabbitMQ容许咱们为消息或者队列设置TTL(time to live),也就是过时时间。TTL代表了一条消息可在队列中存活的最大时间,单位为毫秒。也就是说,当某条消息被设置了TTL或者当某条消息进入了设置了TTL的队列时,这条消息会在通过TTL秒后“死亡”,成为Dead Letter。若是既配置了消息的TTL,又配置了队列的TTL,那么较小的那个值会被取用。更多资料请查阅官方文档。
Dead Letter Exchange
刚才提到了,被设置了TTL的消息在过时后会成为Dead Letter。其实在RabbitMQ中,一共有三种消息的“死亡”形式:
若是队列设置了Dead Letter Exchange(DLX),那么这些Dead Letter就会被从新publish到Dead Letter Exchange,经过Dead Letter Exchange路由到其余队列。更多资料请查阅官方文档。
流程图
聪明的你确定已经想到了,如何将RabbitMQ的TTL和DLX特性结合在一块儿,实现一个延迟队列。
针对于上述的延迟队列的两个场景,咱们分别有如下两种流程图:
延迟消费
延迟消费是延迟队列最为经常使用的使用模式。以下图所示,生产者产生的消息首先会进入缓冲队列(图中红色队列)。经过RabbitMQ提供的TTL扩展,这些消息会被设置过时时间,也就是延迟消费的时间。等消息过时以后,这些消息会经过配置好的DLX转发到实际消费队列(图中蓝色队列),以此达到延迟消费的效果。
延迟重试
延迟重试本质上也是延迟消费的一种,可是这种模式的结构与普通的延迟消费的流程图较为不一样,因此单独拎出来介绍。
以下图所示,消费者发现该消息处理出现了异常,好比是由于网络波动引发的异常。那么若是不等待一段时间,直接就重试的话,极可能会致使在这期间内一直没法成功,形成必定的资源浪费。那么咱们能够将其先放在缓冲队列中(图中红色队列),等消息通过一段的延迟时间后再次进入实际消费队列中(图中蓝色队列),此时因为已通过了“较长”的时间了,异常的一些波动一般已经恢复,这些消息能够被正常地消费。
代码实现
接下来咱们将介绍如何在Spring Boot中实现基于RabbitMQ的延迟队列。咱们假设读者已经拥有了Spring Boot与RabbitMQ的基本知识。
初始化工程
首先咱们在Intellij中建立一个Spring Boot工程,而且添加spring-boot-starter-amqp扩展。
配置队列
从上述的流程图中咱们能够看到,一个延迟队列的实现,须要一个缓冲队列以及一个实际的消费队列。又因为在RabbitMQ中,咱们拥有两种消息过时的配置方式,因此在代码中,咱们一共配置了三条队列:
咱们经过Java Config的方式将上述的队列配置为Bean。因为咱们添加了spring-boot-starter-amqp扩展,Spring Boot在启动时会根据咱们的配置自动建立这些队列。为了方便接下来的测试,咱们将delay_queue_per_message_ttl以及delay_queue_per_queue_ttl的DLX配置为同一个,且过时的消息都会经过DLX转发到delay_process_queue。
delay_queue_per_message_ttl
首先介绍delay_queue_per_message_ttl的配置代码:
1 2 3 4 5 6 7 |
|
其中,x-dead-letter-exchange声明了队列里的死信转发到的DLX名称,x-dead-letter-routing-key声明了这些死信在转发时携带的routing-key名称。
delay_queue_per_queue_ttl
相似地,delay_queue_per_queue_ttl的配置代码:
1 2 3 4 5 6 7 8 |
|
delay_queue_per_queue_ttl队列的配置比delay_queue_per_message_ttl队列的配置多了一个x-message-ttl,该配置用来设置队列的过时时间。
delay_process_queue
delay_process_queue的配置最为简单:
1 2 3 4 5 |
|
配置Exchange
配置DLX
首先,咱们须要配置DLX,代码以下:
1 2 3 4 |
|
而后再将该DLX绑定到实际消费队列即delay_process_queue上。这样全部的死信都会经过DLX被转发到delay_process_queue:
1 2 3 4 5 6 |
|
配置延迟重试所需的Exchange
从延迟重试的流程图中咱们能够看到,消息处理失败以后,咱们须要将消息转发到缓冲队列,因此缓冲队列也须要绑定一个Exchange。在本例中,咱们将delay_process_per_queue_ttl做为延迟重试里的缓冲队列。具体代码是如何配置的,这里就不赘述了,你们能够查阅我Github中的代码。
定义消费者
咱们建立一个最简单的消费者ProcessReceiver,这个消费者监听delay_process_queue队列,对于接受到的消息,他会:
另外,咱们还须要新建一个监听容器用于存放消费者,代码以下:
1 2 3 4 5 6 7 8 |
|
至此,咱们前置的配置代码已经所有编写完成,接下来咱们须要编写测试用例来测试咱们的延迟队列。
编写测试用例
延迟消费场景
首先咱们编写用于测试TTL设置在消息上的测试代码。
咱们借助spring-rabbit包下提供的RabbitTemplate类来发送消息。因为咱们添加了spring-boot-starter-amqp扩展,Spring Boot会在初始化时自动地将RabbitTemplate当成bean加载到容器中。
解决了消息的发送问题,那么又该如何为每一个消息设置TTL呢?这里咱们须要借助MessagePostProcessor。
MessagePostProcessor一般用来设置消息的Header以及消息的属性。咱们新建一个ExpirationMessagePostProcessor类来负责设置消息的TTL属性:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
|
而后在调用RabbitTemplate的convertAndSend方法时,传入ExpirationMessagePostPorcessor便可。咱们向缓冲队列中发送3条消息,过时时间依次为1秒,2秒和3秒。具体的代码以下所示:
1 2 3 4 5 6 7 8 9 10 |
|
细心的朋友必定会问,为何要在代码中加一个CountDownLatch呢?这是由于若是没有latch阻塞住测试方法的话,测试用例会直接结束,程序退出,咱们就看不到消息被延迟消费的表现了。
那么相似地,测试TTL设置在队列上的代码以下:
1 2 3 4 5 6 7 8 9 |
|
咱们向缓冲队列中发送3条消息。理论上这3条消息会在4秒后同时过时。
延迟重试场景
咱们一样还需测试延迟重试场景。
1 2 3 4 5 6 7 8 |
|
咱们向delay_process_queue发送3条会触发FAIL的消息,理论上这3条消息会在4秒后自动重试。
查看测试结果
延迟消费场景
延迟消费的场景测试咱们分为了TTL设置在消息上和TTL设置在队列上两种。首先,咱们先看一下TTL设置在消息上的测试结果:
从上图中咱们能够看到,ProcessReceiver分别通过1秒、2秒、3秒收到消息。测试结果代表消息不只被延迟消费了,并且每条消息的延迟时间是能够被个性化设置的。TTL设置在消息上的延迟消费场景测试成功。
而后,TTL设置在队列上的测试结果以下图:
从上图中咱们能够看到,ProcessReceiver通过了4秒的延迟以后,同时收到了3条消息。测试结果代表消息不只被延迟消费了,同时也证实了当TTL设置在队列上的时候,消息的过时时间是固定的。TTL设置在队列上的延迟消费场景测试成功。
延迟重试场景
接下来,咱们再来看一下延迟重试的测试结果:
ProcessReceiver首先收到了3条会触发FAIL的消息,而后将其移动到缓冲队列以后,过了4秒,又收到了刚才的那3条消息。延迟重试场景测试成功。
总结
本文首先介绍了延迟队列的概念以及用途,而且经过代码详细讲解了如何经过Spring Boot和RabbitMQ实现一个延迟队列。但愿本文可以对你们平时的学习和工做能有所启发和帮助。也但愿你们多多支持脚本之家。
原文连接:http://www.kissyu.org/