>前段时间在编写通用的消息通知服务时,因为须要实现相似通知失败时,须要延后几分钟再次进行发送,进行屡次尝试后,进入定时发送机制。此机制,在原先对接银联支付时,银联的异步通知也是相似的,在第一次通知失败后,支付标准服务会重发,最多发送五次,每次的间隔时间为一、四、八、16分钟等。本文就简单讲解下使用RabbitMQ实现延时消息队列功能。html
>在此以前,简单说明下基于RabbitMQ实现延时队列的相关知识及说明下延时队列的使用场景。java
>在不少的业务场景中,延时队列能够实现不少功能,此类业务中,通常上是非实时的,须要延迟处理的,须要进行重试补偿的。git
>自己在RabbitMQ
中是未直接提供延时队列功能的,但可使用TTL(Time-To-Live,存活时间)
和DLX(Dead-Letter-Exchange,死信队列交换机)
的特性实现延时队列的功能。github
>RabbitMQ
中能够对队列和消息分别设置TTL,TTL代表了一条消息可在队列中存活的最大时间。当某条消息被设置了TTL或者当某条消息进入了设置了TTL的队列时,这条消息会在TTL时间后**死亡
成为Dead Letter
**。若是既配置了消息的TTL,又配置了队列的TTL,那么较小的那个值会被取用。web
>上个知识点也提到了,设置了TTL
的消息或队列最终会成为Dead Letter
,当消息在一个队列中变成死信以后,它能被从新发送到另外一个交换机中,这个交换机就是DLX,绑定此DLX的队列就是死信队列。redis
一个消息变成死信通常上是因为如下几种状况;spring
因此,经过TTL
和DLX
的特性能够模拟实现延时队列的功能。当队列中的消息超时成为死信后,会把消息死信从新发送到配置好的交换机中,而后分发到真实的消费队列。故简单来讲,咱们能够建立2个队列,一个队列用于发送消息,一个队列用于消息过时后的转发的目标队列。数据库
>如下使用SpringBoot
集成RabbitMQ
进行实战说明,在进行http
消息通知时,若通知失败(地址不可用或者链接超时)时,将此消息转入延时队列中,待特定时间后进行从新发送。json
0.引入pom依赖api
<!-- rabbit --> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-amqp</artifactid> </dependency> <!-- 简化http操做 --> <dependency> <groupid>cn.hutool</groupid> <artifactid>hutool-http</artifactid> <version>4.5.16</version> </dependency> <dependency> <groupid>cn.hutool</groupid> <artifactid>hutool-json</artifactid> <version>4.5.16</version> </dependency> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-web</artifactid> </dependency>
1.编写rabbitmq
配置文件(关键配置) RabbitConfig.java
/** * * @ClassName 类名:RabbitConfig * @Description 功能说明: * <p> * TODO *</p> ************************************************************************ * @date 建立日期:2019年7月17日 * @author 建立人:oKong * @version 版本号:V1.0 *<p> ***************************修订记录************************************* * * 2019年7月17日 oKong 建立该类功能。 * *********************************************************************** *</p> */ @Configuration public class RabbitConfig { @Autowired ConnectionFactory connectionFactory; /** * 消费者线程数 设置大点 大几率是能通知到的 */ @Value("${http.notify.concurrency:50}") int concurrency; /** * 延迟队列的消费者线程数 可设置小点 */ @Value("${http.notify.delay.concurrency:20}") int delayConcurrency; @Bean public RabbitAdmin rabbitAdmin() { return new RabbitAdmin(connectionFactory); } @Bean public DirectExchange httpMessageNotifyDirectExchange(RabbitAdmin rabbitAdmin) { //durable 是否持久化 //autoDelete 是否自动删除,即服务端或者客服端下线后 交换机自动删除 DirectExchange directExchange = new DirectExchange(ApplicationConstant.HTTP_MESSAGE_EXCHANGE,true,false); directExchange.setAdminsThatShouldDeclare(rabbitAdmin); return directExchange; } //设置消息队列 @Bean public Queue httpMessageStartQueue(RabbitAdmin rabbitAdmin) { /* 建立接收队列,4个参数 name - 队列名称 durable - false,不进行持有化 exclusive - true,独占性 autoDelete - true,自动删除*/ Queue queue = new Queue(ApplicationConstant.HTTP_MESSAGE_START_QUEUE_NAME, true, false, false); queue.setAdminsThatShouldDeclare(rabbitAdmin); return queue; } //队列绑定交换机 @Bean public Binding bindingStartQuene(RabbitAdmin rabbitAdmin,DirectExchange httpMessageNotifyDirectExchange, Queue httpMessageStartQueue) { Binding binding = BindingBuilder.bind(httpMessageStartQueue).to(httpMessageNotifyDirectExchange).with(ApplicationConstant.HTTP_MESSAGE_START_RK); binding.setAdminsThatShouldDeclare(rabbitAdmin); return binding; } @Bean public Queue httpMessageOneQueue(RabbitAdmin rabbitAdmin) { Queue queue = new Queue(ApplicationConstant.HTTP_MESSAGE_ONE_QUEUE_NAME, true, false, false); queue.setAdminsThatShouldDeclare(rabbitAdmin); return queue; } @Bean public Binding bindingOneQuene(RabbitAdmin rabbitAdmin,DirectExchange httpMessageNotifyDirectExchange, Queue httpMessageOneQueue) { Binding binding = BindingBuilder.bind(httpMessageOneQueue).to(httpMessageNotifyDirectExchange).with(ApplicationConstant.HTTP_MESSAGE_ONE_RK); binding.setAdminsThatShouldDeclare(rabbitAdmin); return binding; } //-------------设置延迟队列--开始-------------------- @Bean public Queue httpDelayOneQueue() { //name - 队列名称 //durable - true //exclusive - false //autoDelete - false return QueueBuilder.durable("http.message.dlx.one") //如下是重点:当变成死信队列时,会转发至 路由为x-dead-letter-exchange及x-dead-letter-routing-key的队列中 .withArgument("x-dead-letter-exchange", ApplicationConstant.HTTP_MESSAGE_EXCHANGE) .withArgument("x-dead-letter-routing-key", ApplicationConstant.HTTP_MESSAGE_ONE_RK) .withArgument("x-message-ttl", 1*60*1000)//1分钟 过时时间(单位:毫秒),当过时后 会变成死信队列,以后进行转发 .build(); } //绑定到交换机上 @Bean public Binding bindingDelayOneQuene(RabbitAdmin rabbitAdmin, DirectExchange httpMessageNotifyDirectExchange, Queue httpDelayOneQueue) { Binding binding = BindingBuilder.bind(httpDelayOneQueue).to(httpMessageNotifyDirectExchange).with("delay.one"); binding.setAdminsThatShouldDeclare(rabbitAdmin); return binding; } //-------------设置延迟队列--结束-------------------- //建议将正常的队列和延迟处理的队列分开 //设置监听容器 @Bean("notifyListenerContainer") public SimpleRabbitListenerContainerFactory httpNotifyListenerContainer() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);// 手动ack factory.setConnectionFactory(connectionFactory); factory.setPrefetchCount(1); factory.setConcurrentConsumers(concurrency); return factory; } // 设置监听容器 @Bean("delayNotifyListenerContainer") public SimpleRabbitListenerContainerFactory httpDelayNotifyListenerContainer() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);// 手动ack factory.setConnectionFactory(connectionFactory); factory.setPrefetchCount(1); factory.setConcurrentConsumers(delayConcurrency); return factory; } }
ApplicationConstant.java
public class ApplicationConstant { /** * 发送http通知的 exchange 队列 */ public static final String HTTP_MESSAGE_EXCHANGE = "http.message.exchange"; /** * 配置消息队列和路由key值 */ public static final String HTTP_MESSAGE_START_QUEUE_NAME = "http.message.start"; public static final String HTTP_MESSAGE_START_RK = "rk.start"; public static final String HTTP_MESSAGE_ONE_QUEUE_NAME = "http.message.one"; public static final String HTTP_MESSAGE_ONE_RK = "rk.one"; /** * 通知队列对应的延迟队列关系,即过时队列以后发送到下一个的队列信息,能够根据实际状况添加,固然也能够根据必定规则自动生成 */ public static final Map<string,string> delayRefMap = new HashMap<string, string>() { /** * */ private static final long serialVersionUID = -779823216035682493L; { put(HTTP_MESSAGE_START_QUEUE_NAME, "delay.one"); } }; }
简单来讲,就是建立一个正常消息发送队列,用于接收http消息请求的参数,同时进行http请求。同时,建立一个延时队列,设置其x-dead-letter-exchange
、x-dead-letter-routing-key
和x-message-ttl
值,将其转发到正常的队列中。使用一个map对象维护一个关系,当正常消息异常时,须要发送的延时队列的队列名称,固然时间场景汇总,根据须要能够进行动态配置或者根据必定规则进行动态映射。
2.建立监听类,用于消息的消费操做,此处使用@RabbitListener
来消费消息(固然也可使用SimpleMessageListenerContainer
进行消息配置的),建立了一个正常消息监听和延时队列监听,因为通常上异常通知是低几率事件,可根据不一样的监听容器进行差别化配置。
/** * * @ClassName 类名:HttpMessagerLister * @Description 功能说明:http通知消费监听接口 * <p> * TODO *</p> ************************************************************************ * @date 建立日期:2019年7月17日 * @author 建立人:oKong * @version 版本号:V1.0 *<p> ***************************修订记录************************************* * * 2019年7月17日 oKong 建立该类功能。 * *********************************************************************** *</p> */ @Component @Slf4j public class HttpMessagerLister { @Autowired HttpMessagerService messagerService; @RabbitListener(id = "httpMessageNotifyConsumer", queues = {ApplicationConstant.HTTP_MESSAGE_START_QUEUE_NAME}, containerFactory = "notifyListenerContainer") public void httpMessageNotifyConsumer(Message message, Channel channel) throws Exception { doHandler(message, channel); } @RabbitListener(id= "httpDelayMessageNotifyConsumer", queues = { ApplicationConstant.HTTP_MESSAGE_ONE_QUEUE_NAME,}, containerFactory = "delayNotifyListenerContainer") public void httpDelayMessageNotifyConsumer(Message message, Channel channel) throws Exception { doHandler(message, channel); } private void doHandler(Message message, Channel channel) throws Exception { String body = new String(message.getBody(),"utf-8"); String queue = message.getMessageProperties().getConsumerQueue(); log.info("接收到通知请求:{},队列名:{}",body, queue); //消息对象转换 try { HttpEntity httpNotifyDto = JSONUtil.toBean(body, HttpEntity.class); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //发送通知 messagerService.notify(queue, httpNotifyDto); } catch(Exception e) { log.error(e.getMessage()); //ack channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } } }
HttpMessagerService.java
:消息真正处理的类,此类是关键,这里未进行日志记录,真实场景中,强烈建议进行消息通知的日志存储,防止往后信息的查看,同时也能经过发送状态,在重试次数都失败后,进行定时再次发送功能,同时也有据可查。
@Component @Slf4j public class HttpMessagerService { @Autowired AmqpTemplate mqTemplate; public void notify(String queue,HttpEntity httpEntity) { //发起请求 log.info("开始发起http请求:{}", httpEntity); try { switch(httpEntity.getMethod().toLowerCase()) { case "POST": HttpUtil.post(httpEntity.getUrl(), httpEntity.getParams()); break; case "GET": default: HttpUtil.get(httpEntity.getUrl(), httpEntity.getParams()); } } catch (Exception e) { //发生异常,放入延迟队列中 String nextRk = ApplicationConstant.delayRefMap.get(queue); if(ApplicationConstant.HTTP_MESSAGE_ONE_QUEUE_NAME.equals(queue)) { //若已是最后一个延迟队列的消息队列了,则后续可直接放入数据库中 待后续定时策略进行再次发送 log.warn("http通知已经通知N次失败,进入定时进行发起通知,url={}", httpEntity.getUrl()); } else { log.warn("http从新发送通知:{}, 通知队列rk为:{}, 原队列:{}", httpEntity.getUrl(), nextRk, queue); mqTemplate.convertAndSend(ApplicationConstant.HTTP_MESSAGE_EXCHANGE, nextRk, cn.hutool.json.JSONUtil.toJsonStr(httpEntity)); } } } }
3.建立控制层服务(真实场景中,如SpringCloud
微服务中,通常上是建立个api接口,供其余服务进行调用)
@Slf4j @RestController @Api(tags = "http测试接口") public class HttpDemoController { @Autowired AmqpTemplate mqTemplate; @PostMapping("/send") @ApiOperation(value="send",notes = "发送http测试") public String sendHttp(@RequestBody HttpEntity httpEntity) { //发送http请求 log.info("开始发起http请求,发布异步消息:{}", httpEntity); mqTemplate.convertAndSend(ApplicationConstant.HTTP_MESSAGE_EXCHANGE, ApplicationConstant.HTTP_MESSAGE_START_RK, cn.hutool.json.JSONUtil.toJsonStr(httpEntity)); return "发送成功:url=" + httpEntity.getUrl(); } }
4.配置文件添加RabbitMQ
相关配置信息
spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ # 通知-消费者线程数 设置大点 大几率是能通知到的 http.notify.concurrency=150 # 延迟队列的消费者线程数 可设置小点 http.notify.delay.concurrency=10
5.编写启动类。
@SpringBootApplication @Slf4j public class DelayQueueApplication { public static void main(String[] args) throws Exception { SpringApplication.run(DelayQueueApplication.class, args); log.info("spring-boot-rabbitmq-delay-queue-chapter38服务启动!"); } }
6.启动服务。使用swagger
进行简单调用测试。
2019-07-20 23:52:23.792 INFO 65216 --- [nio-8080-exec-1] c.l.l.s.c.controller.HttpDemoController : 开始发起http请求,发布异步消息:HttpEntity(url=www.baidu.com, params={a=1}, method=get) 2019-07-20 23:52:23.794 INFO 65216 --- [TaskExecutor-97] c.l.l.s.chapter38.mq.HttpMessagerLister : 接收到通知请求:{"method":"get","params":{"a":1},"url":"www.baidu.com"},队列名:http.message.start 2019-07-20 23:52:23.794 INFO 65216 --- [TaskExecutor-97] c.l.l.s.c.service.HttpMessagerService : 开始发起http请求:HttpEntity(url=www.baidu.com, params={a=1}, method=get)
2019-07-20 23:53:14.699 INFO 65216 --- [nio-8080-exec-4] c.l.l.s.c.controller.HttpDemoController : 开始发起http请求,发布异步消息:HttpEntity(url=www.baidu.com1, params={a=1}, method=get) 2019-07-20 23:53:14.705 INFO 65216 --- [TaskExecutor-84] c.l.l.s.chapter38.mq.HttpMessagerLister : 接收到通知请求:{"method":"get","params":{"a":1},"url":"www.baidu.com1"},队列名:http.message.start 2019-07-20 23:53:14.705 INFO 65216 --- [TaskExecutor-84] c.l.l.s.c.service.HttpMessagerService : 开始发起http请求:HttpEntity(url=www.baidu.com1, params={a=1}, method=get) 2019-07-20 23:53:14.706 WARN 65216 --- [TaskExecutor-84] c.l.l.s.c.service.HttpMessagerService : http从新发送通知:www.baidu.com1, 通知队列rk为:delay.one, 原队列:http.message.start
在RabbitMQ
后台中,能够看见http.message.dlx.one
队列中存在这须要延时处理的消息,在一分钟后会转发至http.message.one
队列中。
在一分钟后,能够看见消息本再次消费了。
2019-07-20 23:54:14.722 INFO 65216 --- [TaskExecutor-16] c.l.l.s.chapter38.mq.HttpMessagerLister : 接收到通知请求:{"method":"get","params":{"a":1},"url":"www.baidu.com1"},队列名:http.message.one 2019-07-20 23:54:14.723 INFO 65216 --- [TaskExecutor-16] c.l.l.s.c.service.HttpMessagerService : 开始发起http请求:HttpEntity(url=www.baidu.com1, params={a=1}, method=get) 2019-07-20 23:54:14.723 WARN 65216 --- [TaskExecutor-16] c.l.l.s.c.service.HttpMessagerService : http通知已经通知N次失败,进入定时进行发起通知,url=www.baidu.com1
>在正式场景中,通常上补偿或者重试机制大几率是不会发送的,假若发生时,通常上是第三方业务系统出现了问题,故通常上在进行补充时,应该在非高峰期进行操做,故应该对延时监听器,应该在高峰期时中止消费,在非高峰期时进行消费。同时,还能够根据不一样的通知类型,放入不同的延时队列中,保障业务的正常。这里简单说明下,动态中止或者启动演示监听器的方式。通常上是使用RabbitListenerEndpointRegistry
对象获取延时监听器,以后进行动态中止或者启用。可设置@RabbitListener
的id属性,直接进行获取,固然也能够直接获取全部的监听器,进行自定义判断了。
@Autowired RabbitListenerEndpointRegistry registry; @GetMapping("/set") @ApiOperation(value = "set", notes = "设置消息监听器的状态") public String setSimpleMessageListenerContainer(String status) { if("1".equals(status)) { registry.getListenerContainer("httpDelayMessageNotifyConsumer").start(); } else { registry.getListenerContainer("httpDelayMessageNotifyConsumer").stop(); } return status; }
这里,只是简单进行演示说明,在真实场景下,可使用定时器,判断当前是否为高峰期,进而进行动态设置监听器的状态。
>本文主要简单介绍了基于RabbitMQ
实现延时队列的功能。对于须要实现更加灵活的配置及功能时,如可自定义配置通知次数等,你们可根据本身的需求进行添加,可使用动态建立队列的方式。固然使用延时队列的方式还有不少,好比可使用redis
的key值过时回调机制使用,也可使用定时机制。另,发现很久没有写文章了,感受写的有点乱,还望见谅呀~
>目前互联网上不少大佬都有SpringBoot
系列教程,若有雷同,请多多包涵了。原创不易,码字不易,还但愿你们多多支持。若文中有所错误之处,还望提出,谢谢。
499452441
lqdevOps
我的博客:http://blog.lqdev.cn 完整示例:基于RabbitMQ实现消息延迟队列方案 原文地址:https://blog.lqdev.cn/2019/07/21/springboot/chapter-thirty-eight/</string,></string,string>