RocketMQ Source Code Analysis: Consumer Manage Processor (Part 3)

Preface

This part covers the consumer manage processor and how it queries a consumer offset.

 

Source Code Analysis

Start with the method that queries the consumer offset, org.apache.rocketmq.broker.processor.ConsumerManageProcessor#queryConsumerOffset:



    private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
        final RemotingCommand response =
            RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
        final QueryConsumerOffsetResponseHeader responseHeader =
            (QueryConsumerOffsetResponseHeader) response.readCustomHeader();
        final QueryConsumerOffsetRequestHeader requestHeader =
            (QueryConsumerOffsetRequestHeader) request
                .decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);

        // Query the offset =>
        long offset =
            this.brokerController.getConsumerOffsetManager().queryOffset(
                requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());

        if (offset >= 0) {
            responseHeader.setOffset(offset);
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
        } else {
            // Get the minimum offset =>
            long minOffset =
                this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(),
                    requestHeader.getQueueId());
            if (minOffset <= 0
                // Check the persisted offset =>
                && !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
                    requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {
                responseHeader.setOffset(0L);
                response.setCode(ResponseCode.SUCCESS);
                response.setRemark(null);
            } else {
                response.setCode(ResponseCode.QUERY_NOT_FOUND);
                response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first");
            }
        }

        return response;
    }
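The branching in queryConsumerOffset is easier to follow when it is separated from the remoting types. The following is a minimal sketch of that decision flow only, not RocketMQ code: the class QueryOffsetFlowSketch, the Result type, and the resolve method are hypothetical names, and the three parameters stand in for the results of queryOffset, getMinOffsetInQueue, and checkInDiskByConsumeOffset(topic, queueId, 0).

```java
// Hypothetical, simplified model of the decision flow; not part of RocketMQ.
final class QueryOffsetFlowSketch {

    enum Code { SUCCESS, QUERY_NOT_FOUND }

    /** Response code plus offset; the offset is only meaningful on SUCCESS. */
    static final class Result {
        final Code code;
        final long offset;

        Result(Code code, long offset) {
            this.code = code;
            this.offset = offset;
        }
    }

    /**
     * @param storedOffset offset found in the offset cache, or -1 if none is recorded
     * @param minOffset    minimum offset of the consume queue
     * @param firstOnDisk  stand-in for checkInDiskByConsumeOffset(topic, queueId, 0)
     */
    static Result resolve(long storedOffset, long minOffset, boolean firstOnDisk) {
        if (storedOffset >= 0) {
            // The group already has a recorded offset: return it as-is.
            return new Result(Code.SUCCESS, storedOffset);
        }
        if (minOffset <= 0 && !firstOnDisk) {
            // No recorded offset, the queue still starts at 0, and the first
            // entry is not reported as "in disk": start from offset 0.
            return new Result(Code.SUCCESS, 0L);
        }
        // Otherwise the broker answers QUERY_NOT_FOUND.
        return new Result(Code.QUERY_NOT_FOUND, -1L);
    }

    public static void main(String[] args) {
        Result r = resolve(-1, 0, false);
        System.out.println(r.code + " " + r.offset);
    }
}
```

Read this way, the method has three outcomes: a recorded offset, a fallback to 0 for a queue that still starts at the beginning, and QUERY_NOT_FOUND for everything else.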

Step into the method that looks up the offset, org.apache.rocketmq.broker.offset.ConsumerOffsetManager#queryOffset(java.lang.String, java.lang.String, int):

    public long queryOffset(final String group, final String topic, final int queueId) {
        // The key is topic@group; look it up in the local offset cache
        String key = topic + TOPIC_GROUP_SEPARATOR + group;
        ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
        if (null != map) {
            Long offset = map.get(queueId);
            if (offset != null)
                return offset;
        }

        return -1;
    }
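The lookup above relies on a two-level map: the outer key is topic@group and the inner map goes from queue id to offset. Below is a small self-contained sketch of that structure. OffsetTableSketch and its commitOffset helper are hypothetical names added for illustration, and the "@" separator is an assumption taken from the topic@group comment in the source.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for the offset cache; not the RocketMQ class.
final class OffsetTableSketch {
    // Assumed value, based on the "topic@group" comment in the original method.
    private static final String TOPIC_GROUP_SEPARATOR = "@";

    // topic@group -> (queueId -> offset)
    private final ConcurrentMap<String, ConcurrentMap<Integer, Long>> offsetTable =
        new ConcurrentHashMap<>();

    // Hypothetical helper used here only to populate the table.
    void commitOffset(String group, String topic, int queueId, long offset) {
        String key = topic + TOPIC_GROUP_SEPARATOR + group;
        offsetTable.computeIfAbsent(key, k -> new ConcurrentHashMap<>()).put(queueId, offset);
    }

    // Same shape as the method above: -1 means "nothing recorded".
    long queryOffset(String group, String topic, int queueId) {
        String key = topic + TOPIC_GROUP_SEPARATOR + group;
        ConcurrentMap<Integer, Long> map = offsetTable.get(key);
        if (map != null) {
            Long offset = map.get(queueId);
            if (offset != null) {
                return offset;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        OffsetTableSketch table = new OffsetTableSketch();
        System.out.println(table.queryOffset("groupA", "topicA", 0));
        table.commitOffset("groupA", "topicA", 0, 7L);
        System.out.println(table.queryOffset("groupA", "topicA", 0));
    }
}
```

The -1 sentinel is what sends queryConsumerOffset into its fallback branch when a group has never committed an offset for the queue.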

Back in the caller, the next step gets the minimum offset, org.apache.rocketmq.store.DefaultMessageStore#getMinOffsetInQueue:

    public long getMinOffsetInQueue(String topic, int queueId) {
        // Find the consume queue by topic and queueId =>
        ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
        if (logic != null) {
            // Get the minimum offset in the queue
            return logic.getMinOffsetInQueue();
        }

        return -1;
    }

Step into the method that finds the consume queue by topic and queueId, org.apache.rocketmq.store.DefaultMessageStore#findConsumeQueue:


    public ConsumeQueue findConsumeQueue(String topic, int queueId) {
        // Find all consume queues of the topic
        ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
        if (null == map) {
            ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
            ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
            if (oldMap != null) {
                map = oldMap;
            } else {
                map = newMap;
            }
        }

        // Look up the consume queue by queue id
        ConsumeQueue logic = map.get(queueId);
        if (null == logic) {
            ConsumeQueue newLogic = new ConsumeQueue(
                topic,
                queueId,
                // Consume queue storage path: user.home/store/consumequeue
                StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
                // Each file holds 300,000 entries by default
                this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
                this);
            ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
            if (oldLogic != null) {
                logic = oldLogic;
            } else {
                logic = newLogic;
            }
        }

        return logic;
    }
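findConsumeQueue applies the same get-or-create idiom twice, once for the per-topic map and once for the queue itself: read, create a candidate if absent, then let putIfAbsent decide which instance wins when two threads race. A generic sketch of that idiom follows; GetOrCreateSketch and getOrCreate are hypothetical names, not RocketMQ APIs.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

// Hypothetical helper illustrating the putIfAbsent idiom used above.
final class GetOrCreateSketch {

    static <K, V> V getOrCreate(ConcurrentMap<K, V> map, K key, Supplier<V> factory) {
        V existing = map.get(key);
        if (existing != null) {
            return existing;
        }
        // Build a candidate; another thread may be doing the same right now.
        V created = factory.get();
        // putIfAbsent returns the previous value, or null if ours was stored.
        V raced = map.putIfAbsent(key, created);
        return raced != null ? raced : created;
    }

    public static void main(String[] args) {
        ConcurrentMap<Integer, Object> queues = new ConcurrentHashMap<>();
        Object first = getOrCreate(queues, 0, Object::new);
        Object second = getOrCreate(queues, 0, Object::new);
        // Both calls observe the same instance for queue id 0.
        System.out.println(first == second);
    }
}
```

One consequence visible in the code above: because the method creates the queue when it is missing, it does not return null on this path, so the null checks in its callers act as a safety net rather than a normal branch.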

Back up one level to the method that checks the persisted offset, org.apache.rocketmq.store.DefaultMessageStore#checkInDiskByConsumeOffset:



    @Override
    public boolean checkInDiskByConsumeOffset(final String topic, final int queueId, long consumeOffset) {
        // Get the max offset from the commitLog =>
        final long maxOffsetPy = this.commitLog.getMaxOffset();

        // Find the consume queue by topic and queue id =>
        ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
        if (consumeQueue != null) {
            SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(consumeOffset);
            if (bufferConsumeQueue != null) {
                try {
                    for (int i = 0; i < bufferConsumeQueue.getSize(); ) {
                        i += ConsumeQueue.CQ_STORE_UNIT_SIZE;
                        long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
                        return checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
                    }
                } finally {
                    bufferConsumeQueue.release();
                }
            } else {
                return false;
            }
        }
        return false;
    }
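The loop above reads a single long from the index buffer and returns on its first iteration, so only the first consume queue entry is examined. To show what that getLong() picks up, here is a sketch of one entry, assuming the usual layout of 20 bytes per entry (an 8-byte commit log offset, a 4-byte message size, and an 8-byte tags hash code). The layout is stated from general knowledge of RocketMQ rather than from the code shown here, and ConsumeQueueEntrySketch with its methods is a hypothetical name.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration of one consume queue entry; not the RocketMQ class.
final class ConsumeQueueEntrySketch {
    // Assumed layout: 8 (commit log offset) + 4 (size) + 8 (tags hash code).
    static final int CQ_STORE_UNIT_SIZE = 20;

    // Build one entry so the read below has something to work on.
    static ByteBuffer encode(long commitLogOffset, int size, long tagsCode) {
        ByteBuffer buf = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
        buf.putLong(commitLogOffset).putInt(size).putLong(tagsCode);
        buf.flip();
        return buf;
    }

    // The first 8 bytes of an entry are the physical offset in the commit log.
    // An absolute get is used so the buffer position is left untouched.
    static long readCommitLogOffset(ByteBuffer entry) {
        return entry.getLong(entry.position());
    }

    public static void main(String[] args) {
        ByteBuffer entry = encode(1234L, 100, 7L);
        System.out.println(readCommitLogOffset(entry));
    }
}
```

That physical offset, together with the commit log's max offset, is what checkInDiskByConsumeOffset passes on to checkInDiskByCommitOffset.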

Step into the method that gets the max offset from the commitLog, org.apache.rocketmq.store.CommitLog#getMaxOffset:

    public long getMaxOffset() {
        return this.mappedFileQueue.getMaxOffset();
    }

Back up one level: finding the consume queue by topic and queue id, org.apache.rocketmq.store.DefaultMessageStore#findConsumeQueue, was covered above.

Back up again to org.apache.rocketmq.broker.processor.ConsumerManageProcessor#queryConsumerOffset, which ends here.

 

Closing Remarks

This analysis reflects only my own views and is offered for reference.

 

