摘要
- 延迟队列的应用场景
示例
定时消息(延迟消息)是根据延迟队列的level来的,延迟队列默认是ide
public class MessageStoreConfig { private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; }
是这18个等级(秒(s)、分(m)、小时(h)),level为1,表示延迟1秒后消费,level为5表示延迟1分钟后消费,level为17表示延迟一个小时消费。生产消息跟普通的生产消息相似,只须要在消息上设置延迟队列的level便可。消费消息跟普通的消费消息一致。源码分析
producer示例
public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.setNamesrvAddr("localhost:9876"); producer.start(); for (int i = 0; i < 1; i++) try { { Message msg = new Message("TOPIC_TEST", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); msg.setDelayTimeLevel(5);//设置延迟队列的level,5表示延迟一分钟 SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); } } catch (Exception e) { e.printStackTrace(); } // producer.shutdown(); } }
consumer示例
public class PushConsumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PUSH_CONSUME_GROUP"); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("TOPIC_TEST", "*");// topic tag /** * 全新的消费组 才适用这些策略 * CONSUME_FROM_LAST_OFFSET //默认策略,从该队列最尾开始消费,即跳过历史消息 * CONSUME_FROM_FIRST_OFFSET //从队列最开始开始消费,即历史消息(还储存在broker的)所有消费一遍 * CONSUME_FROM_TIMESTAMP//从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时之前 */ // consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); // consumer.setConsumeTimestamp("20170422221800"); //时间格式 yyyyMMddHHmmss consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); for(MessageExt msg:msgs){ System.out.println("msgId:"+msg.getMsgId() + " body:" + new String(msg.getBody())); } /** * CONSUME_SUCCESS 消费成功 * RECONSUME_LATER 重试消费,重试次数能够设置,默认是16次 */ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }
源码分析
参考文档
RocketMQ 定时消息和重试消息 RocketMQ 延迟消息的使用与原理分析 如何在MQ中实现支持任意延迟的消息?google