文章末尾有源代码地址
https://www.sunnyblog.top/detail.html?id=1265257400324063232
本章节主要实现消息的延迟消费,在学习延迟消费以前必须先了解RabbitMQ两个基本概念,消息的TTL和死信Exchange,经过这二者的组合来实现消息的延迟消费。
不想看原理讲解的,直接经过标题6看代码实现html
消息的TTL就是消息的存活时间。RabbitMQ能够对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也能够对每个单独的消息作单独的设置。超过了这个时间,咱们认为这个消息就死了,称之为死信。app
这里咱们以最熟悉的12306购票为例进行案例场景的分析,12306购票步骤以下:学习
在RabbitMQConfig中建立队列、交换机以及绑定关系测试
@Configuration
public class RabbitMQConfig {ui
/** * 测试发送消息到MQ * @return
*/ @Bean public Queue testHello() { return new Queue(SysConstant.QUEUE_TEST_HELLO); } /** * 死信交换机 * @return */ @Bean public DirectExchange sysOrderDelayExchange() { return new DirectExchange(SysConstant.SYS_ORDER_DELAY_EXCHANGE); } /** * 死信队列 * @return */ @Bean public Queue sysOrderDelayQueue() { Map<String, Object> map = new HashMap<String, Object>(16); map.put("x-dead-letter-exchange",SysConstant.SYS_ORDER_RECEIVE_EXCHANGE); //指定死信送往的交换机 map.put("x-dead-letter-routing-key", SysConstant.SYS_ORDER_RECEIVE_KEY); //指定死信的routingkey return new Queue(SysConstant.SYS_ORDER_DELAY_QUEUE, true, false, false, map); } /** * 给死信队列绑定死信交换机 * @return */ @Bean public Binding sysOrderDelayBinding() { return BindingBuilder.bind(sysOrderDelayQueue()).to(sysOrderDelayExchange()).with(SysConstant.SYS_ORDER_DELAY_KEY); } /** * 死信接收交换机,用于接收死信队列的消息 * @return */ @Bean public DirectExchange sysOrderReceiveExchange() { return new DirectExchange(SysConstant.SYS_ORDER_RECEIVE_EXCHANGE); } /** * 死信接收队列 * @return */ @Bean public Queue sysOrderReceiveQueue() { return new Queue(SysConstant.SYS_ORDER_RECEIVE_QUEUE); } /** * 死信接收交换机绑定接收死信队列消费队列 * @return */ @Bean public Binding sysOrdeReceiveBinding() { return BindingBuilder.bind(sysOrderReceiveQueue()).to(sysOrderReceiveExchange()).with(SysConstant.SYS_ORDER_RECEIVE_KEY); } }
发送延时消息到死信交换器方法spa
@Service
public class MsgService {日志
@Autowired private RabbitTemplate rabbitTemplate; /** * 发送延时消息到mq * @param exchange 死信交换机 * @param routeKey 路由key * @param data 发送数据 * @param delayTime 过时时间,单位毫秒
*/ public void sendDelayMsgToMQ(String exchange, String routeKey, String data,int delayTime) { rabbitTemplate.convertAndSend(exchange, routeKey, data, message -> { message.getMessageProperties().setExpiration(delayTime + ""); return message; }); } }
监听队列消息ReceiveMsgListener类code
/**orm
* 获取到的延时消息 * 这里接收到消息进行对应的业务处理(例如:取消订单,关闭支付,回滚库存等 ...) * @param msg
*/ @RabbitListener(queues = SysConstant.SYS_ORDER_RECEIVE_QUEUE) @RabbitHandler public void getdelayMsg(String msg) { log.info("MQ接收消息时间:{},消息内容:{}", DateUtil.formatDateTime(DateUtil.date()),msg); log.info("------->这里实现订单关闭、支付关闭、回滚库存业务逻辑..."); }
建立Controller向队列发送消息,设置过时时间10秒htm
@RestController
@RequestMapping("mq")
@Slf4j
public class MQController {
@Autowired private MsgService msgService; @GetMapping("sendMsg") public String sendMsg() { log.info("发送延时消息时间:" + DateUtil.formatDateTime(DateUtil.date())); OrderInfo orderInfo = new OrderInfo(); orderInfo.setOrderId(IdUtil.fastSimpleUUID()); orderInfo.setOrderState("待支付"); orderInfo.setPayMoney(999.88); msgService.sendDelayMsgToMQ(SysConstant.SYS_ORDER_DELAY_EXCHANGE,SysConstant.SYS_ORDER_DELAY_KEY, JSONUtil.toJsonStr(orderInfo),10*1000);//1分钟 return JSONUtil.toJsonStr("发送延时消息成功"); }
}
https://www.sunnyblog.top/index.html?tagId=1264009609236971520
详细开发技术文档尽在 点击这里查看技术文档 ;更多技术文章: https://www.sunnyblog.top;任何疑问加QQ群咨询:534073451