说在前面java
管理请求之查询消费队列apache
源码解析缓存
进入这个方法,org.apache.rocketmq.broker.processor.AdminBrokerProcessor#queryConsumeQueue微信
private RemotingCommand queryConsumeQueue(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {QueryConsumeQueueRequestHeader requestHeader =(QueryConsumeQueueRequestHeader) request.decodeCommandCustomHeader(QueryConsumeQueueRequestHeader.class);RemotingCommand response = RemotingCommand.createResponseCommand(null);// 获取消费队列=》ConsumeQueue consumeQueue = this.brokerController.getMessageStore().getConsumeQueue(requestHeader.getTopic(),requestHeader.getQueueId());if (consumeQueue == null) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(String.format("%d@%s is not exist!", requestHeader.getQueueId(), requestHeader.getTopic()));return response;}QueryConsumeQueueResponseBody body = new QueryConsumeQueueResponseBody();response.setCode(ResponseCode.SUCCESS);response.setBody(body.encode());body.setMaxQueueIndex(consumeQueue.getMaxOffsetInQueue());body.setMinQueueIndex(consumeQueue.getMinOffsetInQueue());MessageFilter messageFilter = null;if (requestHeader.getConsumerGroup() != null) {// 按消费组、topic查询订阅信息=》SubscriptionData subscriptionData = this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), requestHeader.getTopic());body.setSubscriptionData(subscriptionData);if (subscriptionData == null) {body.setFilterData(String.format("%s@%s is not online!", requestHeader.getConsumerGroup(), requestHeader.getTopic()));} else {// 获取消费者过滤信息=》ConsumerFilterData filterData = this.brokerController.getConsumerFilterManager().get(requestHeader.getTopic(), requestHeader.getConsumerGroup());body.setFilterData(JSON.toJSONString(filterData, true));messageFilter = new ExpressionMessageFilter(subscriptionData, filterData,this.brokerController.getConsumerFilterManager());}}// 根据index获取selectMappedBufferResult=》SelectMappedBufferResult result = consumeQueue.getIndexBuffer(requestHeader.getIndex());if (result == null) {response.setRemark(String.format("Index %d of %d@%s is not exist!", requestHeader.getIndex(), requestHeader.getQueueId(), requestHeader.getTopic()));return response;}try {List<ConsumeQueueData> queues = new ArrayList<>();for (int i = 0; i < result.getSize() && i < requestHeader.getCount() * ConsumeQueue.CQ_STORE_UNIT_SIZE; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {ConsumeQueueData one = new ConsumeQueueData();one.setPhysicOffset(result.getByteBuffer().getLong());one.setPhysicSize(result.getByteBuffer().getInt());one.setTagsCode(result.getByteBuffer().getLong());if (!consumeQueue.isExtAddr(one.getTagsCode())) {queues.add(one);continue;}ConsumeQueueExt.CqExtUnit cqExtUnit = consumeQueue.getExt(one.getTagsCode());if (cqExtUnit != null) {one.setExtendDataJson(JSON.toJSONString(cqExtUnit));if (cqExtUnit.getFilterBitMap() != null) {one.setBitMap(BitsArray.create(cqExtUnit.getFilterBitMap()).toString());}if (messageFilter != null) {one.setEval(messageFilter.isMatchedByConsumeQueue(cqExtUnit.getTagsCode(), cqExtUnit));}} else {one.setMsg("Cq extend not exist!addr: " + one.getTagsCode());}queues.add(one);}body.setQueueData(queues);} finally {result.release();}return response; }
进入这个方法, 获取消费队列,org.apache.rocketmq.store.DefaultMessageStore#getConsumeQueueapp
@Overridepublic ConsumeQueue getConsumeQueue(String topic, int queueId) {ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);if (map == null) {return null;}return map.get(queueId); }
往上返回到这个方法,按消费组、topic查询订阅信息,org.apache.rocketmq.broker.client.ConsumerManager#findSubscriptionDataide
public SubscriptionData findSubscriptionData(final String group, final String topic) {// 按组从缓存中获取消费组信息ConsumerGroupInfo consumerGroupInfo = this.getConsumerGroupInfo(group);if (consumerGroupInfo != null) {// 按topic从缓存中获取订阅数据return consumerGroupInfo.findSubscriptionData(topic);}return null; }
往上返回到这个方法,获取消费者过滤信息,org.apache.rocketmq.broker.filter.ConsumerFilterManager#get(java.lang.String, java.lang.String)this
public ConsumerFilterData get(final String topic, final String consumerGroup) {if (!this.filterDataByTopic.containsKey(topic)) {return null;}if (this.filterDataByTopic.get(topic).getGroupFilterData().isEmpty()) {return null;}return this.filterDataByTopic.get(topic).getGroupFilterData().get(consumerGroup); }
往上返回到这个方法,根据index获取selectMappedBufferResult,org.apache.rocketmq.store.ConsumeQueue#getIndexBuffercode
public SelectMappedBufferResult getIndexBuffer(final long startIndex) {int mappedFileSize = this.mappedFileSize;// 获取最小的物理offsetlong offset = startIndex * CQ_STORE_UNIT_SIZE;if (offset >= this.getMinLogicOffset()) {// 根据offset查询映射文件 =》MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);if (mappedFile != null) {SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));return result;}}return null; }
进入这个方法, 根据offset查询映射文件,org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long)orm
public MappedFile findMappedFileByOffset(final long offset) {// =》return findMappedFileByOffset(offset, false); }
进入这个方法,org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long, boolean)blog
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {try {// 获取队列中第一个映射文件MappedFile firstMappedFile = this.getFirstMappedFile();// 获取队列中最后一个映射文件MappedFile lastMappedFile = this.getLastMappedFile();if (firstMappedFile != null && lastMappedFile != null) {// 若是offset不在索引文件的offset范围内if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",offset,firstMappedFile.getFileFromOffset(),lastMappedFile.getFileFromOffset() + this.mappedFileSize,this.mappedFileSize,this.mappedFiles.size());} else {// 找到映射文件在队列中的索引位置int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));MappedFile targetFile = null;try {// 获取索引文件targetFile = this.mappedFiles.get(index);} catch (Exception ignored) {}// offset在目标文件的起始offset和结束offset范围内if (targetFile != null && offset >= targetFile.getFileFromOffset()&& offset < targetFile.getFileFromOffset() + this.mappedFileSize) {return targetFile;}// 若是按索引在队列中找不到映射文件就遍历队列查找映射文件for (MappedFile tmpMappedFile : this.mappedFiles) {if (offset >= tmpMappedFile.getFileFromOffset()&& offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {return tmpMappedFile;}}}// 若是offset=0获取队列中第一个映射文件,我的感受这个逻辑是否放在前面判断更为合理,仍是放在这里另有深意if (returnFirstOnNotFound) {return firstMappedFile;}}} catch (Exception e) {log.error("findMappedFileByOffset Exception", e);}return null; }
往上返回到这个方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#queryConsumeQueue结束。
说在最后
本次解析仅表明我的观点,仅供参考。
加入技术微信群
钉钉技术群