上一节消息重试里面提到了重试的消息能够被延时消费,其实除此以外,用户发送的消息也能够指定延时时间(更准确的说是延时等级),而后在指定延时时间以后投递消息,而后被consumer消费。阿里云的ons还支持定时消息,并且延时消息是直接指定延时时间,其实阿里云的延时消息也是定时消息的另外一种表述方式,都是经过设置消息被投递的时间来实现的,可是Apache RocketMQ在版本4.2.0中尚不支持指定时间的延时,只能经过配置延时等级和延时等级对应的时间来实现延时。html
一个延时消息被发出到消费成功经历如下几个过程:java
注意:批量消息是不支持延时消息的apache
tips:下文中说到的延时队列能够理解为一个ConsumeQueue
app
在producer中发送消息的时候,设置Message的delayLevelide
// org.apache.rocketmq.common.message.Message public void setDelayTimeLevel(int level) { this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level)); }
调用上面的方法设置延时等级的时候,会向message添加"DELAY"属性,后面broker处理延时消息就是依赖该属性进行特别的处理。ui
接下来发送消息的流程和正常发送消息的流程基本一致,只是会将该消息标记为延时消息类型this
// org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendKernelImpl if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) { context.setMsgType(MessageType.Delay_Msg); }
broker收到延时消息和正常消息在前置的处理流程是一致的,对于延时消息的特殊处理体如今将消息写入存储(内存或文件)的时候阿里云
// org.apache.rocketmq.store.CommitLog#putMessage public PutMessageResult putMessage(final MessageExtBrokerInner msg) { // 省略中间代码... StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); // 拿到原始topic和对应的queueId String topic = msg.getTopic(); int queueId = msg.getQueueId(); final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); // 非事务消息和事务的commit消息才会进一步判断delayLevel if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { // Delay Delivery if (msg.getDelayTimeLevel() > 0) { // 纠正设置过大的level,就是delayLevel设置都大于延时时间等级的最大级 if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); } // 设置为延时队列的topic topic = ScheduleMessageService.SCHEDULE_TOPIC; // 每个延时等级一个queue,queueId = delayLevel - 1 queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // Backup real topic, queueId // 备份原始的topic和queueId MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); // 更新properties msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); msg.setTopic(topic); msg.setQueueId(queueId); } } // 省略中间代码... }
上面的SCHEDULE_TOPIC是:线程
public static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
code
这个topic是一个特殊的topic,和正常的topic不一样的地方是:
后面消息写入的过程和普通的又是一致的。
上面将消息写入延时队列中了,接下来就是处理延时队列中的消息,而后从新发送回原始topic的队列中。
在此以前先说明下至今还有疑问的一个个概念——delayLevel。这个概念和咱们接下要须要用到的的类org.apache.rocketmq.store.schedule.ScheduleMessageService有关,这个类的字段delayLevelTable里面保存了具体的延时等级
private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable = new ConcurrentHashMap<Integer, Long>(32);
看下这个字段的初始化过程
// org.apache.rocketmq.store.schedule.ScheduleMessageService#parseDelayLevel public boolean parseDelayLevel() { HashMap<String, Long> timeUnitTable = new HashMap<String, Long>(); // 每一个延时等级延时时间的单位对应的ms数 timeUnitTable.put("s", 1000L); timeUnitTable.put("m", 1000L * 60); timeUnitTable.put("h", 1000L * 60 * 60); timeUnitTable.put("d", 1000L * 60 * 60 * 24); // 延时等级在MessageStoreConfig中配置 // private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel(); try { // 根据空格将配置分隔出每一个等级 String[] levelArray = levelString.split(" "); for (int i = 0; i < levelArray.length; i++) { String value = levelArray[i]; String ch = value.substring(value.length() - 1); // 时间单位对应的ms数 Long tu = timeUnitTable.get(ch); // 延时等级从1开始 int level = i + 1; if (level > this.maxDelayLevel) { // 找出最大的延时等级 this.maxDelayLevel = level; } long num = Long.parseLong(value.substring(0, value.length() - 1)); long delayTimeMillis = tu * num; this.delayLevelTable.put(level, delayTimeMillis); // 省略部分代码... }
上面这个load方法在broker启动的时候DefaultMessageStore会调用来初始化延时等级。
接下来就应该解决怎么处理延时消息队列中的消息的问题了。处理延时消息的服务是:ScheduleMessageService。
仍是broker启动的时候DefaultMessageStore会调用org.apache.rocketmq.store.schedule.ScheduleMessageService#start来启动处理延时消息队列的服务:
public void start() { for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) { Integer level = entry.getKey(); Long timeDelay = entry.getValue(); // 记录队列的处理进度 Long offset = this.offsetTable.get(level); if (null == offset) { offset = 0L; } if (timeDelay != null) { // 每一个延时队列启动一个定时任务来处理该队列的延时消息 this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME); } } this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { // 持久化offsetTable(保存了每一个延时队列对应的处理进度offset) ScheduleMessageService.this.persist(); } catch (Throwable e) { log.error("scheduleAtFixedRate flush exception", e); } } }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval()); }
DeliverDelayedMessageTimerTask是一个TimerTask,启动之后不断处理延时队列中的消息,直到出现异常则终止该线程从新启动一个新的TimerTask
public void executeOnTimeup() { // 找到该延时等级对应的ConsumeQueue ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel)); // 记录异常状况下一次启动TimerTask开始处理的offset long failScheduleOffset = offset; if (cq != null) { // 找到offset所处的MappedFile中offset后面的buffer SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset); if (bufferCQ != null) { try { long nextOffset = offset; int i = 0; ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { // 下面三个字段信息是ConsumeQueue物理存储的信息 long offsetPy = bufferCQ.getByteBuffer().getLong(); int sizePy = bufferCQ.getByteBuffer().getInt(); // 注意这个tagCode,再也不是普通的tag的hashCode,而是该延时消息到期的时间 long tagsCode = bufferCQ.getByteBuffer().getLong(); // 省略中间代码.... long now = System.currentTimeMillis(); // 计算应该投递该消息的时间,若是已经超时则当即投递 long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode); // 计算下一个消息的开始位置,用来寻找下一个消息位置(若是有的话) nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); // 判断延时消息是否到期 long countdown = deliverTimestamp - now; if (countdown <= 0) { MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset( offsetPy, sizePy); if (msgExt != null) { try { // 将消息恢复到原始消息的格式,恢复topic、queueId、tagCode等,清除属性"DELAY" MessageExtBrokerInner msgInner = this.messageTimeup(msgExt); PutMessageResult putMessageResult = ScheduleMessageService.this.defaultMessageStore .putMessage(msgInner); if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { // 投递成功,处理下一个 continue; } else { // XXX: warn and notify me log.error( "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}", msgExt.getTopic(), msgExt.getMsgId()); // 投递失败,结束当前task,从新启动TimerTask,从下一个消息开始处理,也就是说当前消息丢弃 // 更新offsetTable中当前队列的offset为下一个消息的offset ScheduleMessageService.this.timer.schedule( new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_PERIOD); ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); return; } } catch (Exception e) { // 从新投递期间出现任何异常,结束当前task,从新启动TimerTask,从当前消息开始重试 /* * XXX: warn and notify me */ log.error( "ScheduleMessageService, messageTimeup execute error, drop it. msgExt=" + msgExt + ", nextOffset=" + nextOffset + ",offsetPy=" + offsetPy + ",sizePy=" + sizePy, e); } } } else { ScheduleMessageService.this.timer.schedule( new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown); ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); return; } } // end of for // 处理完当前MappedFile中的消息后,从新启动TimerTask,从下一个消息开始处理 // 更新offsetTable中当前队列的offset为下一个消息的offset nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask( this.delayLevel, nextOffset), DELAY_FOR_A_WHILE); ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); return; } finally { bufferCQ.release(); } } // end of if (bufferCQ != null) else { // 若是根据offsetTable中的offset没有找到对应的消息(可能被删除了),则按照当前ConsumeQueue的最小offset开始处理 long cqMinOffset = cq.getMinOffsetInQueue(); if (offset < cqMinOffset) { failScheduleOffset = cqMinOffset; log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset=" + cqMinOffset + ", queueId=" + cq.getQueueId()); } } } // end of if (cq != null) ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, failScheduleOffset), DELAY_FOR_A_WHILE); }
对于上面的tagCode作一下特别说明,延时消息的tagCode和普通消息不同:
对延时消息的tagCode的特别处理是在下面这个方法中完成的,也就是在build ConsumeQueue信息的时候
org.apache.rocketmq.store.CommitLog#checkMessageAndReturnSize(java.nio.ByteBuffer, boolean, boolean)
以上就是RocketMQ延时消息的实现方式,上面没有详说的是重试消息的延时是怎么实现的,其实就是在consumer将延时消息发送回broker的时候设置了(用户能够本身设置,若是没有本身设置默认是0)delayLevel,到了broker处理重试消息的时候若是delayLevel是0(也就是说是默认的延时等级)的时候会在原来的基础上加3,后面的处理就和上面说的延时消息同样了,存储的时候将消息投递到延时队列,等待延时到期后再从新投递到原始topic队列中等到consumer消费。