为何使用延迟消息?java
不一样于同步消息,有些业务场景下但愿能够实现延迟必定时间再消费消息。git
典型的场景有微信、支付宝等第三方支付回调接口,会在用户支付后3秒、5秒、30秒等等时间后向应用服务器发送回调请求,确保应用服务器能够正确收到消息。数据库
那有些朋友就会说了,把须要定时处理的数据存到数据库中用定时任务就能够实现,为何还弄个异步消息。增长后台维护成本。服务器
使用定时任务固然没有问题能够实现该问题。在小数据量状况下没有问题。但当数据量交大的时候怎么办?若是每一个任务的延迟时间不一样怎么办?微信
其余方式实现消息队列数据结构
名称 | 实现方式 | 详细说明 |
---|---|---|
Redis | 使用zset数据结构 | 使用zset的score属性存放执行时间戳,起一个死循环的线程不断的取第一个Key值,若是当前时间戳大于该Key的socre 值时将它取出来消费,注意不须要遍历整个Zset集合,以避免形成性能浪费 |
定时任务 | 给定周期扫描待处理消息 | 使用该方式间隔时间很差控制,给短会形成无心义的扫描,增长数据库压力,给长了偏差较大 |
定时任务 | 动态建立惟一性定时任务 | 一次性的任务会增长数据库存储,须要定时清理,如相差时间较近的任务较多,也会形成性能较差 |
时间轮 | 自定义 | 自定义一个时间轮的数据结构,启动一个后台线程,延迟一秒,获取时间轮中的任务启动子线程独立执行时间轮的任务 |
如何选择消息中间件?app
中间件 | 是否原生支持 | 说明 |
---|---|---|
RocketMQ | 支持 | 不支持任意时间的延迟消息的设置,仅支持内置预设值的延迟时间间隔的延迟消息。预设值的延迟时间间隔为:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h |
RabbitMQ | 不支持 | 可以使用消息的TTL和死信Exchange实现 |
Kafka | 不支持 | 可以使用TimingWheel 实现 |
AcitveMQ | 支持 |
因本身在使用RabbitMQ作为消息中间件,因此直接选用了RabbitMQ来实现。dom
实现以前异步
在实现以前咱们先须要知道RabbitMQ如下两个概念。性能
消息若是在队列中一直没有被消费而且存在时间超过了TTL,消息就会变成了"死信" (Dead Message),后续没法再被消费了。
它的做用实际上是用来接收死信消息(dead message)的。
由于消息若是未被正常消费并设置了requeue为false时会进入死信队列,咱们能够监控消费死信队列中消息,来观察和分析系统的问题。
RabbitMQ能够从两种维度设置消息过时时间,分别是队列和消息自己。两种方式哪一个时间小先执行哪一个。
实现思路
想到有两种实现方式和效果。甚至能够结合使用。
第一种:设定固定几个延迟时间(像RocketMQ中间件)
第二种:实现自定义任意时间延迟
以上两种方式各有优缺点,我本身实现的是第二种,下面详细说明
图中后半段死信路由与应用消费基本相同,只要在消费端绑将一个正常队列与死信路由绑定就行。
/** * @Author: maomao * @Date: 2019-09-04 18:34 */ @Slf4j @Component public class FreeCloudMQConsume { @RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "free.cloud.out.mq",durable = "true"), exchange = @Exchange(value = "free.cloud.die.exchange",type = ExchangeTypes.TOPIC), key = "free.cloud.out.mq.dead.message.#")}) public void print(String message){ log.info("print 5 ---- > {}",message); } }
调用方发送消息
/** * 建立延迟队列,会随指定延迟时间+5秒后删除队列 * @param queueName * @param delayMillis * @return */ private static Queue createDelayQueue(String queueName, Integer delayMillis) { /** * 队列名称 //死信时间 ,死信从新投递的交换机 ,路由到队列的routingKey */ String time = String.valueOf(System.currentTimeMillis()); String delayQueueName = queueName + ".delay_" + delayMillis + "_" + time; return QueueBuilder.durable(delayQueueName) //设置消息失效时间 .withArgument("x-message-ttl",delayMillis * 1000) //设置队列自动删除时间 ,比消息延迟时间多5秒 .withArgument("x-expires", (delayMillis + 5) * 1000) //设置死信路由 .withArgument("x-dead-letter-exchange", "free.cloud.die.exchange") //设置死信路由routingKey .withArgument("x-dead-letter-routing-key", queueName + ".dead.message." + time) .build(); } /** * 发送延迟消息 * @param queueName * @param message * @param delayMillis */ public static void sendDelayMessage(String queueName,Object message,Integer delayMillis){ //死信消息队列(动态建立,会销毁) Queue delayQueue = createDelayQueue(queueName, delayMillis); //建立队列 addQueue(delayQueue); //延迟消息路由Key StringBuilder delayRoutingKey = new StringBuilder(queueName + ".delay"); delayRoutingKey.append(".").append(message.hashCode() + "_" + RandomUtil.randomString(5)); //绑定延迟路由 RabbitMqUtil.addBinding(delayQueue,delayExchange,delayRoutingKey.toString()); getRabbitTemplate().convertAndSend("free.cloud.delay.exchange",delayRoutingKey.toString(),message); }
以上是自定义延迟消息的关键实现代码,完整代码能够 点击这里 获取
效果