背景:
在一些应用场景中,程序并不须要同步执行,例如用户注册以后的邮件或者短信通知提醒。这种场景的实现则是在当前线程,开启一个新线 程,当前线程在开启新线程以后会继续往下执行,无需等待新线程执行完成。 但例如一些须要延时的场景则不仅是开启新线程执行如此简单了。譬如提交订单后在15分钟内没有完成支付,订单须要关闭,这种情 况,是否只开启一个异步线程就不适用了呢。
那么就单单实如今提交订单后的15分钟内,若是没有完成支付,系统关闭订单。有哪些可行的方案呢。html
方案:
使用定时任务轮询订单表,查询出订单建立了15分钟以上而且未支付的订单,若是有查询出此类订单则执行关闭。java
缺点:假设每1分钟轮询一次,则会存在秒级偏差,若是秒级轮询,则会极其消耗性能,影响程序的健壮性。
提交订单时开启一个新线程,而新线程直接休眠15分钟,休眠结束后开始执行订单关闭web
缺点:若是在线程休眠时,重启了整个服务,那么会怎样呢?
使用延时消息队列spring
缺点:须要额外部署消息中间件
综上考虑:使用延时消息队列则成为最佳选择,消息延时发布以后,保存在消息中间件中,在15分钟后才会正式发布至队列,延时队列监听器在15分钟后监听到消息时,才开始执行,而这期间,即便项目重启也没有关系。json
注意:这里不采用网上流传的死信队列转发,而是采用rabbitmq3.7+版本的延时队列插件,因此务必安装3.7+版本并启用延时队列插件。
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.4.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies>
spring: rabbitmq: host: 127.0.0.1 port: 5672 username: admin password: admin listener: direct: acknowledge-mode: MANUAL simple: acknowledge-mode: MANUAL
package cn.rongyuan.config; import java.util.HashMap; import java.util.Map; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @title rabbitmq配置类 * @author zengzp * @time 2018年8月20日 上午10:46:43 * @Description */ @Configuration public class RabbitConfig { // 支付超时延时交换机 public static final String Delay_Exchange_Name = "delay.exchange"; // 超时订单关闭队列 public static final String Timeout_Trade_Queue_Name = "close_trade"; @Bean public Queue delayPayQueue() { return new Queue(RabbitConfig.Timeout_Trade_Queue_Name, true); } // 定义广播模式的延时交换机 无需绑定路由 @Bean FanoutExchange delayExchange(){ Map<String, Object> args = new HashMap<String, Object>(); args.put("x-delayed-type", "direct"); FanoutExchange topicExchange = new FanoutExchange(RabbitConfig.Delay_Exchange_Name, true, false, args); topicExchange.setDelayed(true); return topicExchange; } // 绑定延时队列与交换机 @Bean public Binding delayPayBind() { return BindingBuilder.bind(delayPayQueue()).to(delayExchange()); } // 定义消息转换器 @Bean Jackson2JsonMessageConverter jsonMessageConverter() { return new Jackson2JsonMessageConverter(); } // 定义消息模板用于发布消息,而且设置其消息转换器 @Bean RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) { final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(jsonMessageConverter()); return rabbitTemplate; } @Bean RabbitAdmin rabbitAdmin(final ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); } }
@Autowired RabbitTemplate rabbitTemplate; // 经过广播模式发布延时消息 延时30分钟 持久化消息 消费后销毁 这里无需指定路由,会广播至每一个绑定此交换机的队列 rabbitTemplate.convertAndSend(RabbitConfig.Delay_Exchange_Name, "", trade.getTradeCode(), message ->{ message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); message.getMessageProperties().setDelay(30 * (60*1000)); // 毫秒为单位,指定此消息的延时时长 return message; });
package cn.rongyuan.mq.consumer; import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.rabbitmq.client.Channel; import cn.rongyuan.config.RabbitConfig; import cn.rongyuan.service.TradeService; import cn.rongyuan.util.ExceptionUtil; /** * @title 消息消费端 * @author zengzp * @time 2018年8月20日 上午11:00:26 * @Description */ @Component public class PayTimeOutConsumer { @Autowired TradeService tradeService; private Logger logger = LoggerFactory.getLogger(getClass()); @RabbitListener(queues = RabbitConfig.Timeout_Trade_Queue_Name) public void process(String tradeCode, Message message, Channel channel) throws IOException{ try { logger.info("开始执行订单[{}]的支付超时订单关闭......", tradeCode); tradeService.cancelTrade(tradeCode); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); logger.info("超时订单处理完毕"); } catch (Exception e) { logger.error("超时订单处理失败:{}", ExceptionUtil.getMessage(e)); channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); } } }
参考资料:
一、spring amqp 官方文档:https://docs.spring.io/spring-amqp/docs/2.0.0.M2/reference/htmlsingle/#delayed-message-exchange 二、rabbitmq 官方文档:http://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/