Mq的使用中,延迟队列是不少业务都须要用到的,最近我也是刚在项目中用到,就在跟你们讲讲吧。git
延迟队列就是进入该队列的消息会被延迟消费的队列。而通常的队列,消息一旦入队了以后就会被消费者立刻消费。github
延迟队列能作什么?最多见的是如下两种场景:web
好比:用户生成订单以后,须要过一段时间校验订单的支付状态,若是订单仍未支付则须要及时地关闭订单;用户注册成功以后,须要过一段时间好比一周后校验用户的使用状况,若是发现用户活跃度较低,则发送邮件或者短信来提醒用户使用。spring
好比消费者从队列里消费消息时失败了,可是想要延迟一段时间后自动重试。网络
若是不使用延迟队列,那么咱们只能经过一个轮询扫描程序去完成。这种方案既不优雅,也不方便作成统一的服务便于开发人员使用。可是使用延迟队列的话,咱们就能够垂手可得地完成。svg
在介绍具体思路钱,先介绍RabbitMQ的两个特性:Time-To-Live Extensions(消息存活时间) 和 Dead Letter Exchanges(死信交换机)学习
RabbitMQ容许咱们为消息或者队列设置TTL(time to live),也就是过时时间。TTL代表了一条消息可在队列中存活的最大时间,单位为毫秒。当某条消息被设置了TTL或者当某条消息进入了设置了TTL的队列时,这条消息会在通过TTL秒后“死亡”,成为Dead Letter(死信)。若是既配置了消息的TTL,又配置了队列的TTL,那么较小的那个值会被取用。测试
设置了TTL的消息在过时后会成为Dead Letter。其实在RabbitMQ中,一共有三种消息的“死亡”形式:ui
消息被拒绝。经过调用basic.reject或者basic.nack而且设置的requeue参数为false。code
消息由于设置了TTL而过时。
消息进入了一条已经达到最大长度的队列。
若是队列设置了Dead Letter Exchange(DLX),那么这些Dead Letter就会被从新publish(推送)到Dead Letter Exchange,经过Dead Letter Exchange路由到其余队列。
延迟消费是延迟队列最为经常使用的使用模式。以下图所示,生产者产生的消息首先会进入缓冲队列(图中红色队列)。经过RabbitMQ提供的TTL扩展,这些消息会被设置过时时间,等消息过时以后,这些消息会经过配置好的DLX转发到实际消费队列(图中蓝色队列),以此达到延迟消费的效果。
延迟重试本质上也是延迟消费的一种。
以下图所示,消费者发现该消息处理出现了异常,好比是由于网络波动引发的异常。那么若是不等待一段时间,直接就重试的话,极可能会致使在这期间内一直没法成功,形成必定的资源浪费。那么咱们能够将其先放在缓冲队列中(图中红色队列),等消息通过一段的延迟时间后再次进入实际消费队列中(图中蓝色队列),此时因为已通过了“较长”的时间了,异常的一些波动一般已经恢复,这些消息能够被正常地消费。
这里只贴上最主要的代码,所有的代码可查看github
Mq队列与交换机实例建立
/** * 缓冲队列 */ private String DELAY_BUFFER_QUEUE = "delay_buffer_queue"; /** * 实际消费交换机(DLX) */ private String DELAY_SERVICE_EXCHANGE = "delay_service_exchange"; /** * 实际消费队列 */ private String DELAY_SERVICE_QUEUE = "delay_service_queue"; /** * 消息过时时间 3秒 */ private Integer QUEUE_EXPIRATION = 3 * 1000; /** * 实际消费队列 * @return */ @Bean Queue delayServiceQueue(){ return QueueBuilder.durable(DELAY_SERVICE_QUEUE).build(); } /** * 实际消费交换机 * @return */ @Bean DirectExchange delayServiceExchange() { return new DirectExchange(DELAY_SERVICE_EXCHANGE); } /** * 实际消费队列绑定实际消费交换机(DLX) * @param delayServiceQueue * @param delayServiceExchange * @return */ @Bean Binding delayBinding(Queue delayServiceQueue, DirectExchange delayServiceExchange) { return BindingBuilder.bind(delayServiceQueue) .to(delayServiceExchange) .with(DELAY_SERVICE_QUEUE); } /** * 缓冲队列配置 * @return */ @Bean Queue delayBufferQueue(){ return QueueBuilder.durable(DELAY_BUFFER_QUEUE) // 死信交换机 DLX .withArgument("x-dead-letter-exchange", DELAY_SERVICE_EXCHANGE) // 目标routing key .withArgument("x-dead-letter-routing-key", DELAY_SERVICE_QUEUE) // 设置队列的过时时间 .withArgument("x-message-ttl", QUEUE_EXPIRATION) .build(); }
监听实际消费队列
@Component public class DelayMsgListener { @RabbitListener(queues="delay_service_queue") public void listenServiceMsg(Message message){ System.out.println(new Date()+ "收到延迟消息啦:"+new String(message.getBody())); } }
测试:发送消息到缓冲队列
@Test public void send1(){ System.out.println(new Date() +"发送延迟消息!!!"); amqpTemplate.convertAndSend("delay_buffer_queue","Hello!Delay Message!"); }
结果以下
能够看到,在发消息后3秒(TTL),实际消费队列接收到了消息并被消费
Mq队列与交换机实例建立
/** * 缓冲队列 */ private String RETRY_BUFFER_QUEUE = "retry_buffer_queue"; /** * 缓冲交换机 */ private String RETRY_BUFFER_EXCHANGE = "retry_buffer_exchange"; /** * 实际消费交换机(DLX) */ private String RETRY_SERVICE_EXCHANGE = "retry_service_exchange"; /** * 实际消费队列 */ private String RETRY_SERVICE_QUEUE = "retry_service_queue"; /** * 实际消费队列 * @return */ @Bean Queue retryServiceQueue(){ return QueueBuilder.durable(RETRY_SERVICE_QUEUE).build(); } /** * 实际消费交换机 * @return */ @Bean DirectExchange retryServiceExchange() { return new DirectExchange(RETRY_SERVICE_EXCHANGE); } /** * 实际消费队列绑定实际消费交换机(DLX) * @param retryServiceQueue * @param retryServiceExchange * @return */ @Bean Binding retryBinding(Queue retryServiceQueue, DirectExchange retryServiceExchange) { return BindingBuilder.bind(retryServiceQueue) .to(retryServiceExchange) .with(RETRY_SERVICE_QUEUE); } /** * 缓冲队列配置 * @return */ @Bean Queue retryBufferQueue(){ return QueueBuilder.durable(RETRY_BUFFER_QUEUE) // 死信交换机 DLX .withArgument("x-dead-letter-exchange", RETRY_SERVICE_EXCHANGE) // 目标routing key .withArgument("x-dead-letter-routing-key", RETRY_SERVICE_QUEUE) // 设置队列的过时时间 .withArgument("x-message-ttl", QUEUE_EXPIRATION) .build(); } /** * 缓冲交换机 * @return */ @Bean DirectExchange retryBufferExchange() { return new DirectExchange(RETRY_BUFFER_EXCHANGE); } /** * 缓冲队列绑定缓冲交换机 * @param retryBufferQueue * @param retryBufferQueue * @return */ @Bean Binding bufferBinding(Queue retryBufferQueue, DirectExchange retryBufferExchange) { return BindingBuilder.bind(retryBufferQueue) .to(retryBufferExchange) .with(RETRY_BUFFER_QUEUE); }
监听实际消费队列
@Component public class RetryMsgListener { /** * 缓冲队列 */ private String RETRY_BUFFER_QUEUE = "retry_buffer_queue"; /** * 缓冲交换机 */ private String RETRY_BUFFER_EXCHANGE = "retry_buffer_exchange"; @Autowired private MessagePropertiesConverter messagePropertiesConverter; @RabbitListener(queues="retry_service_queue") public void listenServiceMsg(@Payload Message message, Channel channel){ try { System.out.println(new Date() + "收到消息:" + new String(message.getBody())); //TODO 业务逻辑 //忽然出现异常 throw new RuntimeException("特殊异常"); }catch (Exception e){ Map<String,Object> headers = message.getMessageProperties().getHeaders(); try{ Long retryCount = getRetryCount(headers); //重试3次 if(retryCount < 3){ retryCount += 1; System.out.println("消费异常,准备重试,第"+retryCount+"次"); //转换为RabbitMQ 的Message Properties对象 AMQP.BasicProperties rabbitMQProperties = messagePropertiesConverter.fromMessageProperties( message.getMessageProperties(), "UTF-8"); //设置headers rabbitMQProperties.builder().headers(headers); //程序异常重试 //这里必须把rabbitMQProperties也传进来,不然死信队列没法识别是不是同一条信息,致使重试次数没法递增 channel.basicPublish(RETRY_BUFFER_EXCHANGE,RETRY_BUFFER_QUEUE,rabbitMQProperties, message.getBody()); }else { //TODO 重试失败,须要人工处理 (发送到失败队列或发邮件/信息) System.out.println("已重试3次,需人工处理!"); } }catch (IOException ioe){ System.out.println("消息重试失败!"); ioe.printStackTrace(); } } } /** * 获取重试次数 * 若是这条消息是死信,header中会有一个x-death的记录相关信息 * 其中包含死亡次数 * @param headers * @return */ private long getRetryCount(Map<String, Object> headers) { long retryCount = 0; if(null != headers) { if(headers.containsKey("x-death")) { List<Map<String, Object>> deathList = (List<Map<String, Object>>) headers.get("x-death"); if(!deathList.isEmpty()) { Map<String, Object> deathEntry = deathList.get(0); retryCount = (Long)deathEntry.get("count"); } } } return retryCount; } }
测试:发送消息到实际消费队列
@Test public void send2(){ System.out.println(new Date() +"发送延迟重试消息!!!"); //直接发消息到实际消费队列 amqpTemplate.convertAndSend("retry_service_queue","Hello!Retry Message!"); }
结果以下:
能够看到,消费异常后,重试了3次
延迟队列在实际业务中是常常被用到的,同窗们最好都学学哦,代码已上传github
https://github.com/zhangwenkang0/springcloud-learning-from-0-to-1/tree/master/rabbitmq-demo
若是以为不错,分享给你的朋友!
一个立志成大腿而天天努力奋斗的年轻人
伴学习伴成长,成长之路你并不孤单!