SpringBoot实战电商项目mall(35k+star)地址: https://github.com/macrozheng/mall
RabbitMQ实现延迟消息的方式有两种,一种是使用死信队列
实现,另外一种是使用延迟插件
实现。死信队列
实现咱们之前曾经讲过,具体参考《mall整合RabbitMQ实现延迟消息》,此次咱们讲个更简单的,使用延迟插件
实现。html
学习本文须要对RabbitMQ有所了解,还不了解的朋友能够看下:《花了3天总结的RabbitMQ实用技巧,有点东西!》java
首先咱们须要下载并安装RabbitMQ的延迟插件。
rabbitmq_delayed_message_exchange
便可找到咱们须要下载的插件,下载和RabbitMQ配套的版本,不要弄错;plugins
目录下;sbin
目录下,使用以下命令启用延迟插件;rabbitmq-plugins enable rabbitmq_delayed_message_exchange
接下来咱们须要在SpringBoot中实现延迟消息功能,此次依然沿用商品下单的场景。好比说有个用户下单了,他60分钟不支付订单,订单就会被取消,这就是一个典型的延迟消息使用场景。
pom.xml
文件中添加AMQP
相关依赖;<!--消息队列相关依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
application.yml
添加RabbitMQ的相关配置;spring: rabbitmq: host: localhost # rabbitmq的链接地址 port: 5672 # rabbitmq的链接端口号 virtual-host: /mall # rabbitmq的虚拟host username: mall # rabbitmq的用户名 password: mall # rabbitmq的密码 publisher-confirms: true #若是对异步消息须要回调必须设置为true
/** * 消息队列配置 * Created by macro on 2018/9/14. */ @Configuration public class RabbitMqConfig { /** * 订单延迟插件消息队列所绑定的交换机 */ @Bean CustomExchange orderPluginDirect() { //建立一个自定义交换机,能够发送延迟消息 Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getExchange(), "x-delayed-message",true, false,args); } /** * 订单延迟插件队列 */ @Bean public Queue orderPluginQueue() { return new Queue(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getName()); } /** * 将订单延迟插件队列绑定到交换机 */ @Bean public Binding orderPluginBinding(CustomExchange orderPluginDirect,Queue orderPluginQueue) { return BindingBuilder .bind(orderPluginQueue) .to(orderPluginDirect) .with(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getRouteKey()) .noargs(); } }
x-delay
头来设置消息从交换机发送到队列的延迟时间;/** * 取消订单消息的发出者 * Created by macro on 2018/9/14. */ @Component public class CancelOrderSender { private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderSender.class); @Autowired private AmqpTemplate amqpTemplate; public void sendMessage(Long orderId,final long delayTimes){ //给延迟队列发送消息 amqpTemplate.convertAndSend(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getExchange(), QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //给消息设置延迟毫秒值 message.getMessageProperties().setHeader("x-delay",delayTimes); return message; } }); LOGGER.info("send delay message orderId:{}",orderId); } }
/** * 取消订单消息的处理者 * Created by macro on 2018/9/14. */ @Component @RabbitListener(queues = "mall.order.cancel.plugin") public class CancelOrderReceiver { private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderReceiver.class); @Autowired private OmsPortalOrderService portalOrderService; @RabbitHandler public void handle(Long orderId){ LOGGER.info("receive delay message orderId:{}",orderId); portalOrderService.cancelOrder(orderId); } }
/** * 前台订单管理Service * Created by macro on 2018/8/30. */ @Service public class OmsPortalOrderServiceImpl implements OmsPortalOrderService { private static Logger LOGGER = LoggerFactory.getLogger(OmsPortalOrderServiceImpl.class); @Autowired private CancelOrderSender cancelOrderSender; @Override public CommonResult generateOrder(OrderParam orderParam) { //todo 执行一系类下单操做,具体参考mall项目 LOGGER.info("process generateOrder"); //下单完成后开启一个延迟消息,用于当用户没有付款时取消订单(orderId应该在下单后生成) sendDelayMessageCancelOrder(11L); return CommonResult.success(null, "下单成功"); } @Override public void cancelOrder(Long orderId) { //todo 执行一系类取消订单操做,具体参考mall项目 LOGGER.info("process cancelOrder orderId:{}",orderId); } private void sendDelayMessageCancelOrder(Long orderId) { //获取订单超时时间,假设为60分钟(测试用的30秒) long delayTimes = 30 * 1000; //发送延迟消息 cancelOrderSender.sendMessage(orderId, delayTimes); } }
30s
,咱们设置的延迟时间。2020-06-08 13:46:01.474 INFO 1644 --- [nio-8080-exec-1] c.m.m.t.s.i.OmsPortalOrderServiceImpl : process generateOrder 2020-06-08 13:46:01.482 INFO 1644 --- [nio-8080-exec-1] c.m.m.tiny.component.CancelOrderSender : send delay message orderId:11 2020-06-08 13:46:31.517 INFO 1644 --- [cTaskExecutor-4] c.m.m.t.component.CancelOrderReceiver : receive delay message orderId:11 2020-06-08 13:46:31.520 INFO 1644 --- [cTaskExecutor-4] c.m.m.t.s.i.OmsPortalOrderServiceImpl : process cancelOrder orderId:11
咱们以前使用过死信队列的方式,这里咱们把两种方式作个对比,先来聊下这两种方式的实现原理。
死信队列是这样一个队列,若是消息发送到该队列并超过了设置的时间,就会被转发到设置好的处理超时消息的队列当中去,利用该特性能够实现延迟消息。git
经过安装插件,自定义交换机,让交换机拥有延迟发送消息的能力,从而实现延迟消息。github
因为死信队列方式须要建立两个交换机(死信队列交换机+处理队列交换机)、两个队列(死信队列+处理队列),而延迟插件方式只需建立一个交换机和一个队列,因此后者使用起来更简单。spring
https://github.com/macrozheng...bash
mall项目全套学习教程连载中,关注公众号第一时间获取。app