在 jdk
的 juc
工具包中,提供了一种延迟队列 DelayQueue
。延迟队列用处很是普遍,好比咱们最多见的场景就是在网购或者外卖平台中发起一个订单,若是不付款,通常 15
分钟后就会被关闭,这个直接用定时任务是很差实现的,由于每一个用户下单的时间并不肯定,因此这时候就须要用到延迟队列。html
延迟队列自己也是队列,只不过这个队列是延迟的,意思就是说当咱们把一条消息放入延迟队列,消息并不会马上出队,而是会在到达指定时间以后(或者说过了指定时间)才会出队,从而被消费者消费。java
RabbitMQ
中的死信队列就是用来存储特定条件下的消息,那么假如咱们把这个条件设定为指定时间过时(设定带TTL
的消息或者队列),就能够用来实现延迟队列的功能。浏览器
TtlDelayRabbitConfig
配置类(省略了包名和导入),消息最开始发送至 ttl
消息队列,这个队列中全部的消息在 5
秒后过时,后期后会进入死信队列:@Configuration public class TtlDelayRabbitConfig { //路由ttl消息交换机 @Bean("ttlDelayFanoutExchange") public FanoutExchange fanoutExchange(){ return new FanoutExchange("TTL_DELAY_FANOUT_EXCHANGE"); } //ttl消息队列 @Bean("ttlDelayQueue") public Queue ttlQueue(){ Map<String, Object> map = new HashMap<String, Object>(); map.put("x-message-ttl", 5000);//队列中全部消息5秒后过时 map.put("x-dead-letter-exchange", "TTL_DELAY_DEAD_LETTER_FANOUT_EXCHANGE");//过时后进入死信队列 return new Queue("TTL_QUEUE",false,false,false,map); } //Fanout交换机和productQueue绑定 @Bean public Binding bindTtlFanoutExchange(@Qualifier("ttlDelayQueue") Queue queue, @Qualifier("ttlDelayFanoutExchange") FanoutExchange fanoutExchange){ return BindingBuilder.bind(queue).to(fanoutExchange); } //fanout死信交换机 @Bean("ttlDelayDeadLetterExchange") public FanoutExchange deadLetterExchange(){ return new FanoutExchange("TTL_DELAY_DEAD_LETTER_FANOUT_EXCHANGE"); } //死信队列 @Bean("ttlDelayDeadLetterQueue") public Queue ttlDelayDeadLetterQueue(){ return new Queue("TTL_DELAY_DEAD_LETTER_FANOUT_QUEUE"); } //死信队列和死信交换机绑定 @Bean public Binding deadLetterQueueBindExchange(@Qualifier("ttlDelayDeadLetterQueue") Queue queue, @Qualifier("ttlDelayDeadLetterExchange") FanoutExchange fanoutExchange){ return BindingBuilder.bind(queue).to(fanoutExchange); } }
TtlDelayConsumer
类,监听死信队列,这里收到的消息都是生产者生产消息以后的 5
秒,也就是延迟了 5
秒的消息:@Component public class TtlDelayConsumer { @RabbitHandler @RabbitListener(queues = "TTL_DELAY_DEAD_LETTER_FANOUT_QUEUE") public void fanoutConsumer(String msg){ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println("【延迟队列】【" + sdf.format(new Date()) + "】收到死信队列消息:" + msg); } }
DelayQueueController
类作生产者来发送消息:@RestController @RequestMapping("/delay") public class DelayQueueController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping(value="/ttl/send") public String clearVipInfo(@RequestParam(value = "msg",defaultValue = "no message") String msg){ rabbitTemplate.convertAndSend("TTL_DELAY_FANOUT_EXCHANGE","",msg); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println("消息发送成功【" + sdf.format(new Date()) + "】"); return "succ"; } }
http://localhost:8080/delay/ttl/send?msg=测试ttl延迟队列
进行测试,能够看到每条消息都是在发送 5
秒以后才能收到消息:假如咱们实际中,有的消息是 10
分钟过时,有的是 20
分钟过时,这时候咱们就须要创建多个队列,一旦时间维度很是庞大,那么就须要维护很是多的队列。说到这里,可能不少人会有疑问,咱们能够针对单条信息设置过时时间,大可没必要去定义多个队列?app
然而事实真的是如此吗?接下来咱们经过一个例子来验证下。分布式
TtlDelayRabbitConfig
类中的队列定义函数 x-message-ttl
属性去掉,不过须要注意的是咱们须要先把这个队列后台删除掉,不然同名队列重复建立无效:@Bean("ttlDelayQueue") public Queue ttlQueue(){ Map<String, Object> map = new HashMap<String, Object>(); // map.put("x-message-ttl", 5000);//注释掉这个属性,队列不设置过时时间 map.put("x-dead-letter-exchange", "TTL_DELAY_DEAD_LETTER_FANOUT_EXCHANGE");//过时后进入死信队列 return new Queue("TTL_QUEUE",false,false,false,map); }
DelayQueueController
类中的发送消息方法修改一下,对每条信息设置过时时间:@GetMapping(value="/ttl/send") public String ttlMsgSend(@RequestParam(value = "msg",defaultValue = "no message") String msg, @RequestParam(value = "time") String millTimes){ MessageProperties messageProperties = new MessageProperties(); messageProperties.setExpiration(millTimes);//单条消息设置过时时间,单位:毫秒 Message message = new Message(msg.getBytes(), messageProperties); rabbitTemplate.convertAndSend("TTL_DELAY_FANOUT_EXCHANGE","",message); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println("消息发送成功【" + sdf.format(new Date()) + "】"); return "succ"; }
2
条消息发送,一条 10
秒过时,一条 5
秒过时,先发送 10
秒的:http://localhost:8080/delay/ttl/send?msg=10秒过时消息&time=10000 http://localhost:8080/delay/ttl/send?msg=5秒过时消息&time=5000
咱们看到,两条消息都是 10
秒后过时,这是巧合吗?并非,这是由于 RabbitMQ
中的机制就是若是前一条消息没有出队,那么即便后一条消息已经失效,也必需要等前一条消息出队以后才能出队,因此这就是为何通常都尽可能避免同一个队列单条消息设置不一样过时时间的作法。函数
经过以上两个例子,使用死信队列来实现延迟队列,咱们能够获得几个很明显的缺点:工具
10
分钟过时,有的 20
分钟过时等),则须要建立不一样的交换机和队列来实现消息的路由。TTL
时可能会形成消息的阻塞。由于当前一条消息没有出队,后一条消息即便到期了也不能出队。为了不 TTL
和死信队列可能形成的问题,因此就很是有必要用一种新的更好的方案来替代实现延迟队列,这就是延时队列插件。测试
在 RabbitMQ
的 3.5.7
版本以后,提供了一个插件(rabbitmq-delayed-message-exchange
)来实现延迟队列 ,同时需保证 Erlang/OPT
版本为 18.0
以后。ui
RabbitMQ
版本在 3.5.7-3.7.x
的能够执行如下命令进行下载(也能够直接经过浏览器下载):wget https://bintray.com/rabbitmq/community-plugins/download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez
若是 RabbitMQ
是 3.8
以后的版本,能够点击这里,找到延迟队列对应版本的插件,而后下载。插件
plugins
目录下,执行 rabbitmq-plugins enable rabbitmq_delayed_message_exchange
命令启动插件。若是要禁止该插件,则能够执行命令 rabbitmq-plugins disable rabbitmq_delayed_message_exchange
(启用插件后须要重启 RabbitMQ
才会生效)。PluginDelayRabbitConfig
配置类:@Configuration public class PluginDelayRabbitConfig { @Bean("pluginDelayExchange") public CustomExchange pluginDelayExchange() { Map<String, Object> argMap = new HashMap<>(); argMap.put("x-delayed-type", "direct");//必需要配置这个类型,能够是direct,topic和fanout //第二个参数必须为x-delayed-message return new CustomExchange("PLUGIN_DELAY_EXCHANGE","x-delayed-message",false, false, argMap); } @Bean("pluginDelayQueue") public Queue pluginDelayQueue(){ return new Queue("PLUGIN_DELAY_QUEUE"); } @Bean public Binding pluginDelayBinding(@Qualifier("pluginDelayQueue") Queue queue,@Qualifier("pluginDelayExchange") CustomExchange customExchange){ return BindingBuilder.bind(queue).to(customExchange).with("delay").noargs(); } }
PluginDelayConsumer
:@Component public class PluginDelayConsumer { @RabbitHandler @RabbitListener(queues = "PLUGIN_DELAY_QUEUE")//监听延时队列 public void fanoutConsumer(String msg){ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println("【插件延迟队列】【" + sdf.format(new Date()) + "】收到消息:" + msg); } }
DelayQueueController
类,新增一个方法:@GetMapping(value="/plugin/send") public String pluginMsgSend(@RequestParam(value = "msg",defaultValue = "no message") String msg){ MessageProperties messageProperties = new MessageProperties(); messageProperties.setHeader("x-delay",5000);//延迟5秒被删除 Message message = new Message(msg.getBytes(), messageProperties); amqpTemplate.convertAndSend("PLUGIN_DELAY_EXCHANGE","delay",message);//交换机和路由键必须和配置文件类中保持一致 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println("消息发送成功【" + sdf.format(new Date()) + "】"); return "succ"; }
http://localhost:8080/delay/plugin/send?msg=插件延迟队列消息
进行测试,能够看到,消息在延时 5
秒以后被消费:延迟队列的使用很是普遍,若是是单机部署,能够考虑使用 jdk
自带的 DelayQueue
,分布式部署能够采用 RabbitMQ
,Redis
等中间件来实现延迟队列。本文主要介绍了如何利用 RabbitMQ
实现两种延迟队列的两种方案,固然本文的例子只是引导,并无开启回调等消息确认模式,若是想了解 RabbitMQ
消息的可靠性传输的,能够点击这里。