Preface
The consumer manage processor: querying a consumer offset.
Source code analysis
Start with the method that queries the consumer offset: org.apache.rocketmq.broker.processor.ConsumerManageProcessor#queryConsumerOffset
private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
    throws RemotingCommandException {
    final RemotingCommand response =
        RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
    final QueryConsumerOffsetResponseHeader responseHeader =
        (QueryConsumerOffsetResponseHeader) response.readCustomHeader();
    final QueryConsumerOffsetRequestHeader requestHeader =
        (QueryConsumerOffsetRequestHeader) request
            .decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);

    // Query the offset =>
    long offset =
        this.brokerController.getConsumerOffsetManager().queryOffset(
            requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());

    if (offset >= 0) {
        responseHeader.setOffset(offset);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
    } else {
        // Get the minimum offset =>
        long minOffset =
            this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(),
                requestHeader.getQueueId());
        if (minOffset <= 0
            // Check whether the message at offset 0 is only on disk =>
            && !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
                requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {
            responseHeader.setOffset(0L);
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
        } else {
            response.setCode(ResponseCode.QUERY_NOT_FOUND);
            response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first");
        }
    }

    return response;
}
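The branching in queryConsumerOffset is easier to follow when it is separated from the remoting plumbing. The following is a minimal sketch of that decision only, assuming the three inputs have already been fetched. The names OffsetQuerySketch, Result, and resolve are hypothetical and do not exist in RocketMQ.

```java
// Hypothetical sketch: OffsetQuerySketch, Result and resolve are illustrative
// names, not RocketMQ classes. Only the branching mirrors the method above.
final class OffsetQuerySketch {

    /** Outcome of the lookup: a usable offset, or "not found". */
    static final class Result {
        final boolean found;
        final long offset;

        Result(boolean found, long offset) {
            this.found = found;
            this.offset = offset;
        }
    }

    /**
     * @param storedOffset     value returned by the offset cache (-1 if absent)
     * @param minOffsetInQueue minimum offset of the consume queue
     * @param firstEntryOnDisk result of the "in disk" check for offset 0
     */
    static Result resolve(long storedOffset, long minOffsetInQueue, boolean firstEntryOnDisk) {
        if (storedOffset >= 0) {
            // The group already has a recorded offset: return it.
            return new Result(true, storedOffset);
        }
        if (minOffsetInQueue <= 0 && !firstEntryOnDisk) {
            // No recorded offset, the queue still starts at 0 and the first
            // message is not reported as "in disk": start from 0.
            return new Result(true, 0L);
        }
        // Otherwise the broker answers QUERY_NOT_FOUND.
        return new Result(false, -1L);
    }

    public static void main(String[] args) {
        Result r = resolve(-1L, 0L, false);
        System.out.println("found=" + r.found + ", offset=" + r.offset);
    }
}
```

In the QUERY_NOT_FOUND case the broker returns no offset at all; the -1 in the sketch is only a placeholder so that Result always carries a value.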
Step into the method that queries the offset: org.apache.rocketmq.broker.offset.ConsumerOffsetManager#queryOffset(java.lang.String, java.lang.String, int)
public long queryOffset(final String group, final String topic, final int queueId) {
    // Look up the local offset cache using the key topic@group
    String key = topic + TOPIC_GROUP_SEPARATOR + group;
    ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
    if (null != map) {
        Long offset = map.get(queueId);
        if (offset != null)
            return offset;
    }

    return -1;
}
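The offset cache is a two-level map: the outer key is topic@group and the inner key is the queue ID. Here is a minimal, self-contained sketch of that structure. The class OffsetTableSketch and the commitOffset helper are hypothetical, and the "@" separator is taken from the comment in the method above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of a two-level offset cache; not the RocketMQ class.
final class OffsetTableSketch {
    // Separator assumed from the "topic@group" comment in the original method.
    private static final String TOPIC_GROUP_SEPARATOR = "@";

    // topic@group -> (queueId -> offset)
    private final ConcurrentMap<String, ConcurrentMap<Integer, Long>> offsetTable =
        new ConcurrentHashMap<>();

    /** Hypothetical helper that records an offset so queryOffset has data to find. */
    void commitOffset(String group, String topic, int queueId, long offset) {
        String key = topic + TOPIC_GROUP_SEPARATOR + group;
        offsetTable.computeIfAbsent(key, k -> new ConcurrentHashMap<>()).put(queueId, offset);
    }

    /** Returns the recorded offset, or -1 when the group or queue is unknown. */
    long queryOffset(String group, String topic, int queueId) {
        String key = topic + TOPIC_GROUP_SEPARATOR + group;
        ConcurrentMap<Integer, Long> map = offsetTable.get(key);
        if (map != null) {
            Long offset = map.get(queueId);
            if (offset != null) {
                return offset;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        OffsetTableSketch table = new OffsetTableSketch();
        table.commitOffset("groupA", "topicA", 0, 7L);
        System.out.println(table.queryOffset("groupA", "topicA", 0));
    }
}
```

The -1 return value is what drives the caller into its fallback branch, so a group that has never committed an offset is indistinguishable here from an unknown queue ID.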
Back in the caller, the next step gets the minimum offset: org.apache.rocketmq.store.DefaultMessageStore#getMinOffsetInQueue
public long getMinOffsetInQueue(String topic, int queueId) {
    // Look up the consume queue by topic and queueId =>
    ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
    if (logic != null) {
        // Get the minimum offset in the queue
        return logic.getMinOffsetInQueue();
    }

    return -1;
}
Step into the method that looks up the consume queue by topic and queueId: org.apache.rocketmq.store.DefaultMessageStore#findConsumeQueue
public ConsumeQueue findConsumeQueue(String topic, int queueId) {
    // Find all consume queues of the topic
    ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
    if (null == map) {
        ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
        ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
        if (oldMap != null) {
            map = oldMap;
        } else {
            map = newMap;
        }
    }

    // Look up the consume queue by queue ID
    ConsumeQueue logic = map.get(queueId);
    if (null == logic) {
        ConsumeQueue newLogic = new ConsumeQueue(
            topic,
            queueId,
            // Consume queue storage path: user.home/store/consumequeue
            StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
            // Each file holds 300,000 entries by default
            this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
            this);
        ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
        if (oldLogic != null) {
            logic = oldLogic;
        } else {
            logic = newLogic;
        }
    }

    return logic;
}
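findConsumeQueue uses the same get-or-create idiom twice: build a candidate, call putIfAbsent, and keep whichever instance ended up in the map. The sketch below isolates that idiom in a generic helper; GetOrCreateSketch and getOrCreate are hypothetical names used only for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical helper that isolates the putIfAbsent idiom; not RocketMQ code.
final class GetOrCreateSketch {

    /**
     * Stores the candidate only if the key is absent and returns the value
     * that is in the map afterwards, so racing callers share one instance.
     */
    static <K, V> V getOrCreate(ConcurrentMap<K, V> map, K key, V candidate) {
        V existing = map.putIfAbsent(key, candidate);
        // putIfAbsent returns the previous value, or null if the candidate was stored.
        return existing != null ? existing : candidate;
    }

    public static void main(String[] args) {
        ConcurrentMap<Integer, StringBuilder> queues = new ConcurrentHashMap<>();
        StringBuilder first = getOrCreate(queues, 0, new StringBuilder("first"));
        StringBuilder second = getOrCreate(queues, 0, new StringBuilder("second"));
        // Both callers end up with the instance that won the insert.
        System.out.println(first == second);
    }
}
```

One consequence of this idiom is that the losing candidate is constructed and then discarded. ConcurrentMap#computeIfAbsent avoids the extra construction, though the method above does not use it.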
Back up to the caller again, which next checks the persisted offset: org.apache.rocketmq.store.DefaultMessageStore#checkInDiskByConsumeOffset
@Override
public boolean checkInDiskByConsumeOffset(final String topic, final int queueId, long consumeOffset) {
    // Get the maximum offset from the commitLog =>
    final long maxOffsetPy = this.commitLog.getMaxOffset();

    // Look up the consume queue by queue ID and topic =>
    ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
    if (consumeQueue != null) {
        SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(consumeOffset);
        if (bufferConsumeQueue != null) {
            try {
                // The loop body returns on its first pass, so only the first entry is examined
                for (int i = 0; i < bufferConsumeQueue.getSize(); ) {
                    i += ConsumeQueue.CQ_STORE_UNIT_SIZE;
                    long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
                    return checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
                }
            } finally {
                bufferConsumeQueue.release();
            }
        } else {
            return false;
        }
    }
    return false;
}
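The getLong() call above reads the physical commit log offset from the start of a consume queue entry. The sketch below illustrates that read under an assumption that the quoted code does not state: each entry is a fixed 20-byte unit made of an 8-byte commit log offset, a 4-byte message size, and an 8-byte tags hash. The class ConsumeQueueEntrySketch and its methods are hypothetical.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of reading the first field of a fixed-size index entry.
final class ConsumeQueueEntrySketch {
    // Assumed layout: 8 bytes commit log offset + 4 bytes size + 8 bytes tags hash.
    static final int UNIT_SIZE = 20;

    /** Builds one entry in the assumed layout, ready for reading. */
    static ByteBuffer encode(long commitLogOffset, int size, long tagsCode) {
        ByteBuffer buf = ByteBuffer.allocate(UNIT_SIZE);
        buf.putLong(commitLogOffset).putInt(size).putLong(tagsCode);
        buf.flip();
        return buf;
    }

    /** Reads the commit log offset of the first entry without moving the position. */
    static long firstCommitLogOffset(ByteBuffer entries) {
        return entries.getLong(entries.position());
    }

    public static void main(String[] args) {
        ByteBuffer entry = encode(1024L, 200, 99L);
        System.out.println(firstCommitLogOffset(entry));
    }
}
```

The original method uses the relative getLong(), which advances the buffer position. That is harmless there because the method returns right after the first read.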
Step into the method that gets the maximum offset from the commitLog: org.apache.rocketmq.store.CommitLog#getMaxOffset
public long getMaxOffset() {
    return this.mappedFileQueue.getMaxOffset();
}
Back up one level: looking up the consume queue by queue ID and topic, org.apache.rocketmq.store.DefaultMessageStore#findConsumeQueue, was covered above.
Back at the top-level method, org.apache.rocketmq.broker.processor.ConsumerManageProcessor#queryConsumerOffset, the walkthrough ends.
Closing remarks
This analysis reflects only my own view and is for reference only.