说在前面apache
管理请求 QUERY_TOPIC_CONSUME_BY_WHO 查询topic被哪些消费者消费缓存
源码解析微信
进入这个方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#queryTopicConsumeByWho查询topic被哪些消费者消费this
private RemotingCommand queryTopicConsumeByWho(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); QueryTopicConsumeByWhoRequestHeader requestHeader = (QueryTopicConsumeByWhoRequestHeader) request.decodeCommandCustomHeader(QueryTopicConsumeByWhoRequestHeader.class); // 根据topic查找消费者的消费组信息=》 HashSet<String> groups = this.brokerController.getConsumerManager().queryTopicConsumeByWho(requestHeader.getTopic()); Set<String> groupInOffset = this.brokerController.getConsumerOffsetManager().whichGroupByTopic(requestHeader.getTopic()); if (groupInOffset != null && !groupInOffset.isEmpty()) { groups.addAll(groupInOffset); } GroupList groupList = new GroupList(); groupList.setGroupList(groups); byte[] body = groupList.encode(); response.setBody(body); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; }
进入这个方法org.apache.rocketmq.broker.client.ConsumerManager#queryTopicConsumeByWho根据topic从消费组信息中查询消费组code
public HashSet<String> queryTopicConsumeByWho(final String topic) { HashSet<String> groups = new HashSet<>(); // 遍历换粗中消费组信息 Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, ConsumerGroupInfo> entry = it.next(); // 获取组中的缓存订阅信息 ConcurrentMap<String, SubscriptionData> subscriptionTable = entry.getValue().getSubscriptionTable(); if (subscriptionTable.containsKey(topic)) { groups.add(entry.getKey()); } } return groups; }
往上返回到这个方法org.apache.rocketmq.broker.offset.ConsumerOffsetManager#whichGroupByTopic按topic从offset信息中查询消费组blog
public Set<String> whichGroupByTopic(final String topic) { Set<String> groups = new HashSet<String>(); // 遍历topic和组的offset信息 Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, ConcurrentMap<Integer, Long>> next = it.next(); String topicAtGroup = next.getKey(); String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); if (arrays.length == 2) { if (topic.equals(arrays[0])) { groups.add(arrays[1]); } } } return groups; }
往上返回到这个方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#queryTopicConsumeByWho结束ip
说在最后get
本次解析仅表明我的观点,仅供参考。源码
加入技术微信群it
钉钉技术群