说在前面
管理请求 QUERY_CORRECTION_OFFSET:查询修正后的 offset
源码解析
进入这个方法 org.apache.rocketmq.broker.processor.AdminBrokerProcessor#queryCorrectionOffset
private RemotingCommand queryCorrectionOffset(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);QueryCorrectionOffsetHeader requestHeader =(QueryCorrectionOffsetHeader) request.decodeCommandCustomHeader(QueryCorrectionOffsetHeader.class);// 在全部的消费组中查询最小的offset=》Map<Integer, Long> correctionOffset = this.brokerController.getConsumerOffsetManager().queryMinOffsetInAllGroup(requestHeader.getTopic(), requestHeader.getFilterGroups());// =》按topic和消费组查找offset=》Map<Integer, Long> compareOffset =this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getTopic(), requestHeader.getCompareGroup());if (compareOffset != null && !compareOffset.isEmpty()) {for (Map.Entry<Integer, Long> entry : compareOffset.entrySet()) {Integer queueId = entry.getKey();correctionOffset.put(queueId,correctionOffset.get(queueId) > entry.getValue() ? Long.MAX_VALUE : correctionOffset.get(queueId));}}QueryCorrectionOffsetBody body = new QueryCorrectionOffsetBody();body.setCorrectionOffsets(correctionOffset);response.setBody(body.encode());response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;}
进入这个方法 org.apache.rocketmq.broker.offset.ConsumerOffsetManager#queryMinOffsetInAllGroup,查询全部消费组中最小的 offset
public Map<Integer, Long> queryMinOffsetInAllGroup(final String topic, final String filterGroups) {Map<Integer, Long> queueMinOffset = new HashMap<Integer, Long>();Set<String> topicGroups = this.offsetTable.keySet();if (!UtilAll.isBlank(filterGroups)) {for (String group : filterGroups.split(",")) {Iterator<String> it = topicGroups.iterator();while (it.hasNext()) {if (group.equals(it.next().split(TOPIC_GROUP_SEPARATOR)[1])) {// 若是这个组是过滤的消费组从集合中删除it.remove();}}}}for (Map.Entry<String, ConcurrentMap<Integer, Long>> offSetEntry : this.offsetTable.entrySet()) {String topicGroup = offSetEntry.getKey();String[] topicGroupArr = topicGroup.split(TOPIC_GROUP_SEPARATOR);if (topic.equals(topicGroupArr[0])) {for (Entry<Integer, Long> entry : offSetEntry.getValue().entrySet()) {// 查询队列最小的offset=》long minOffset = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, entry.getKey());if (entry.getValue() >= minOffset) {Long offset = queueMinOffset.get(entry.getKey());if (offset == null) {queueMinOffset.put(entry.getKey(), Math.min(Long.MAX_VALUE, entry.getValue()));} else {queueMinOffset.put(entry.getKey(), Math.min(entry.getValue(), offset));}}}}}return queueMinOffset;}
进入这个方法 org.apache.rocketmq.store.DefaultMessageStore#getMinOffsetInQueue,查询消费队列中最小的 offset
public long getMinOffsetInQueue(String topic, int queueId) {// 根据topic和queueId查询消费者队列 =》ConsumeQueue logic = this.findConsumeQueue(topic, queueId);if (logic != null) {// 获取队列中的最小offsetreturn logic.getMinOffsetInQueue();}return -1;}
进入这个方法 org.apache.rocketmq.store.DefaultMessageStore#getMinOffsetInQueue,根据 topic 和 queueId 查询消费者队列
public long getMinOffsetInQueue(String topic, int queueId) {// 根据topic和queueId查询消费者队列 =》ConsumeQueue logic = this.findConsumeQueue(topic, queueId);if (logic != null) {// 获取队列中的最小offsetreturn logic.getMinOffsetInQueue();}return -1;}
进入这个方法 org.apache.rocketmq.store.DefaultMessageStore#findConsumeQueue,查找消费队列
public ConsumeQueue findConsumeQueue(String topic, int queueId) {// 找到topic的全部消息队列ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);if (null == map) {ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);if (oldMap != null) {map = oldMap;} else {map = newMap;}}// 按queue id查找消费者队列ConsumeQueue logic = map.get(queueId);if (null == logic) {ConsumeQueue newLogic = new ConsumeQueue(topic,queueId,// 消费者队列存储地址 user.home/store/consumequeueStorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),// 每一个文件存储默认30Wthis.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),this);ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);if (oldLogic != null) {logic = oldLogic;} else {logic = newLogic;}}return logic;}
往上返回到这个方法 org.apache.rocketmq.broker.offset.ConsumerOffsetManager#queryOffset(java.lang.String, java.lang.String),按 topic 和消费组查找 offset
public Map<Integer, Long> queryOffset(final String group, final String topic) {// topic@groupString key = topic + TOPIC_GROUP_SEPARATOR + group;return this.offsetTable.get(key);}
往上返回到这个方法 org.apache.rocketmq.broker.processor.AdminBrokerProcessor#queryCorrectionOffset,结束
说在最后
本次解析仅表明我的观点,仅供参考。
加入技术微信群
钉钉技术群