说在前面apache
管理请求 CLEAN_EXPIRED_CONSUMEQUEUE 清除过时的消费队列微信
源码解析并发
进入这个方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#cleanExpiredConsumeQueueapp
public RemotingCommand cleanExpiredConsumeQueue() { log.warn("invoke cleanExpiredConsumeQueue start."); final RemotingCommand response = RemotingCommand.createResponseCommand(null); // =》 brokerController.getMessageStore().cleanExpiredConsumerQueue(); log.warn("invoke cleanExpiredConsumeQueue end."); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; }
进入这个方法org.apache.rocketmq.store.DefaultMessageStore#cleanExpiredConsumerQueue清除过时的消费队列性能
public void cleanExpiredConsumerQueue() { // 从commitLog中获取最小的offset=》 long minCommitLogOffset = this.commitLog.getMinOffset(); Iterator<Entry<String, ConcurrentMap<Integer, ConsumeQueue>>> it = this.consumeQueueTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, ConcurrentMap<Integer, ConsumeQueue>> next = it.next(); String topic = next.getKey(); if (!topic.equals(ScheduleMessageService.SCHEDULE_TOPIC)) { ConcurrentMap<Integer, ConsumeQueue> queueTable = next.getValue(); Iterator<Entry<Integer, ConsumeQueue>> itQT = queueTable.entrySet().iterator(); while (itQT.hasNext()) { Entry<Integer, ConsumeQueue> nextQT = itQT.next(); // 获取消息队列的lastOffset=》 long maxCLOffsetInConsumeQueue = nextQT.getValue().getLastOffset(); if (maxCLOffsetInConsumeQueue == -1) { log.warn("maybe ConsumeQueue was created just now. topic={} queueId={} maxPhysicOffset={} minLogicOffset={}.", nextQT.getValue().getTopic(), nextQT.getValue().getQueueId(), nextQT.getValue().getMaxPhysicOffset(), nextQT.getValue().getMinLogicOffset()); // 正常状况下应该等于,若是是小于说明有的消息队列已过时 } else if (maxCLOffsetInConsumeQueue < minCommitLogOffset) { log.info( "cleanExpiredConsumerQueue: {} {} consumer queue destroyed, minCommitLogOffset: {} maxCLOffsetInConsumeQueue: {}", topic, nextQT.getKey(), minCommitLogOffset, maxCLOffsetInConsumeQueue); // 按topic和queueId删除topic和queue的offset=》 DefaultMessageStore.this.commitLog.removeQueueFromTopicQueueTable(nextQT.getValue().getTopic(), nextQT.getValue().getQueueId()); nextQT.getValue().destroy(); // 消息队列销毁了,消息队列所在元素删除 itQT.remove(); } } if (queueTable.isEmpty()) { log.info("cleanExpiredConsumerQueue: {},topic destroyed", topic); // topic销毁了,消息队列所在的元素删除 it.remove(); } } } }
进入这个方法org.apache.rocketmq.store.CommitLog#getMinOffset获取最小的offsetthis
public long getMinOffset() { // 获取第一个映射文件=》 MappedFile mappedFile = this.mappedFileQueue.getFirstMappedFile(); if (mappedFile != null) { if (mappedFile.isAvailable()) { // 获取映射文件的起始偏移量 return mappedFile.getFileFromOffset(); } else { // 获取下个文件的起始偏移量=》 return this.rollNextFile(mappedFile.getFileFromOffset()); } } return -1; }
进入到这个方法org.apache.rocketmq.store.MappedFileQueue#getFirstMappedFile获取映射队列中第一个映射文件3d
public MappedFile getFirstMappedFile() { MappedFile mappedFileFirst = null; if (!this.mappedFiles.isEmpty()) { try { mappedFileFirst = this.mappedFiles.get(0); } catch (IndexOutOfBoundsException e) { //ignore } catch (Exception e) { log.error("getFirstMappedFile has exception.", e); } } return mappedFileFirst; }
进入到这个方法org.apache.rocketmq.store.CommitLog#rollNextFile获取到下个映射文件的起始偏移量blog
public long rollNextFile(final long offset) { int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(); return offset + mappedFileSize - offset % mappedFileSize; }
往上返回到这个方法org.apache.rocketmq.store.ConsumeQueue#getLastOffset获取最后的offset队列
public long getLastOffset() { long lastOffset = -1; int logicFileSize = this.mappedFileSize; // 获取映射文件队列中最后一个映射文件=》 MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); if (mappedFile != null) { // 有点疑问,这里是获取的commitOffset吗 int position = mappedFile.getWrotePosition() - CQ_STORE_UNIT_SIZE; if (position < 0) position = 0; ByteBuffer byteBuffer = mappedFile.sliceByteBuffer(); byteBuffer.position(position); for (int i = 0; i < logicFileSize; i += CQ_STORE_UNIT_SIZE) { long offset = byteBuffer.getLong(); int size = byteBuffer.getInt(); byteBuffer.getLong(); if (offset >= 0 && size > 0) { lastOffset = offset + size; } else { break; } } } return lastOffset; }
进入到这个方法org.apache.rocketmq.store.MappedFileQueue#getLastMappedFile()获取映射队列中最后的映射文件rem
public MappedFile getLastMappedFile() { MappedFile mappedFileLast = null; while (!this.mappedFiles.isEmpty()) { try { mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1); break; } catch (IndexOutOfBoundsException e) { //continue; } catch (Exception e) { log.error("getLastMappedFile has exception.", e); break; } } return mappedFileLast; }
往上返回到这个方法org.apache.rocketmq.store.CommitLog#removeQueueFromTopicQueueTable按topic、queueId删除消费队列
public void removeQueueFromTopicQueueTable(final String topic, final int queueId) { String key = topic + "-" + queueId; // 这个地方同步用synchronized有点疑问,若是并发量比较小,synchronized性能也能够的,可是并发量达到必定量级lock或者其余无锁实现 // 应该会好一点,难道消息队列过时这种状况出现过时未消费的几率较低 synchronized (this) { this.topicQueueTable.remove(key); } log.info("removeQueueFromTopicQueueTable OK Topic: {} QueueId: {}", topic, queueId); }
往上返回到这个方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#cleanExpiredConsumeQueue结束
说在最后
本次解析仅表明我的观点,仅供参考。
加入技术微信群
钉钉技术群