rocketmq源码解析之管理请求之查询broker全部消费组状态

说在前面

管理请求之查询broker全部消费组状态

 

源码解析

进入这个方法 org.apache.rocketmq.broker.processor.AdminBrokerProcessor#fetchAllConsumeStatsInBroker









private RemotingCommand fetchAllConsumeStatsInBroker(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);GetConsumeStatsInBrokerHeader requestHeader =(GetConsumeStatsInBrokerHeader) request.decodeCommandCustomHeader(GetConsumeStatsInBrokerHeader.class);boolean isOrder = requestHeader.isOrder();//        获取消费组订阅信息ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroups =brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable();List<Map<String/* subscriptionGroupName */, List<ConsumeStats>>> brokerConsumeStatsList =new ArrayList<Map<String, List<ConsumeStats>>>();long totalDiff = 0L;for (String group : subscriptionGroups.keySet()) {Map<String, List<ConsumeStats>> subscripTopicConsumeMap = new HashMap<String, List<ConsumeStats>>();//            消费组订阅了哪些topicSet<String> topics = this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(group);List<ConsumeStats> consumeStatsList = new ArrayList<ConsumeStats>();for (String topic : topics) {ConsumeStats consumeStats = new ConsumeStats();//                查询topic的配置信息TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);if (null == topicConfig) {log.warn("consumeStats, topic config not exist, {}", topic);continue;}if (isOrder && !topicConfig.isOrder()) {continue;}{//                    按消费组和topic查询订阅信息=》SubscriptionData findSubscriptionData = this.brokerController.getConsumerManager().findSubscriptionData(group, topic);if (null == findSubscriptionData&& this.brokerController.getConsumerManager().findSubscriptionDataCount(group) > 0) {log.warn("consumeStats, the consumer group[{}], topic[{}] not exist", group, topic);continue;}}//                topic写队列的数量默认16for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {MessageQueue mq = new 
MessageQueue();mq.setTopic(topic);mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());mq.setQueueId(i);OffsetWrapper offsetWrapper = new OffsetWrapper();//                    获取队列最大的offset=》long brokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);if (brokerOffset < 0)brokerOffset = 0;//                    查询消费者的offset=》long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(group,topic,i);if (consumerOffset < 0)consumerOffset = 0;offsetWrapper.setBrokerOffset(brokerOffset);offsetWrapper.setConsumerOffset(consumerOffset);long timeOffset = consumerOffset - 1;if (timeOffset >= 0) {//                        按topic、queueId、timeOffset查询最后的时间 =》long lastTimestamp = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, timeOffset);if (lastTimestamp > 0) {offsetWrapper.setLastTimestamp(lastTimestamp);}}consumeStats.getOffsetTable().put(mq, offsetWrapper);}//                按消费组和topic获取tps=》double consumeTps = this.brokerController.getBrokerStatsManager().tpsGroupGetNums(group, topic);consumeTps += consumeStats.getConsumeTps();consumeStats.setConsumeTps(consumeTps);totalDiff += consumeStats.computeTotalDiff();consumeStatsList.add(consumeStats);}subscripTopicConsumeMap.put(group, consumeStatsList);brokerConsumeStatsList.add(subscripTopicConsumeMap);}ConsumeStatsList consumeStats = new ConsumeStatsList();consumeStats.setBrokerAddr(brokerController.getBrokerAddr());consumeStats.setConsumeStatsList(brokerConsumeStatsList);consumeStats.setTotalDiff(totalDiff);response.setBody(consumeStats.encode());response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;}

 

进入这个方法,消费组订阅了哪些topic,org.apache.rocketmq.broker.offset.ConsumerOffsetManager#whichTopicByConsumer

 

 


public Set<String> whichTopicByConsumer(final String group) {Set<String> topics = new HashSet<String>();//        遍历消费者组、topic缓存信息Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();while (it.hasNext()) {Entry<String, ConcurrentMap<Integer, Long>> next = it.next();String topicAtGroup = next.getKey();String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);if (arrays.length == 2) {if (group.equals(arrays[1])) {topics.add(arrays[0]);}}}return topics;}

 

进入这个方法,查询topic的配置信息,org.apache.rocketmq.broker.topic.TopicConfigManager#selectTopicConfig

 

 

public TopicConfig selectTopicConfig(final String topic) {//        从topic配置缓存信息中查询当前topic的配置return this.topicConfigTable.get(topic);}

 

进入这个方法,按消费组和topic查询订阅信息,org.apache.rocketmq.broker.client.ConsumerManager#findSubscriptionData

public SubscriptionData findSubscriptionData(final String group, final String topic) {//        按组从缓存中获取消费组信息ConsumerGroupInfo consumerGroupInfo = this.getConsumerGroupInfo(group);if (consumerGroupInfo != null) {//            按topic从缓存中获取订阅数据return consumerGroupInfo.findSubscriptionData(topic);}return null;}

 

进入这个方法,获取队列最大的offset,org.apache.rocketmq.store.DefaultMessageStore#getMaxOffsetInQueue

public long getMaxOffsetInQueue(String topic, int queueId) {//        根据topic和queueId找到消费者队列=》ConsumeQueue logic = this.findConsumeQueue(topic, queueId);if (logic != null) {//            获取最大的offset =》long offset = logic.getMaxOffsetInQueue();return offset;}//        若是不存在指定topic和queueId的消费队列直接返回0return 0;}

 

进入这个方法,根据topic和queueId找到消费者队列,org.apache.rocketmq.store.DefaultMessageStore#findConsumeQueue

 


public ConsumeQueue findConsumeQueue(String topic, int queueId) {//        找到topic的全部消息队列ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);if (null == map) {ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);if (oldMap != null) {map = oldMap;} else {map = newMap;}}//        按queue id查找消费者队列ConsumeQueue logic = map.get(queueId);if (null == logic) {ConsumeQueue newLogic = new ConsumeQueue(topic,queueId,//                消费者队列存储地址 user.home/store/consumequeueStorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),//                每一个文件存储默认30Wthis.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),this);ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);if (oldLogic != null) {logic = oldLogic;} else {logic = newLogic;}}return logic;}

 

往上返回到这个方法,获取最大的offset,org.apache.rocketmq.store.ConsumeQueue#getMaxOffsetInQueue

 

public long getMaxOffsetInQueue() {//        =》return this.mappedFileQueue.getMaxOffset() / CQ_STORE_UNIT_SIZE;}

 

进入这个方法,org.apache.rocketmq.store.MappedFileQueue#getMaxOffset

 

public long getMaxOffset() {//        获取存储映射文件队列中索引位置最大的映射文件=》MappedFile mappedFile = getLastMappedFile();if (mappedFile != null) {//            映射文件的起始offset+映射文件的可读取的索引位置return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();}//        若是队列中没有存储映射文件直接返回0return 0;}

 

进入这个方法,获取存储映射文件队列中索引位置最大的映射文件,org.apache.rocketmq.store.MappedFileQueue#getLastMappedFile()

 


public MappedFile getLastMappedFile() {MappedFile mappedFileLast = null;while (!this.mappedFiles.isEmpty()) {try {mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);break;} catch (IndexOutOfBoundsException e) {//continue;} catch (Exception e) {log.error("getLastMappedFile has exception.", e);break;}}return mappedFileLast;}

 

往上返回到这个方法,查询消费者的offset,org.apache.rocketmq.broker.offset.ConsumerOffsetManager#queryOffset(java.lang.String, java.lang.String, int)

 

public long queryOffset(final String group, final String topic, final int queueId) {// topic@group 从本地offset缓存中查询String key = topic + TOPIC_GROUP_SEPARATOR + group;ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);if (null != map) {Long offset = map.get(queueId);if (offset != null)return offset;}return -1;}

 

进入这个方法,按topic、queueId、timeOffset查询最后的时间,org.apache.rocketmq.store.DefaultMessageStore#getMessageStoreTimeStamp

 

@Overridepublic long getMessageStoreTimeStamp(String topic, int queueId, long consumeQueueOffset) {//        按topic和queueId查询到消费队列=》ConsumeQueue logicQueue = this.findConsumeQueue(topic, queueId);if (logicQueue != null) {//            按消费者的offset查询存储时间所在的buffer=》SelectMappedBufferResult result = logicQueue.getIndexBuffer(consumeQueueOffset);//            =》return getStoreTime(result);}return -1;}

 

进入这个方法,按topic和queueId查询到消费队列,org.apache.rocketmq.store.DefaultMessageStore#findConsumeQueue

 


public ConsumeQueue findConsumeQueue(String topic, int queueId) {//        找到topic的全部消息队列ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);if (null == map) {ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);if (oldMap != null) {map = oldMap;} else {map = newMap;}}//        按queue id查找消费者队列ConsumeQueue logic = map.get(queueId);if (null == logic) {ConsumeQueue newLogic = new ConsumeQueue(topic,queueId,//                消费者队列存储地址 user.home/store/consumequeueStorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),//                每一个文件存储默认30Wthis.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),this);ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);if (oldLogic != null) {logic = oldLogic;} else {logic = newLogic;}}return logic;}

 

往上返回到这个方法,按消费者的offset查询存储时间所在的buffer,org.apache.rocketmq.store.ConsumeQueue#getIndexBuffer

 

public SelectMappedBufferResult getIndexBuffer(final long startIndex) {int mappedFileSize = this.mappedFileSize;//        获取最小的物理offsetlong offset = startIndex * CQ_STORE_UNIT_SIZE;if (offset >= this.getMinLogicOffset()) {//            根据offset查询映射文件 =》MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);if (mappedFile != null) {SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));return result;}}return null;}

 

进入这个方法,根据offset查询映射文件,org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long)

public MappedFile findMappedFileByOffset(final long offset) {//        =》return findMappedFileByOffset(offset, false);}

 

进入这个方法,org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long, boolean)




public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {try {//            获取队列中第一个映射文件MappedFile firstMappedFile = this.getFirstMappedFile();//            获取队列中最后一个映射文件MappedFile lastMappedFile = this.getLastMappedFile();if (firstMappedFile != null && lastMappedFile != null) {//                若是offset不在索引文件的offset范围内if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",offset,firstMappedFile.getFileFromOffset(),lastMappedFile.getFileFromOffset() + this.mappedFileSize,this.mappedFileSize,this.mappedFiles.size());} else {//                   找到映射文件在队列中的索引位置int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));MappedFile targetFile = null;try {//                        获取索引文件targetFile = this.mappedFiles.get(index);} catch (Exception ignored) {}//                    offset在目标文件的起始offset和结束offset范围内if (targetFile != null && offset >= targetFile.getFileFromOffset()&& offset < targetFile.getFileFromOffset() + this.mappedFileSize) {return targetFile;}//                    若是按索引在队列中找不到映射文件就遍历队列查找映射文件for (MappedFile tmpMappedFile : this.mappedFiles) {if (offset >= tmpMappedFile.getFileFromOffset()&& offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {return tmpMappedFile;}}}//                若是offset=0获取队列中第一个映射文件,我的感受这个逻辑是否放在前面判断更为合理,仍是放在这里另有深意if (returnFirstOnNotFound) {return firstMappedFile;}}} catch (Exception e) {log.error("findMappedFileByOffset Exception", e);}return null;    }

往上返回到这个方法,按消费组和topic获取tps,org.apache.rocketmq.store.stats.BrokerStatsManager#tpsGroupGetNums

/**
 * Returns the per-minute TPS recorded under {@code GROUP_GET_NUMS} for a group/topic pair.
 *
 * @param group consumer group name
 * @param topic topic name
 * @return the TPS of the minute-level stats data stored under the key built by
 *     {@code buildStatsKey(topic, group)}
 */
public double tpsGroupGetNums(final String group, final String topic) {
    // Note the argument order: the key is built from topic first, then group.
    final String key = buildStatsKey(topic, group);
    return this.statsTable
        .get(GROUP_GET_NUMS)
        .getStatsDataInMinute(key)
        .getTps();
}

往上返回到这个方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#fetchAllConsumeStatsInBroker结束。

 

说在最后

本次解析仅代表个人观点,仅供参考。

 

加入技术微信群

钉钉技术群

相关文章
相关标签/搜索