延迟队列与死信队列时息息相关的,它具备特色:spring
队列,意味着内部的元素是有序的,元素的出队和入队是有方向性的,元素从一端进入,从另外一端取出ide
延时,这是最重要的特性,普通队列中的元素老是等着但愿被早点取出处理,而延时队列中的元素则是但愿等待特定时间后,消费者才能拿到这个消息进行消费。函数
TTL
是 RabbitMQ 中一个消息或者队列的属性,代表一条消息或者该队列中的全部消息的最大存活时间
,单位是毫秒。换句话说,若是一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息若是在 TTL 设置的时间内没有被消费,则会成为“死信”。若是同时配置了队列的 TTL 和消息的 TTL,那么较小的那个值将会被使用。post
也就是说咱们能够利用这个机制,让消息过时后,变成死信,就又交给咱们的死信交换机来处理了~ui
延时队列能够解决不少特定场景下,带时间属性的任务需求,如:订单建立半小时内未支付进行取消订单。。。this
有两种方式设置 TTL 值,插件
第一种是在建立队列的时候设置队列的 “x-message-ttl” 属性设计
@Bean public Queue delayQueue() { String queueName = "delay_queue"; Map<String, Object> args = new HashMap<>(1); args.put("x-message-ttl", "6000"); return new Queue(queueName, true, false, false, args); }
另外一种方式是针对每条消息设置 TTL3d
rabbitTemplate.convertAndSend(exchange, routingKey, (message) -> { message.getMessageProperties().setExpiration("6000"); return message; });
区别:code
设置了队列的 TTL 属性,那么一旦消息过时,就会被队列丢弃
给消息设置 TTL 属性,消息过时也不必定会立刻丢弃,由于消息是否过时是在即将投递到消费者以前断定的,若是队列存在消息积压问题,那么已过时的消息可能还会存活较长些时间
死信队列 + 消息TTL = 延迟队列
消息大体流向
delayMQ
延迟队列 (delay_queu_A) 设置 TTL 能让信息在延迟多久后成为死信,成为死信后的消息都会被投递到死信队列中,这样只须要消费者一直消费死信队列(dlx_queue_A) 里就行了,由于里面的消息都是但愿被处理的延迟后的消息。
声明交换机、队列以及他们的关系:
// 配置延迟队列 @Bean public TopicExchange delayExchange() { String exchangeName = "delay_exchange"; return new TopicExchange(exchangeName); } @Bean public Queue delayQueueA() { String queueName = "delay_queue_A"; // 设置死信发送至 dlx_exchange 交换机,设置路由键为 bind.dlx.A String dlxExchangeName = "dlx_exchange"; String bindDlxRoutingKeyA = "bind.dlx.A"; Map<String, Object> args = new HashMap<>(3); // 设置队列的延迟属性,6秒 args.put("x-message-ttl", 6000); args.put("x-dead-letter-exchange", dlxExchangeName); args.put("x-dead-letter-routing-key", bindDlxRoutingKeyA); return new Queue(queueName, true, false, false, args); } @Bean public Binding bindingDelayExchange() { String routingKey = "bind.delay.A"; return BindingBuilder.bind(delayQueueA()).to(delayExchange()).with(routingKey); } // 配置死信队列 @Bean public TopicExchange dlxExchange() { String exchangeName = "dlx_exchange"; return new TopicExchange(exchangeName); } @Bean public Queue dlxQueueA() { String queueName = "dlx_queue_A"; return new Queue(queueName); } @Bean public Binding bindingDlxExchange() { String routingKey = "#.A"; return BindingBuilder.bind(dlxQueueA()).to(dlxExchange()).with(routingKey); }
yml
spring: rabbitmq: host: 192.168.159.129 port: 5672 username: admin password: admin # 虚拟host 能够不设置,使用 server 默认 host virtual-host: listener: simple: default-requeue-rejected: acknowledge-mode: manual
消费者
@RabbitListener(queues = "dlx_queue_A") public void receiver2(@Payload String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException { System.out.println("当前时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + "死信队列 - dlx_queue_A 收到消息:" + msg); channel.basicAck(deliveryTag, false); }
生产者
@Test public void demo_10_Producer() { String exchange = "delay_exchange_A"; String routingKey = "delay.routing.key.A"; String msg = "发送给延迟队列 delay_queue_A 的消息"; System.out.println("当前时间:" + simpleDateFormat.format(new Date()) + "开始发送消息:" + msg); rabbitTemplate.convertAndSend(exchange, routingKey, msg); }
输出
当前时间:2020-09-27 21:10:20 开始发送消息:发送给延迟队列 delay_queue_A 的消息 当前时间:2020-09-27 21:10:26 死信队列 - dlx_queue_A 收到消息:发送给延迟队列 delay_queue_A 的消息
缺陷
若是这样使用的话,每增长一个新的时间需求,就要新增一个队列。如须要一个小时后处理,那么就须要增长 TTL 为一个小时的队列,若是此时消息的过时时间不肯定或者消息过时时间维度过多,在消费端咱们就要去监听多个消息队列,岂不是要增长无数个队列才能知足需求??
设计成一个通用的延时队列,咱们能够给不一样的消息设置不一样的 TTL 过时时间,以达到动态设置延迟时间。
声明交换机、队列以及它们的关系:
// 配置延迟队列 @Bean public TopicExchange delayExchange() { String exchangeName = "delay_exchange"; return new TopicExchange(exchangeName); } @Bean public Queue delayQueueB() { String queueName = "delay_queue_B"; // 设置死信发送至 dlx_exchange 交换机,设置路由键为 bind_dlx_B String dlxExchangeName = "dlx_exchange"; String bindDlxRoutingKeyB = "bind.dlx.B"; Map<String, Object> args = new HashMap<>(2); args.put("x-dead-letter-exchange", dlxExchangeName); args.put("x-dead-letter-routing-key", bindDlxRoutingKeyB); return new Queue(queueName, true, false, false, args); } @Bean public Binding bindingDelayExchange() { String routingKey = "bind.delay.B"; return BindingBuilder.bind(delayQueueB()).to(delayExchange()).with(routingKey); } // 配置死信队列 @Bean public TopicExchange dlxExchange() { String exchangeName = "dlx_exchange"; return new TopicExchange(exchangeName); } @Bean public Queue dlxQueueB() { String queueName = "dlx_queue_B"; return new Queue(queueName); } @Bean public Binding bindingDlxExchange() { String routingKey = "#.B"; return BindingBuilder.bind(dlxQueueB()).to(dlxExchange()).with(routingKey); }
消费者
@RabbitListener(queues = "dlx_queue_B") public void receiverB(@Payload String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException { System.out.println("当前时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 死信队列 - dlx_queue_B 收到消息:" + msg); channel.basicAck(deliveryTag, false); }
生产者
@Test public void producer_B() throws Exception { SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String exchange = "delay_exchange"; String routingKey = "bind.delay.B"; String msg = "我是第一条消息"; // 延迟时间 String delayTime = "6000"; System.out.println("当前时间:" + simpleDateFormat.format(new Date()) + "开始发送消息:" + msg + " 延迟的时间为:" + delayTime); rabbitTemplate.convertAndSend(exchange, routingKey, msg, new MyMessagePostProcessor(delayTime)); Thread.sleep(30000L); }
MyMessagePostProcessor
/** * 由于要给消息设置 TTL,这里建立了一个 MessagePostProcessor 的实例来设置过时时间 */ class MyMessagePostProcessor implements MessagePostProcessor { // 延迟时间 毫秒 private String delayTime; MyMessagePostProcessor(String delayTime) { this.delayTime = delayTime; } @Override public Message postProcessMessage(Message message) throws AmqpException { // 设置延迟时间 message.getMessageProperties().setExpiration(delayTime); return message; } }
这里的 convertAndSend 使用的第四参数为 MessagePostProcessor,我这里采用构造函数的方式来动态设置消息的过时时间。
效果
当前时间:2020-09-27 21:20:20 开始发送消息:我是第一条消息 延迟的时间为:6000 当前时间:2020-09-27 21:20:26 死信队列 - dlx_queue_B 收到消息:我是第一条消息
缺陷
可是上面咱们也提到了消息过时也不必定会立刻丢弃。消息到了过时时间可能并不会按时“死亡“,由于 RabbitMQ 只会检查第一个消息是否过时,若是过时则丢到死信队列,索引若是第一个消息的延时时长很长,而第二个消息的延时时长很短,则第二个消息并不会优先获得执行。
例子:
生产者
@Test public void producer_B() throws Exception { SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String exchange = "delay_exchange"; String routingKey = "bind.delay.B"; String msg = "我是第一条消息"; // 延迟时间 String delayTime = "6000"; System.out.println("当前时间:" + simpleDateFormat.format(new Date()) + "开始发送消息:" + msg + " 延迟的时间为:" + delayTime); rabbitTemplate.convertAndSend(exchange, routingKey, msg, new MyMessagePostProcessor(delayTime)); msg = "我是第二条消息"; // 修改延迟时间 delayTime = "3000"; System.out.println("当前时间:" + simpleDateFormat.format(new Date()) + "开始发送消息:" + msg + " 延迟的时间为:" + delayTime); rabbitTemplate.convertAndSend(exchange, routingKey, msg, new MyMessagePostProcessor(delayTime)); Thread.sleep(30000L); }
效果
当前时间:2020-09-27 21:23:20 开始发送消息:我是第一条消息 延迟的时间为:6000 当前时间:2020-09-27 21:23:20 开始发送消息:我是第二条消息 延迟的时间为:3000 当前时间:2020-09-27 21:23:26 死信队列 - dlx_queue_B 收到消息:我是第一条消息 当前时间:2020-09-27 21:23:26 死信队列 - dlx_queue_B 收到消息:我是第二条消息
能够看到但延迟最久的第一条信息消费后,紧跟其后的已通过期了的第二条消息也接着消费了
延时队列在须要延时处理的场景下很是有用,使用 RabbitMQ 来实现延时队列能够很好的利用 RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,经过RabbitMQ集群的特性,能够很好的解决单点故障问题,不会由于单个节点挂掉致使延时队列不可用或者消息丢失。
死信队列 DLX + TTL 的方式来实现延迟队列,这也是一种通用的作法。
无论哪一种方式都有各自的优缺点,根据业务状况来考虑。若是要实如今消息粒度上添加TTL,并使其在设置的TTL时间及时死亡,可使用 RabbitMQ 的 rabbitmq_delayed_message_exchange插件的方式实现。