rabbitmq利用死信队列+TTL 实现延迟队列

适用场景:订单超时未支付,假若适用定时器的话,那么数据量大的话,轮询查询数据,首先IO开销大,其次任务时间要求高,扫描越频繁性能可能就越低。
延迟队列顾名思义延迟消费数据,那么先解释下延迟队列涉及的关键概念
一、消息的TTL(Time To Live)
RabbitMQ容许为消息和队列设置TTL(生存时间),若对消息设置了ttl,若是超过了ttl配置则消息死了,称之为死信.请注意,路由到多个队列的消息可能会在其所在的每一个队列中的不一样时间或根本不会消亡(不一样队列分别设置ttl)。故一个队列中的消息死亡对其余队列中相同消息的生命没有影响。对队列设置就是队列没有消费者连着的保留时间。java

1.1 对消息设置统一TTLgit

@Bean
    public Queue delayTTLQueue() {
        Map<String,Object> paramMap = new HashMap<>();
        paramMap.put("x-dead-letter-exchange",RabbitMqConfig.DELAY_EXCHANGE_NAME);
        paramMap.put("x-dead-letter-routing-key",RabbitMqConfig.DELAY_QUEUE_ROUTING_KEY);
        paramMap.put("x-message-ttl",3000);
        return new Queue(RabbitMqConfig.QUEUE_TTL_NAME,true,false,false,paramMap);
    }

1.2 对消息分别设置TTL,对比上一个少了个x-messgae-ttl参数设置github

/**
     * 延迟队列,超时时间不一致
     * @return
     */
    @Bean
    public Queue delayTTLQueue2() {
        Map<String,Object> paramMap = new HashMap<>();
        paramMap.put("x-dead-letter-exchange",RabbitMqConfig.DELAY_EXCHANGE_NAME);
        paramMap.put("x-dead-letter-routing-key",RabbitMqConfig.DELAY_QUEUE_ROUTING_KEY);
        return new Queue(RabbitMqConfig.QUEUE_TTL_NAME2,true,false,false,paramMap);
    }

 注意:生产者发送消息时,给他们设置了不一样时间的过时app

/**
     * 延迟时间不一致
     * @param message
     * @throws InterruptedException
     */
    public void sendPerQueueTTL2(Object message) throws InterruptedException {
        DelayConsumer.latch = new CountDownLatch(3);
        for (int i = 1; i <= 3; i++) {
            long expiration = i * 1000;
            rabbitTemplate.convertAndSend(RabbitMqConfig.QUEUE_TTL_EXCHANGE_NAME,RabbitMqConfig.QUEUE_TTL_ROUTING_KEY2,
                    message+" the expiration time is "+expiration,new ExpirationMessagePostProcessor(expiration));
        }
        DelayConsumer.latch.await();
    }

二、死信交换机DLX(Dead Letter Exchanges)ide

一个消息在知足以下条件下,会进死信交换机,记住这里是交换机而不是队列,一个交换机能够对应不少队列。
2.1 消息被Consumer拒收了,而且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其余消费者使用。
2.2 上面的消息的TTL到了,消息过时了。
2.3 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。
死信交换机就是普通的交换机,只是由于咱们把过时的消息扔进去,因此叫死信交换机,并非说死信交换机是某种特定的交换机性能

大体流程图测试

三、代码示例ui

3.1 queue配置code

/**
     * 实际消费队列
     * @return
     */
    @Bean
    public Queue delayQueue() {
        return new Queue(RabbitMqConfig.DELAY_PROCESS_QUEUE_NAME,true,false,false);
    }

    /**
     * 延迟队列
     * @return
     */
    @Bean
    public Queue delayTTLQueue() {
        Map<String,Object> paramMap = new HashMap<>();
        paramMap.put("x-dead-letter-exchange",RabbitMqConfig.DELAY_EXCHANGE_NAME);
        paramMap.put("x-dead-letter-routing-key",RabbitMqConfig.DELAY_QUEUE_ROUTING_KEY);
        paramMap.put("x-message-ttl",3000);
        return new Queue(RabbitMqConfig.QUEUE_TTL_NAME,true,false,false,paramMap);
    }

    /**
     * 延迟队列,超时时间不一致
     * @return
     */
    @Bean
    public Queue delayTTLQueue2() {
        Map<String,Object> paramMap = new HashMap<>();
        paramMap.put("x-dead-letter-exchange",RabbitMqConfig.DELAY_EXCHANGE_NAME);
        paramMap.put("x-dead-letter-routing-key",RabbitMqConfig.DELAY_QUEUE_ROUTING_KEY);
        return new Queue(RabbitMqConfig.QUEUE_TTL_NAME2,true,false,false,paramMap);
    }

3.2 exchange 配置blog

/**
     * 死信交换机
     * @return
     */
    @Bean
    public DirectExchange  delayExchange(){
        DirectExchange directExchange = new DirectExchange(RabbitMqConfig.DELAY_EXCHANGE_NAME,true,false);
        return directExchange;
    }

    /**
     * 延迟交换机
     * @return
     */
    @Bean
    public DirectExchange  perQueueTTLExchange(){
        DirectExchange directExchange = new DirectExchange(RabbitMqConfig.QUEUE_TTL_EXCHANGE_NAME,true,false);
        return directExchange;
    }

3.3 exchange和queue绑定关系

/**
     * 绑定死信队列
     * @return
     */
    @Bean
    public Binding dlxBinding() {
        return BindingBuilder.bind(queueConfig.delayQueue()).to(exchangeConfig.delayExchange()).with(RabbitMqConfig.DELAY_QUEUE_ROUTING_KEY);
    }

    /**
     * 绑定延迟队列1
     * @return
     */
    @Bean
    public Binding queueTTLBinding() {
        return BindingBuilder.bind(queueConfig.delayTTLQueue()).to(exchangeConfig.perQueueTTLExchange()).with(RabbitMqConfig.QUEUE_TTL_ROUTING_KEY);
    }

    /**
     * 绑定延迟队列2
     * @return
     */
    @Bean
    public Binding queueTTLBinding2() {
        return BindingBuilder.bind(queueConfig.delayTTLQueue2()).to(exchangeConfig.perQueueTTLExchange()).with(RabbitMqConfig.QUEUE_TTL_ROUTING_KEY2);
    }

3.4 生产者发送消息

public void sendPerQueueTTL(Object message) throws InterruptedException {
        DelayConsumer.latch = new CountDownLatch(3);
        for (int i = 1; i <= 3; i++) {
            rabbitTemplate.convertAndSend(RabbitMqConfig.QUEUE_TTL_EXCHANGE_NAME,RabbitMqConfig.QUEUE_TTL_ROUTING_KEY, message);
        }
        DelayConsumer.latch.await();
    }


    /**
     * 延迟时间不一致
     * @param message
     * @throws InterruptedException
     */
    public void sendPerQueueTTL2(Object message) throws InterruptedException {
        DelayConsumer.latch = new CountDownLatch(3);
        for (int i = 1; i <= 3; i++) {
            long expiration = i * 1000;
            rabbitTemplate.convertAndSend(RabbitMqConfig.QUEUE_TTL_EXCHANGE_NAME,RabbitMqConfig.QUEUE_TTL_ROUTING_KEY2,
                    message+" the expiration time is "+expiration,new ExpirationMessagePostProcessor(expiration));
        }
        DelayConsumer.latch.await();
    }

3.5 消费者消费对应消息

@Component
public class DelayConsumer implements ChannelAwareMessageListener {
    public static CountDownLatch latch;
    public static final String FAIL_MESSAGE = "This message will fail";
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        try {
            processMessage(message);
        }
        catch (Exception e) {
            // 若是发生了异常,则将该消息重定向到缓冲队列,会在必定延迟以后自动重作
            channel.basicPublish(RabbitMqConfig.QUEUE_TTL_EXCHANGE_NAME, RabbitMqConfig.QUEUE_TTL_ROUTING_KEY, null,
                    "The failed message will auto retry after a certain delay".getBytes());
        }

        if (latch != null) {
            latch.countDown();
        }
    }

    /**
     * 模拟消息处理。若是当消息内容为FAIL_MESSAGE的话,则须要抛出异常
     *
     * @param message
     * @throws Exception
     */
    public void processMessage(Message message) throws Exception {
        String realMessage = new String(message.getBody());
        System.out.println("Received <" + realMessage + ">");
        if (Objects.equals(realMessage, FAIL_MESSAGE)) {
            throw new Exception("Some exception happened");
        }
    }

}

测试结果

设置消息超时一致的测试结果如图:

设置消息超时时间一致

本文示例代码 详见 GitHub 死信队列+TTL实现延迟队列

相关文章
相关标签/搜索