RocketMQ Source Code Analysis: The Admin Request for Querying Corrected Offsets

Preface

The admin request QUERY_CORRECTION_OFFSET queries the corrected offsets.

 

Source Code Analysis

Enter the method org.apache.rocketmq.broker.processor.AdminBrokerProcessor#queryCorrectionOffset:




private RemotingCommand queryCorrectionOffset(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    QueryCorrectionOffsetHeader requestHeader =
        (QueryCorrectionOffsetHeader) request.decodeCommandCustomHeader(QueryCorrectionOffsetHeader.class);

    // Find the minimum offset per queue across all consumer groups
    Map<Integer, Long> correctionOffset = this.brokerController.getConsumerOffsetManager()
        .queryMinOffsetInAllGroup(requestHeader.getTopic(), requestHeader.getFilterGroups());

    // Look up the offsets by topic and consumer group
    Map<Integer, Long> compareOffset =
        this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getTopic(), requestHeader.getCompareGroup());

    if (compareOffset != null && !compareOffset.isEmpty()) {
        for (Map.Entry<Integer, Long> entry : compareOffset.entrySet()) {
            Integer queueId = entry.getKey();
            correctionOffset.put(queueId,
                correctionOffset.get(queueId) > entry.getValue() ? Long.MAX_VALUE : correctionOffset.get(queueId));
        }
    }

    QueryCorrectionOffsetBody body = new QueryCorrectionOffsetBody();
    body.setCorrectionOffsets(correctionOffset);
    response.setBody(body.encode());
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}
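The comparison loop above can be isolated into a small sketch on plain maps. The class name CorrectionOffsetSketch and the method correct are hypothetical and are not part of RocketMQ. One deliberate difference, labeled as an assumption: the sketch skips queues that have no minimum offset, whereas the quoted loop would unbox a null value in that case.

```java
import java.util.HashMap;
import java.util.Map;

final class CorrectionOffsetSketch {

    // Hypothetical helper (not RocketMQ API): applies the rule from the quoted loop.
    // If the minimum offset across groups is ahead of the compare group's offset,
    // the queue is marked with Long.MAX_VALUE; otherwise the minimum is kept.
    static Map<Integer, Long> correct(Map<Integer, Long> minOffsets,
                                      Map<Integer, Long> compareOffsets) {
        Map<Integer, Long> result = new HashMap<>(minOffsets);
        if (compareOffsets == null || compareOffsets.isEmpty()) {
            return result;
        }
        for (Map.Entry<Integer, Long> entry : compareOffsets.entrySet()) {
            Integer queueId = entry.getKey();
            Long min = result.get(queueId);
            if (min == null) {
                // Assumption of this sketch: ignore queues without a known minimum.
                continue;
            }
            result.put(queueId, min > entry.getValue() ? Long.MAX_VALUE : min);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, Long> min = new HashMap<>();
        min.put(0, 100L);
        min.put(1, 50L);
        Map<Integer, Long> compare = new HashMap<>();
        compare.put(0, 80L);  // minimum 100 is ahead of 80, so queue 0 becomes Long.MAX_VALUE
        compare.put(1, 70L);  // minimum 50 is not ahead of 70, so queue 1 stays at 50
        System.out.println(correct(min, compare));
    }
}
```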

Enter the method org.apache.rocketmq.broker.offset.ConsumerOffsetManager#queryMinOffsetInAllGroup, which finds the minimum offset across all consumer groups:

 

 



public Map<Integer, Long> queryMinOffsetInAllGroup(final String topic, final String filterGroups) {

    Map<Integer, Long> queueMinOffset = new HashMap<Integer, Long>();
    Set<String> topicGroups = this.offsetTable.keySet();
    if (!UtilAll.isBlank(filterGroups)) {
        for (String group : filterGroups.split(",")) {
            Iterator<String> it = topicGroups.iterator();
            while (it.hasNext()) {
                if (group.equals(it.next().split(TOPIC_GROUP_SEPARATOR)[1])) {
                    // This group is one of the filtered consumer groups, so remove it from the set
                    it.remove();
                }
            }
        }
    }

    for (Map.Entry<String, ConcurrentMap<Integer, Long>> offSetEntry : this.offsetTable.entrySet()) {
        String topicGroup = offSetEntry.getKey();
        String[] topicGroupArr = topicGroup.split(TOPIC_GROUP_SEPARATOR);
        if (topic.equals(topicGroupArr[0])) {
            for (Entry<Integer, Long> entry : offSetEntry.getValue().entrySet()) {
                // Query the minimum offset of the queue
                long minOffset = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, entry.getKey());
                if (entry.getValue() >= minOffset) {
                    Long offset = queueMinOffset.get(entry.getKey());
                    if (offset == null) {
                        queueMinOffset.put(entry.getKey(), Math.min(Long.MAX_VALUE, entry.getValue()));
                    } else {
                        queueMinOffset.put(entry.getKey(), Math.min(entry.getValue(), offset));
                    }
                }
            }
        }
    }
    return queueMinOffset;
}
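The per-queue minimum can be illustrated with a self-contained sketch on plain maps. All names here (MinOffsetSketch, minOffsetPerQueue, queueMinOffsets) are hypothetical. The queueMinOffsets map stands in for the call to getMinOffsetInQueue and defaults to -1 for unknown queues. The sketch also makes one design change on purpose: it keeps filtered groups in a separate exclusion set instead of removing keys through the iterator, because in Java the set returned by keySet() is a view of the map, so removing through its iterator also removes the entry from the map itself.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class MinOffsetSketch {

    // Key format is topic@group, as stated in the comment of the quoted queryOffset method.
    static final String SEPARATOR = "@";

    // Hypothetical helper (not RocketMQ API): minimum committed offset per queue for a topic,
    // ignoring the comma-separated groups in filterGroups and any offset below the queue minimum.
    static Map<Integer, Long> minOffsetPerQueue(Map<String, Map<Integer, Long>> offsetTable,
                                                String topic,
                                                String filterGroups,
                                                Map<Integer, Long> queueMinOffsets) {
        Set<String> excluded = new HashSet<>();
        if (filterGroups != null && !filterGroups.trim().isEmpty()) {
            excluded.addAll(Arrays.asList(filterGroups.split(",")));
        }

        Map<Integer, Long> result = new HashMap<>();
        for (Map.Entry<String, Map<Integer, Long>> tableEntry : offsetTable.entrySet()) {
            String[] topicGroup = tableEntry.getKey().split(SEPARATOR);
            if (topicGroup.length != 2
                    || !topic.equals(topicGroup[0])
                    || excluded.contains(topicGroup[1])) {
                continue;
            }
            for (Map.Entry<Integer, Long> entry : tableEntry.getValue().entrySet()) {
                long queueMin = queueMinOffsets.getOrDefault(entry.getKey(), -1L);
                if (entry.getValue() >= queueMin) {
                    // Keep the smaller of the stored value and this group's offset.
                    result.merge(entry.getKey(), entry.getValue(), Long::min);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Map<Integer, Long>> table = new HashMap<>();
        Map<Integer, Long> g1 = new HashMap<>();
        g1.put(0, 10L);
        Map<Integer, Long> g2 = new HashMap<>();
        g2.put(0, 5L);
        table.put("TopicA@g1", g1);
        table.put("TopicA@g2", g2);
        System.out.println(minOffsetPerQueue(table, "TopicA", "g2", new HashMap<>()));
    }
}
```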

Enter the method org.apache.rocketmq.store.DefaultMessageStore#getMinOffsetInQueue, which returns the minimum offset in the consume queue:

 

 

public long getMinOffsetInQueue(String topic, int queueId) {
    // Look up the consume queue by topic and queueId
    ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
    if (logic != null) {
        // Return the minimum offset in the queue
        return logic.getMinOffsetInQueue();
    }

    return -1;
}


Enter the method org.apache.rocketmq.store.DefaultMessageStore#findConsumeQueue, which looks up the consume queue by topic and queueId:

 

 


public ConsumeQueue findConsumeQueue(String topic, int queueId) {
    // Find all consume queues of the topic
    ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
    if (null == map) {
        ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
        ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
        if (oldMap != null) {
            map = oldMap;
        } else {
            map = newMap;
        }
    }

    // Look up the consume queue by queue id
    ConsumeQueue logic = map.get(queueId);
    if (null == logic) {
        ConsumeQueue newLogic = new ConsumeQueue(
            topic,
            queueId,
            // Consume queue storage path: user.home/store/consumequeue
            StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
            // Each file holds 300,000 entries by default
            this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
            this);
        ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
        if (oldLogic != null) {
            logic = oldLogic;
        } else {
            logic = newLogic;
        }
    }

    return logic;
}
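The method above uses the same get-or-create idiom twice, once per map level: read first, and on a miss call putIfAbsent and keep whichever instance ended up in the map. A minimal sketch of that idiom follows. QueueRegistrySketch and its nested Queue class are hypothetical stand-ins and carry none of the real ConsumeQueue behavior.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class QueueRegistrySketch {

    // Hypothetical stand-in for ConsumeQueue; it only remembers its coordinates.
    static final class Queue {
        final String topic;
        final int queueId;

        Queue(String topic, int queueId) {
            this.topic = topic;
            this.queueId = queueId;
        }
    }

    private final ConcurrentMap<String, ConcurrentMap<Integer, Queue>> table =
        new ConcurrentHashMap<>();

    Queue findOrCreate(String topic, int queueId) {
        ConcurrentMap<Integer, Queue> byId = table.get(topic);
        if (byId == null) {
            ConcurrentMap<Integer, Queue> created = new ConcurrentHashMap<>();
            // putIfAbsent returns the previous value if another thread won the race, else null.
            ConcurrentMap<Integer, Queue> existing = table.putIfAbsent(topic, created);
            byId = existing != null ? existing : created;
        }

        Queue queue = byId.get(queueId);
        if (queue == null) {
            Queue created = new Queue(topic, queueId);
            Queue existing = byId.putIfAbsent(queueId, created);
            queue = existing != null ? existing : created;
        }
        return queue;
    }

    public static void main(String[] args) {
        QueueRegistrySketch registry = new QueueRegistrySketch();
        Queue first = registry.findOrCreate("TopicA", 0);
        Queue second = registry.findOrCreate("TopicA", 0);
        System.out.println(first == second);
    }
}
```

With this idiom, every caller ends up with the same instance for a given topic and queue id, even if two threads create candidates at the same moment; the losing candidate is simply discarded.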

Return up to the method org.apache.rocketmq.broker.offset.ConsumerOffsetManager#queryOffset(java.lang.String, java.lang.String), which looks up offsets by topic and consumer group:

 

 

public Map<Integer, Long> queryOffset(final String group, final String topic) {
    // topic@group
    String key = topic + TOPIC_GROUP_SEPARATOR + group;
    return this.offsetTable.get(key);
}
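The lookup is a plain map read keyed by topic@group. The sketch below reproduces that key construction on an ordinary map; OffsetKeySketch and its methods are hypothetical names. It also shows why argument order matters: as quoted in this article, queryOffset declares its parameters as (group, topic), while the call site in queryCorrectionOffset passes (topic, compareGroup). Under that reading the key would come out as group@topic, so treat the call order as something to verify against the RocketMQ version you are reading.

```java
import java.util.HashMap;
import java.util.Map;

final class OffsetKeySketch {

    // Separator taken from the "topic@group" comment in the quoted method.
    static final String SEPARATOR = "@";

    // Hypothetical helper: builds the offset table key in topic@group form.
    static String key(String topic, String group) {
        return topic + SEPARATOR + group;
    }

    // Mirrors the quoted parameter order (group first, topic second).
    static Map<Integer, Long> queryOffset(Map<String, Map<Integer, Long>> offsetTable,
                                          String group, String topic) {
        return offsetTable.get(key(topic, group));
    }

    public static void main(String[] args) {
        Map<String, Map<Integer, Long>> table = new HashMap<>();
        Map<Integer, Long> offsets = new HashMap<>();
        offsets.put(0, 42L);
        table.put(key("TopicA", "groupA"), offsets);

        // Arguments in the declared order find the entry.
        System.out.println(queryOffset(table, "groupA", "TopicA"));
        // Swapped arguments build the key "groupA@TopicA" and find nothing.
        System.out.println(queryOffset(table, "TopicA", "groupA"));
    }
}
```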

 

Return up to the method org.apache.rocketmq.broker.processor.AdminBrokerProcessor#queryCorrectionOffset, where the walkthrough ends.

 

Closing Remarks

This analysis reflects only my own views and is for reference only.

 
