ROCKETMQ——定时消息(延迟消息)

摘要

  1. 延迟队列的应用场景

示例

定时消息(延迟消息)是根据延迟队列的level来的,延迟队列默认是ide

public class MessageStoreConfig {
    private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
}

是这18个等级(秒(s)、分(m)、小时(h)),level为1,表示延迟1秒后消费,level为5表示延迟1分钟后消费,level为17表示延迟一个小时消费。生产消息跟普通的生产消息相似,只须要在消息上设置延迟队列的level便可。消费消息跟普通的消费消息一致。源码分析

producer示例

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        for (int i = 0; i < 1; i++)
            try {
                {
                    Message msg = new Message("TOPIC_TEST",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    msg.setDelayTimeLevel(5);//设置延迟队列的level,5表示延迟一分钟
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                    System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
                }

            } catch (Exception e) {
                e.printStackTrace();
            }

//        producer.shutdown();
    }
}

consumer示例

public class PushConsumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PUSH_CONSUME_GROUP");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("TOPIC_TEST", "*");// topic  tag
        /**
         * 全新的消费组 才适用这些策略
         * CONSUME_FROM_LAST_OFFSET //默认策略,从该队列最尾开始消费,即跳过历史消息
		 * CONSUME_FROM_FIRST_OFFSET //从队列最开始开始消费,即历史消息(还储存在broker的)所有消费一遍
         * CONSUME_FROM_TIMESTAMP//从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时之前
         */
//        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//        consumer.setConsumeTimestamp("20170422221800"); //时间格式 yyyyMMddHHmmss
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
                for(MessageExt msg:msgs){
                	System.out.println("msgId:"+msg.getMsgId() + " body:" + new String(msg.getBody()));
                }
                /**
                 * CONSUME_SUCCESS 消费成功
                 * RECONSUME_LATER 重试消费,重试次数能够设置,默认是16次
                 */
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

源码分析

参考文档

RocketMQ 定时消息和重试消息 RocketMQ 延迟消息的使用与原理分析 如何在MQ中实现支持任意延迟的消息?google

转载于:https://my.oschina.net/liangxiao/blog/3014971.net