RabbitMQ延迟消息发送

为何使用延迟消息?java

不一样于同步消息,有些业务场景下但愿能够实现延迟必定时间再消费消息。git

典型的场景有微信、支付宝等第三方支付回调接口,会在用户支付后3秒、5秒、30秒等等时间后向应用服务器发送回调请求,确保应用服务器能够正确收到消息。数据库

那有些朋友就会说了,把须要定时处理的数据存到数据库中用定时任务就能够实现,为何还弄个异步消息。增长后台维护成本。服务器

使用定时任务固然没有问题能够实现该问题。在小数据量状况下没有问题。但当数据量交大的时候怎么办?若是每一个任务的延迟时间不一样怎么办?微信

其余方式实现消息队列数据结构

名称 实现方式 详细说明
Redis 使用zset数据结构 使用zset的score属性存放执行时间戳,起一个死循环的线程不断的取第一个Key值,若是当前时间戳大于该Key的socre 值时将它取出来消费,注意不须要遍历整个Zset集合,以避免形成性能浪费
定时任务 给定周期扫描待处理消息 使用该方式间隔时间很差控制,给短会形成无心义的扫描,增长数据库压力,给长了偏差较大
定时任务 动态建立惟一性定时任务 一次性的任务会增长数据库存储,须要定时清理,如相差时间较近的任务较多,也会形成性能较差
时间轮 自定义 自定义一个时间轮的数据结构,启动一个后台线程,延迟一秒,获取时间轮中的任务启动子线程独立执行时间轮的任务

如何选择消息中间件?app

中间件 是否原生支持 说明
RocketMQ 支持 不支持任意时间的延迟消息的设置,仅支持内置预设值的延迟时间间隔的延迟消息。预设值的延迟时间间隔为:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h
RabbitMQ 不支持 可以使用消息的TTL和死信Exchange实现
Kafka 不支持 可以使用TimingWheel 实现
AcitveMQ 支持

因本身在使用RabbitMQ作为消息中间件,因此直接选用了RabbitMQ来实现。dom

实现以前异步

在实现以前咱们先须要知道RabbitMQ如下两个概念。性能

  • TTL(Time To Live)消息过时时间。

消息若是在队列中一直没有被消费而且存在时间超过了TTL,消息就会变成了"死信" (Dead Message),后续没法再被消费了。

  • DLX(Dead-Letter-Exchange)死信交换器。

它的做用实际上是用来接收死信消息(dead message)的。

  1. 消息被拒绝(Basic.Reject/Basic.Nack) ,井且设置requeue 参数为false
  2. 消息过时
  3. 队列达到最大长度

由于消息若是未被正常消费并设置了requeue为false时会进入死信队列,咱们能够监控消费死信队列中消息,来观察和分析系统的问题。

RabbitMQ能够从两种维度设置消息过时时间,分别是队列和消息自己。两种方式哪一个时间小先执行哪一个。

实现思路

想到有两种实现方式和效果。甚至能够结合使用。

第一种:设定固定几个延迟时间(像RocketMQ中间件)

固定延迟消息队列

第二种:实现自定义任意时间延迟

自定义任意时间延迟

以上两种方式各有优缺点,我本身实现的是第二种,下面详细说明

图中后半段死信路由与应用消费基本相同,只要在消费端绑将一个正常队列与死信路由绑定就行。

/**
 * @Author: maomao
 * @Date: 2019-09-04 18:34
 */
@Slf4j
@Component
public class FreeCloudMQConsume {
    @RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "free.cloud.out.mq",durable = "true"),
                                 exchange = @Exchange(value = "free.cloud.die.exchange",type = ExchangeTypes.TOPIC),
                                 key = "free.cloud.out.mq.dead.message.#")})
    public void print(String message){
        log.info("print 5 ---- > {}",message);
    }
}

调用方发送消息

/**
     * 建立延迟队列,会随指定延迟时间+5秒后删除队列
     * @param queueName
     * @param delayMillis
     * @return
     */
    private static Queue createDelayQueue(String queueName, Integer delayMillis) {
        /**
         * 队列名称  //死信时间 ,死信从新投递的交换机 ,路由到队列的routingKey
         */
        String time = String.valueOf(System.currentTimeMillis());
        String delayQueueName = queueName + ".delay_" + delayMillis + "_" + time;
        return QueueBuilder.durable(delayQueueName)
                //设置消息失效时间
                .withArgument("x-message-ttl",delayMillis * 1000)
                //设置队列自动删除时间 ,比消息延迟时间多5秒
                .withArgument("x-expires", (delayMillis + 5) * 1000)
                //设置死信路由
                .withArgument("x-dead-letter-exchange", "free.cloud.die.exchange")
                //设置死信路由routingKey
                .withArgument("x-dead-letter-routing-key", queueName + ".dead.message." + time)
                .build();
    }

    /**
     * 发送延迟消息
     * @param queueName
     * @param message
     * @param delayMillis
     */
    public static void sendDelayMessage(String queueName,Object message,Integer delayMillis){
        //死信消息队列(动态建立,会销毁)
        Queue delayQueue = createDelayQueue(queueName, delayMillis);
        //建立队列
        addQueue(delayQueue);
        //延迟消息路由Key
        StringBuilder delayRoutingKey = new StringBuilder(queueName + ".delay");
        delayRoutingKey.append(".").append(message.hashCode() + "_" + RandomUtil.randomString(5));
        //绑定延迟路由
        RabbitMqUtil.addBinding(delayQueue,delayExchange,delayRoutingKey.toString());
        getRabbitTemplate().convertAndSend("free.cloud.delay.exchange",delayRoutingKey.toString(),message);
    }

以上是自定义延迟消息的关键实现代码,完整代码能够 点击这里 获取

效果

相关文章
相关标签/搜索