rocketmq源码解析之管理请求清除过时的消费队列

说在前面apache

管理请求 CLEAN_EXPIRED_CONSUMEQUEUE 清除过时的消费队列微信

 

源码解析并发

进入这个方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#cleanExpiredConsumeQueueapp

public RemotingCommand cleanExpiredConsumeQueue() {
        log.warn("invoke cleanExpiredConsumeQueue start.");
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
//        =》
        brokerController.getMessageStore().cleanExpiredConsumerQueue();
        log.warn("invoke cleanExpiredConsumeQueue end.");
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

进入这个方法org.apache.rocketmq.store.DefaultMessageStore#cleanExpiredConsumerQueue清除过时的消费队列性能

public void cleanExpiredConsumerQueue() {
//        从commitLog中获取最小的offset=》
        long minCommitLogOffset = this.commitLog.getMinOffset();
        Iterator<Entry<String, ConcurrentMap<Integer, ConsumeQueue>>> it = this.consumeQueueTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, ConcurrentMap<Integer, ConsumeQueue>> next = it.next();
            String topic = next.getKey();
            if (!topic.equals(ScheduleMessageService.SCHEDULE_TOPIC)) {
                ConcurrentMap<Integer, ConsumeQueue> queueTable = next.getValue();
                Iterator<Entry<Integer, ConsumeQueue>> itQT = queueTable.entrySet().iterator();
                while (itQT.hasNext()) {
                    Entry<Integer, ConsumeQueue> nextQT = itQT.next();
//                    获取消息队列的lastOffset=》
                    long maxCLOffsetInConsumeQueue = nextQT.getValue().getLastOffset();
                    if (maxCLOffsetInConsumeQueue == -1) {
                        log.warn("maybe ConsumeQueue was created just now. topic={} queueId={} maxPhysicOffset={} minLogicOffset={}.",
                            nextQT.getValue().getTopic(),
                            nextQT.getValue().getQueueId(),
                            nextQT.getValue().getMaxPhysicOffset(),
                            nextQT.getValue().getMinLogicOffset());
//                        正常状况下应该等于,若是是小于说明有的消息队列已过时
                    } else if (maxCLOffsetInConsumeQueue < minCommitLogOffset) {
                        log.info(
                            "cleanExpiredConsumerQueue: {} {} consumer queue destroyed, minCommitLogOffset: {} maxCLOffsetInConsumeQueue: {}",
                            topic,
                            nextQT.getKey(),
                            minCommitLogOffset,
                            maxCLOffsetInConsumeQueue);
//                        按topic和queueId删除topic和queue的offset=》
                        DefaultMessageStore.this.commitLog.removeQueueFromTopicQueueTable(nextQT.getValue().getTopic(),
                            nextQT.getValue().getQueueId());
                        nextQT.getValue().destroy();
//                      消息队列销毁了,消息队列所在元素删除
                        itQT.remove();
                    }
                }

                if (queueTable.isEmpty()) {
                    log.info("cleanExpiredConsumerQueue: {},topic destroyed", topic);
//                  topic销毁了,消息队列所在的元素删除
                    it.remove();
                }
            }
        }
    }

进入这个方法org.apache.rocketmq.store.CommitLog#getMinOffset获取最小的offsetthis

public long getMinOffset() {
//        获取第一个映射文件=》
        MappedFile mappedFile = this.mappedFileQueue.getFirstMappedFile();
        if (mappedFile != null) {
            if (mappedFile.isAvailable()) {
//                获取映射文件的起始偏移量
                return mappedFile.getFileFromOffset();
            } else {
//                获取下个文件的起始偏移量=》
                return this.rollNextFile(mappedFile.getFileFromOffset());
            }
        }

        return -1;
    }

进入到这个方法org.apache.rocketmq.store.MappedFileQueue#getFirstMappedFile获取映射队列中第一个映射文件3d

public MappedFile getFirstMappedFile() {
    MappedFile mappedFileFirst = null;
    if (!this.mappedFiles.isEmpty()) {
        try {
            mappedFileFirst = this.mappedFiles.get(0);
        } catch (IndexOutOfBoundsException e) {
            //ignore
        } catch (Exception e) {
            log.error("getFirstMappedFile has exception.", e);
        }
    }

    return mappedFileFirst;
}

进入到这个方法org.apache.rocketmq.store.CommitLog#rollNextFile获取到下个映射文件的起始偏移量blog

public long rollNextFile(final long offset) {
    int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
    return offset + mappedFileSize - offset % mappedFileSize;
}

往上返回到这个方法org.apache.rocketmq.store.ConsumeQueue#getLastOffset获取最后的offset队列

public long getLastOffset() {
        long lastOffset = -1;
        int logicFileSize = this.mappedFileSize;
//        获取映射文件队列中最后一个映射文件=》
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
        if (mappedFile != null) {

//            有点疑问,这里是获取的commitOffset吗
            int position = mappedFile.getWrotePosition() - CQ_STORE_UNIT_SIZE;
            if (position < 0)
                position = 0;
            ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
            byteBuffer.position(position);
            for (int i = 0; i < logicFileSize; i += CQ_STORE_UNIT_SIZE) {
                long offset = byteBuffer.getLong();
                int size = byteBuffer.getInt();
                byteBuffer.getLong();
                if (offset >= 0 && size > 0) {
                    lastOffset = offset + size;
                } else {
                    break;
                }
            }
        }

        return lastOffset;
    }

进入到这个方法org.apache.rocketmq.store.MappedFileQueue#getLastMappedFile()获取映射队列中最后的映射文件rem

public MappedFile getLastMappedFile() {
    MappedFile mappedFileLast = null;
    while (!this.mappedFiles.isEmpty()) {
        try {
            mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);
            break;
        } catch (IndexOutOfBoundsException e) {
            //continue;
        } catch (Exception e) {
            log.error("getLastMappedFile has exception.", e);
            break;
        }
    }

    return mappedFileLast;
}

往上返回到这个方法org.apache.rocketmq.store.CommitLog#removeQueueFromTopicQueueTable按topic、queueId删除消费队列

public void removeQueueFromTopicQueueTable(final String topic, final int queueId) {
        String key = topic + "-" + queueId;
//        这个地方同步用synchronized有点疑问,若是并发量比较小,synchronized性能也能够的,可是并发量达到必定量级lock或者其余无锁实现
//        应该会好一点,难道消息队列过时这种状况出现过时未消费的几率较低
        synchronized (this) {
            this.topicQueueTable.remove(key);
        }

        log.info("removeQueueFromTopicQueueTable OK Topic: {} QueueId: {}", topic, queueId);
    }

往上返回到这个方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#cleanExpiredConsumeQueue结束

 

说在最后

本次解析仅表明我的观点,仅供参考。

 

加入技术微信群

钉钉技术群

相关文章
相关标签/搜索