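Preface
Request handling: getting the consumer status from the client
Source code analysis
Enter this method, org.apache.rocketmq.client.impl.ClientRemotingProcessor#getConsumeStatus. It delegates to getConsumerStatus on the client instance, which looks up the consumer registered for the group and returns a copy of its offset table: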
public Map<MessageQueue, Long> getConsumerStatus(String topic, String group) {
    // Look up the consumer registered under this consumer group.
    MQConsumerInner impl = this.consumerTable.get(group);
    if (impl != null && impl instanceof DefaultMQPushConsumerImpl) {
        // Push consumer: copy the offsets held by its offset store.
        DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) impl;
        return consumer.getOffsetStore().cloneOffsetTable(topic);
    } else if (impl != null && impl instanceof DefaultMQPullConsumerImpl) {
        // Pull consumer: same handling as the push consumer.
        DefaultMQPullConsumerImpl consumer = (DefaultMQPullConsumerImpl) impl;
        return consumer.getOffsetStore().cloneOffsetTable(topic);
    } else {
        // No consumer for this group, or an unknown consumer type.
        return Collections.EMPTY_MAP;
    }
}
Enter this method, org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#cloneOffsetTable, which returns the consume offset of each message queue of the topic:
@Override
public Map<MessageQueue, Long> cloneOffsetTable(String topic) {
    Map<MessageQueue, Long> cloneOffsetTable = new HashMap<MessageQueue, Long>();
    for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
        MessageQueue mq = entry.getKey();
        // When a topic is given, skip queues that belong to other topics.
        if (!UtilAll.isBlank(topic) && !topic.equals(mq.getTopic())) {
            continue;
        }
        // Copy the current value so later offset updates do not change the result.
        cloneOffsetTable.put(mq, entry.getValue().get());
    }
    return cloneOffsetTable;
}
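To make the filtering and copying behavior concrete, here is a minimal standalone sketch of the same idea. It does not use the RocketMQ classes: QueueKey is a hypothetical stand-in for MessageQueue, isBlank is a simplified assumption of what UtilAll.isBlank checks, and the offset table is passed in as a parameter rather than read from a field.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class OffsetSnapshotSketch {

    /** Hypothetical stand-in for RocketMQ's MessageQueue (not the real class). */
    static final class QueueKey {
        final String topic;
        final String brokerName;
        final int queueId;

        QueueKey(String topic, String brokerName, int queueId) {
            this.topic = topic;
            this.brokerName = brokerName;
            this.queueId = queueId;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof QueueKey)) return false;
            QueueKey other = (QueueKey) o;
            return queueId == other.queueId
                && topic.equals(other.topic)
                && brokerName.equals(other.brokerName);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, brokerName, queueId);
        }
    }

    /** Simplified blank check; an assumption standing in for UtilAll.isBlank. */
    static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    /** Copies the current offsets, keeping only the given topic unless it is blank. */
    static Map<QueueKey, Long> cloneOffsetTable(Map<QueueKey, AtomicLong> offsetTable, String topic) {
        Map<QueueKey, Long> snapshot = new HashMap<>();
        for (Map.Entry<QueueKey, AtomicLong> entry : offsetTable.entrySet()) {
            if (!isBlank(topic) && !topic.equals(entry.getKey().topic)) {
                continue; // queue belongs to another topic
            }
            snapshot.put(entry.getKey(), entry.getValue().get());
        }
        return snapshot;
    }

    public static void main(String[] args) {
        Map<QueueKey, AtomicLong> table = new ConcurrentHashMap<>();
        QueueKey a0 = new QueueKey("TopicA", "broker-a", 0);
        table.put(a0, new AtomicLong(100));
        table.put(new QueueKey("TopicA", "broker-a", 1), new AtomicLong(250));
        table.put(new QueueKey("TopicB", "broker-a", 0), new AtomicLong(7));

        Map<QueueKey, Long> snapshot = cloneOffsetTable(table, "TopicA");
        table.get(a0).incrementAndGet(); // consumption continues after the copy

        System.out.println(snapshot.size());                    // 2
        System.out.println(snapshot.get(a0));                   // 100
        System.out.println(cloneOffsetTable(table, "").size()); // 3
    }
}
```

Two things are visible here: a blank topic returns every queue, and the result holds plain Long values, so it is a point-in-time copy that does not follow later offset changes.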
Return to this method, org.apache.rocketmq.client.impl.ClientRemotingProcessor#getConsumeStatus. That is the end of the walkthrough.
Closing remarks
This analysis reflects only my own views and is for reference only.