RocketMQ source code analysis: request handling, getting consumer status from the client

Preface

Request handling: getting consumer status from the client

 

Source code analysis

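Enter this method: org.apache.rocketmq.client.impl.ClientRemotingProcessor#getConsumeStatus. The code shown next is the getConsumerStatus(topic, group) lookup that this handler calls to build its response; in the RocketMQ 4.x sources that lookup is defined in org.apache.rocketmq.client.impl.factory.MQClientInstance.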

public Map<MessageQueue, Long> getConsumerStatus(String topic, String group) {
    MQConsumerInner impl = this.consumerTable.get(group);
    if (impl != null && impl instanceof DefaultMQPushConsumerImpl) {
        DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) impl;
        return consumer.getOffsetStore().cloneOffsetTable(topic);
    } else if (impl != null && impl instanceof DefaultMQPullConsumerImpl) {
        DefaultMQPullConsumerImpl consumer = (DefaultMQPullConsumerImpl) impl;
        return consumer.getOffsetStore().cloneOffsetTable(topic);
    } else {
        return Collections.EMPTY_MAP;
    }
}
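
The lookup-and-dispatch pattern above can be tried in isolation. What follows is a minimal sketch, not RocketMQ's code: ConsumerStatusLookup, ConsumerInner, PushConsumer, PullConsumer and OtherConsumer are hypothetical stand-ins, and queues are plain strings rather than MessageQueue objects. It also shows two small variations on the original: instanceof is already false for null, so the extra null check can be dropped, and Collections.emptyMap() is the type-safe counterpart of the raw Collections.EMPTY_MAP.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ConsumerStatusLookup {
    // Hypothetical stand-in for MQConsumerInner.
    interface ConsumerInner {}

    // Hypothetical stand-in for DefaultMQPushConsumerImpl; offsets keyed by queue name.
    static class PushConsumer implements ConsumerInner {
        final Map<String, Long> offsets = new HashMap<>();
    }

    // Hypothetical stand-in for DefaultMQPullConsumerImpl.
    static class PullConsumer implements ConsumerInner {
        final Map<String, Long> offsets = new HashMap<>();
    }

    // Hypothetical consumer type that exposes no offsets.
    static class OtherConsumer implements ConsumerInner {}

    // Looks up the consumer registered for the group and returns a copy of its offsets.
    static Map<String, Long> getConsumerStatus(Map<String, ConsumerInner> consumerTable, String group) {
        ConsumerInner impl = consumerTable.get(group);
        if (impl instanceof PushConsumer) {           // false when impl is null
            return new HashMap<>(((PushConsumer) impl).offsets);
        } else if (impl instanceof PullConsumer) {
            return new HashMap<>(((PullConsumer) impl).offsets);
        }
        // Unknown group or unsupported consumer type: typed, immutable empty map.
        return Collections.emptyMap();
    }

    public static void main(String[] args) {
        Map<String, ConsumerInner> table = new HashMap<>();
        PushConsumer push = new PushConsumer();
        push.offsets.put("TopicA-0", 42L);
        table.put("groupA", push);
        table.put("groupB", new OtherConsumer());

        System.out.println(getConsumerStatus(table, "groupA"));
        System.out.println(getConsumerStatus(table, "groupB").isEmpty());
        System.out.println(getConsumerStatus(table, "missing").isEmpty());
    }
}
```

Returning an empty map instead of null means the caller can encode the result into the response body without a null check.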

Enter this method, which collects the consume-queue offsets for the topic: org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#cloneOffsetTable

@Override
public Map<MessageQueue, Long> cloneOffsetTable(String topic) {
    Map<MessageQueue, Long> cloneOffsetTable = new HashMap<MessageQueue, Long>();
    for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
        MessageQueue mq = entry.getKey();
        if (!UtilAll.isBlank(topic) && !topic.equals(mq.getTopic())) {
            continue;
        }
        cloneOffsetTable.put(mq, entry.getValue().get());
    }
    return cloneOffsetTable;
}
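
To see what this clone does in practice, here is a small self-contained sketch under stated assumptions: OffsetSnapshotDemo and its Queue class are hypothetical, simplified stand-ins for the real offset store and MessageQueue, and the blank check only approximates UtilAll.isBlank. It illustrates two properties of the method above: a blank topic disables filtering, and the returned map holds plain Long values, so later changes to the live AtomicLong counters do not show up in it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class OffsetSnapshotDemo {
    // Hypothetical, simplified stand-in for MessageQueue: a topic plus a queue id.
    static final class Queue {
        final String topic;
        final int queueId;

        Queue(String topic, int queueId) {
            this.topic = topic;
            this.queueId = queueId;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Queue)) {
                return false;
            }
            Queue other = (Queue) o;
            return queueId == other.queueId && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, queueId);
        }
    }

    // Copies the current offsets into a new map, optionally restricted to one topic.
    static Map<Queue, Long> cloneOffsetTable(Map<Queue, AtomicLong> offsetTable, String topic) {
        // Assumption: this check approximates UtilAll.isBlank (null, empty, or whitespace only).
        boolean filterByTopic = topic != null && !topic.trim().isEmpty();
        Map<Queue, Long> snapshot = new HashMap<>();
        for (Map.Entry<Queue, AtomicLong> entry : offsetTable.entrySet()) {
            Queue mq = entry.getKey();
            if (filterByTopic && !topic.equals(mq.topic)) {
                continue; // a different topic's queue: skip it
            }
            snapshot.put(mq, entry.getValue().get()); // read the counter's current value
        }
        return snapshot;
    }

    public static void main(String[] args) {
        Map<Queue, AtomicLong> live = new ConcurrentHashMap<>();
        Queue a0 = new Queue("TopicA", 0);
        Queue b0 = new Queue("TopicB", 0);
        live.put(a0, new AtomicLong(100));
        live.put(b0, new AtomicLong(7));

        Map<Queue, Long> snapshot = cloneOffsetTable(live, "TopicA");
        live.get(a0).set(250); // the consumer keeps advancing after the snapshot was taken

        System.out.println(snapshot.size());                     // prints 1
        System.out.println(snapshot.get(a0));                    // prints 100
        System.out.println(cloneOffsetTable(live, null).size()); // prints 2
    }
}
```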

Control then returns to org.apache.rocketmq.client.impl.ClientRemotingProcessor#getConsumeStatus, and the request handling ends.

 

Closing remarks

This analysis reflects only my own view and is for reference only.

 
