实战|固然我仍是更建议你用MQ搞定超时订单的-(2)

1、依然用三根鸡毛作引言

  • 真的! 不骗大家的喔~ 相信你们都遇到相似于:订单30min后未支付自动取消的开发任务

2、MQ 延迟消息实现原理

傲娇的RabbitMQ官网赫然写着:git

RabbitMQ is the most widely deployed open source message broker.
复制代码

因而可知,RabbitMQ是一个消息中间件,生产者生成消息,消费者消费消息,它遵循AMQP(高级消息队列协议),是最普遍部署的开源消息代理。 因此,今天我用RabbitMQ为你们捣鼓一下延迟队列。程序员

使用RabbitMQ来实现延迟任务必须先了解RabbitMQ的两个概念:消息的TTL和死信Exchange,经过这二者的组合来实现上述需求。github

  • 消息的TTL(Time To Live)

消息的TTL就是消息的存活时间。RabbitMQ 能够对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也能够对每个单独的消息作单独的设置。超过了这个时间,咱们认为这个消息就死了,称之为死信。若是队列设置了,消息也设置了,那么会取小的(谁小谁尴尬)。因此一个消息若是被路由到不一样的队列中,这个消息死亡的时间有可能不同(不一样的队列设置)。这里单讲单个消息的TTL,由于它才是实现延迟任务的关键。windows

那么,如何设置这个TTL值呢?有两种方式,第一种是在建立队列的时候设置队列的"x-message-ttl"属性,以下:api

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
复制代码

这样全部被投递到该队列的消息都最多不会存活超过6s。bash

另外一种方式即是针对每条消息设置TTL,代码以下:app

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("6000");
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg body".getBytes());
复制代码

这样这条消息的过时时间也被设置成了6s。异步

但这两种方式是有区别的,若是设置了队列的TTL属性,那么一旦消息过时,就会被队列丢弃,而第二种方式,消息即便过时,也不必定会被立刻丢弃,由于消息是否过时是在即将投递到消费者以前断定的,若是当前队列有严重的消息积压状况,则已过时的消息也许还能存活较长时间。 另外,还须要注意的一点是,若是不设置TTL,表示消息永远不会过时,若是将TTL设置为0,则表示除非此时能够直接投递该消息到消费者,不然该消息将会被丢弃。

单靠死信还不能实现延迟任务,还要靠Dead Letter Exchange

  • Dead Letter Exchanges

Exchage的概念在这里就不在赘述。一个消息在知足以下条件下,会进死信路由,记住这里是路由而不是队列,一个路由能够对应不少队列。

  1. 一个消息被Consumer拒收了,而且reject方法的参数里requeuefalse。也就是说不会被再次放在队列里,被其余消费者使用。
  2. 上面的消息的TTL到了,消息就过时了。
  3. 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。

Dead Letter Exchange其实就是一种普通的exchange,和建立其余exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过时了,会自动触发消息的转发,发送到Dead Letter Exchange中去。

  • 原理图

延迟任务经过消息的TTLDead Letter Exchange来实现。咱们须要创建2个队列,一个用于发送消息,一个用于消息过时后的转发目标队列。

生产者生产一条延时消息,根据须要延时时间的不一样,利用不一样的routingkey将消息路由到不一样的延时队列,每一个队列都设置了不一样的TTL属性,并绑定在同一个死信交换机中,消息过时后,根据routingkey的不一样,又会被路由到不一样的死信队列中,消费者只须要监听对应的死信队列进行处理便可。

3、实战演练

  • 下载安装 windows示例
  1. 下载RabbitMQ,须要ErLang环境的支持
  2. 运行命令
rabbitmq-plugins enable rabbitmq_management
复制代码

开启Web管理插件,而后启动rabbitmq-server访问http://localhost:15672/#/,输入密令后你能看到就能够啦.

  • 插件安装

在 RabbitMQ 3.6.x 以前咱们通常采用死信队列(DLX)+TTL过时时间来实现延迟队列,咱们这里不作过多介绍,能够参考其余道友的:TTL+DLX实现方式。

在 RabbitMQ 3.6.x开始(如今都3.8.+了),RabbitMQ 官方提供了延迟队列的插件,能够下载放置到 RabbitMQ 根目录下的 plugins 下。延迟队列插件下载地址:

  1. 官方地址 2. JFrog Bintray地址 我安装的时候在官网没找到3.7.x的,可是3.8.0是向下兼容3.7.x的,而后我又在Bintray找到了3.7.x,你们信不过就找对应的版本插件哈....

下载好,放到 plugins的目录中,运行以下命令:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
复制代码
  • 搭建SpringBoot环境
  1. yml配置以下
#集成 rabbitmq
 rabbitmq:
 host: localhost
 port: 5672
 username: guest
 password: guest
 virtual-host: /
 connection-timeout: 150000
 publisher-confirms: true    #开启确认机制 采用消息确认模式,
 publisher-returns: true     #开启return确认机制
 template:                   #消息发出去后,异步等待响应
 mandatory: true           #设置为 true 后,消费者在消息没有被路由到合适队列状况下会被return监听,而不会自动删除
复制代码
  1. 启动配置声明几个Bean
@Configuration
public class MQConfig {
    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }

    public static final String DELAY_EXCHANGE = "Ex.DelayExchange";
    public static final String DELAY_QUEUE = "MQ.DelayQueue";
    public static final String DELAY_KEY = "delay.#";

    /** * 延时交换机 * * @return TopicExchange */
    @Bean
    public TopicExchange delayExchange() {
        Map<String, Object> pros = new HashMap<>(3);
        //设置交换机支持延迟消息推送
        pros.put("x-delayed-message", "topic");
        TopicExchange exchange = new TopicExchange(DELAY_EXCHANGE, true, false, pros);
        //咱们在也能够在 Exchange 的声明中能够设置exchange.setDelayed(true)来开启延迟队列
        exchange.setDelayed(true);
        return exchange;
    }

    /** * 延时队列 * * @return Queue */
    @Bean
    public Queue delayQueue() {
        return new Queue(DELAY_QUEUE, true);
    }

    /** * 绑定队列和交换机,以及设定路由规则key * * @return Binding */
    @Bean
    public Binding delayBinding() {
        return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(DELAY_KEY);
    }
}
复制代码
  1. 建立一个生产者
/** * @author LiJing * @ClassName: MQSender * @Description: MQ发送 生产者 * @date 2019/10/9 11:50 */
@Component
public class MQSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.out.println("correlationData: " + correlationData);
            System.out.println("ack: " + ack);
            if (!ack) {
                System.out.println("异常处理....");
            }
        }
    };

    final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange , String routingKey) {
            System.out.println("return exchange: " + exchange + ", routingKey: "
                    + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
        }
    };

    public void sendDelay(Object message, int delayTime) {
        //采用消息确认模式,消息发出去后,异步等待响应
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);
        //id + 时间戳 全局惟一
        CorrelationData correlationData = new CorrelationData("delay" + System.nanoTime());
        //发送消息时指定 header 延迟时间
        rabbitTemplate.convertAndSend(MQConfig.DELAY_EXCHANGE, "delay.boot", message,
                new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        //设置消息持久化
                        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                        // 两种方式 都可
                        //message.getMessageProperties().setHeader("x-delay", "6000");
                        message.getMessageProperties().setDelay(delayTime);
                        return message;
                    }
                }, correlationData);
    }
}
复制代码
  1. 建立一个消费者
/** * @author LiJing * @ClassName: MQReceiver * @Description: 消费者 * @date 2019/10/9 11:51 */
@Component
@Slf4j
public class MQReceiver {
    @RabbitListener(queues = MQConfig.DELAY_QUEUE)
    @RabbitHandler
    public void onDelayMessage(Message msg, Channel channel) throws IOException {
        long deliveryTag = msg.getMessageProperties().getDeliveryTag();
        channel.basicAck(deliveryTag, true);
        System.out.println("延迟队列在" + LocalDateTime.now()+"时间," + "延迟后收到消息:" + new String(msg.getBody()));
    }
}
复制代码

5.建立一个mq的测试控制器

@RestController
@RequestMapping("/mq")
public class MqController extends AbstractController {

    @Autowired
    private MQSender mqSender;

    @GetMapping(value = "/send/delay")
    public void sendDelay(int delayTime) {
        String msg = "hello delay";
        System.out.println("发送开始时间:" + LocalDateTime.now() + "测试发送delay消息====>" + msg);
        mqSender.sendDelay(msg, delayTime);
    }
}
复制代码
  1. 启动,测试一把
http://localhost:8080/api/mq/send/delay?delayTime=6000
 http://localhost:8080/api/mq/send/delay?delayTime=10000
复制代码

果真,名不虚传..... 意思就是:你已经成功引发了个人注意...小小的演练,你们有收获就点个爱心

4、小结来了

延时队列在须要延时处理的场景下很是有用,使用RabbitMQ来实现延时队列,能够很好的利用RabbitMQ的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。

另外,经过RabbitMQ集群的特性,能够很好的解决单点故障问题,不会由于单个节点挂掉致使延时队列不可用或者消息丢失。

固然,延时队列还有不少其它选择,好比利用Redis的zset,Quartz或者利用kafka的时间轮,这些方式各有特色,但就像炉石传说通常,这些知识就比如手里的卡牌,知道的越多,能够用的卡牌也就越多,遇到问题便能游刃有余,因此须要大量的知识储备和经验积累才能打造出更出色的卡牌组合,让本身解决问题的能力获得更好的提高。

5、结束语

肥朝告诉我说:闻道有前后,术业有专攻,达者为师。

那今日份的讲解就到此结束,具体的代码请移步个人gitHub的mybot项目888分支查阅,fork体验一把,或者评论区留言探讨,写的很差,请多多指教~~

相关文章
相关标签/搜索