rocketmq源码解析默认请求处理器⑤

说在前面apache

默认请求处理器,获取broker集群信息,从namesrv中获取全部的broker列表,删除namesrv中的topic、获取集群的全部topic、从namesrv中获取系统的topic列表、获取单元化模式非%RETRY%开头的topic、获取单元化模式消费失败重试的%RETRY%开头的topic微信

 

源码解析app

进入这个方法,获取broker集群信息,org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getBrokerClusterInfothis


private RemotingCommand getBrokerClusterInfo(ChannelHandlerContext ctx, RemotingCommand request) {final RemotingCommand response = RemotingCommand.createResponseCommand(null);byte[] content = this.namesrvController.getRouteInfoManager().getAllClusterInfo();response.setBody(content);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;    }

进入这个方法,org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getAllClusterInfocode

public byte[] getAllClusterInfo() {ClusterInfo clusterInfoSerializeWrapper = new ClusterInfo();//        broker数据clusterInfoSerializeWrapper.setBrokerAddrTable(this.brokerAddrTable);//        broker集群clusterInfoSerializeWrapper.setClusterAddrTable(this.clusterAddrTable);return clusterInfoSerializeWrapper.encode();    }

往上返回到这个方法,从namesrv中获取全部的broker列表,org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getAllTopicListFromNameserverserver


private RemotingCommand getAllTopicListFromNameserver(ChannelHandlerContext ctx, RemotingCommand request) {final RemotingCommand response = RemotingCommand.createResponseCommand(null);//        从topic队列信息中获取全部的topicbyte[] body = this.namesrvController.getRouteInfoManager().getAllTopicList();response.setBody(body);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;    }

进入这个方法,从topic队列信息中获取全部的topic,org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getAllTopicListblog

public byte[] getAllTopicList() {TopicList topicList = new TopicList();try {try {this.lock.readLock().lockInterruptibly();topicList.getTopicList().addAll(this.topicQueueTable.keySet());} finally {this.lock.readLock().unlock();}} catch (Exception e) {log.error("getAllTopicList Exception", e);}return topicList.encode();    }

往上返回到这个方法,删除namesrv中的topic,org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#deleteTopicInNamesrv队列


private RemotingCommand deleteTopicInNamesrv(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);final DeleteTopicInNamesrvRequestHeader requestHeader =(DeleteTopicInNamesrvRequestHeader) request.decodeCommandCustomHeader(DeleteTopicInNamesrvRequestHeader.class);//        从topic消息队列信息中删除topic相关的信息this.namesrvController.getRouteInfoManager().deleteTopic(requestHeader.getTopic());response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;    }

进入这个方法,从topic消息队列信息中删除topic相关的信息,org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#deleteTopicrem

public void deleteTopic(final String topic) {try {try {this.lock.writeLock().lockInterruptibly();this.topicQueueTable.remove(topic);} finally {this.lock.writeLock().unlock();}} catch (Exception e) {log.error("deleteTopic Exception", e);}    }

往上返回到这个方法,获取集群的全部topic,org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getTopicsByClusterget


private RemotingCommand getTopicsByCluster(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);final GetTopicsByClusterRequestHeader requestHeader =(GetTopicsByClusterRequestHeader) request.decodeCommandCustomHeader(GetTopicsByClusterRequestHeader.class);//        根据broker集群地址获取topicsbyte[] body = this.namesrvController.getRouteInfoManager().getTopicsByCluster(requestHeader.getCluster());response.setBody(body);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;    }

进入这个方法,根据broker集群地址获取topics,org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getTopicsByCluster

public byte[] getTopicsByCluster(String cluster) {TopicList topicList = new TopicList();try {try {this.lock.readLock().lockInterruptibly();//                获取集群的brokerSet<String> brokerNameSet = this.clusterAddrTable.get(cluster);for (String brokerName : brokerNameSet) {Iterator<Entry<String, List<QueueData>>> topicTableIt =this.topicQueueTable.entrySet().iterator();while (topicTableIt.hasNext()) {Entry<String, List<QueueData>> topicEntry = topicTableIt.next();String topic = topicEntry.getKey();List<QueueData> queueDatas = topicEntry.getValue();for (QueueData queueData : queueDatas) {if (brokerName.equals(queueData.getBrokerName())) {//                                从topic的消息队列中获取topictopicList.getTopicList().add(topic);break;}}}}} finally {this.lock.readLock().unlock();}} catch (Exception e) {log.error("getAllTopicList Exception", e);}return topicList.encode();    }

往上返回到这个方法,从namesrv中获取系统的topic列表,org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getSystemTopicListFromNs


private RemotingCommand getSystemTopicListFromNs(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);byte[] body = this.namesrvController.getRouteInfoManager().getSystemTopicList();response.setBody(body);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;    }

进入这个方法,org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getSystemTopicList


public byte[] getSystemTopicList() {TopicList topicList = new TopicList();try {try {this.lock.readLock().lockInterruptibly();for (Map.Entry<String, Set<String>> entry : clusterAddrTable.entrySet()) {//                    这里有点疑问,broker集群的clusterName、brokerName添加到topic列表中是什么意思,系统topic就是这些名字吗topicList.getTopicList().add(entry.getKey());topicList.getTopicList().addAll(entry.getValue());}if (brokerAddrTable != null && !brokerAddrTable.isEmpty()) {Iterator<String> it = brokerAddrTable.keySet().iterator();while (it.hasNext()) {BrokerData bd = brokerAddrTable.get(it.next());HashMap<Long, String> brokerAddrs = bd.getBrokerAddrs();if (brokerAddrs != null && !brokerAddrs.isEmpty()) {Iterator<Long> it2 = brokerAddrs.keySet().iterator();//                            从broker数据中获取broker地址topicList.setBrokerAddr(brokerAddrs.get(it2.next()));break;}}}} finally {this.lock.readLock().unlock();}} catch (Exception e) {log.error("getAllTopicList Exception", e);}return topicList.encode();    }

往上返回到这个方法,获取单元化模式非%RETRY%开头的topic,org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getUnitTopicList


private RemotingCommand getUnitTopicList(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);//        =》byte[] body = this.namesrvController.getRouteInfoManager().getUnitTopics();response.setBody(body);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;    }

进入这个方法,org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getUnitTopics

public byte[] getUnitTopics() {TopicList topicList = new TopicList();try {try {this.lock.readLock().lockInterruptibly();Iterator<Entry<String, List<QueueData>>> topicTableIt =this.topicQueueTable.entrySet().iterator();while (topicTableIt.hasNext()) {Entry<String, List<QueueData>> topicEntry = topicTableIt.next();String topic = topicEntry.getKey();List<QueueData> queueDatas = topicEntry.getValue();if (queueDatas != null && queueDatas.size() > 0&& TopicSysFlag.hasUnitFlag(queueDatas.get(0).getTopicSynFlag())) {topicList.getTopicList().add(topic);}}} finally {this.lock.readLock().unlock();}} catch (Exception e) {log.error("getAllTopicList Exception", e);}return topicList.encode();    }

往上返回到这个方法,获取单元化模式消费失败重试的%RETRY%开头的topic,org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getHasUnitSubTopicList


private RemotingCommand getHasUnitSubTopicList(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);byte[] body = this.namesrvController.getRouteInfoManager().getHasUnitSubTopicList();response.setBody(body);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;    }

进入这个方法,org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getHasUnitSubTopicList

public byte[] getHasUnitSubTopicList() {TopicList topicList = new TopicList();try {try {this.lock.readLock().lockInterruptibly();Iterator<Entry<String, List<QueueData>>> topicTableIt =this.topicQueueTable.entrySet().iterator();while (topicTableIt.hasNext()) {Entry<String, List<QueueData>> topicEntry = topicTableIt.next();String topic = topicEntry.getKey();List<QueueData> queueDatas = topicEntry.getValue();if (queueDatas != null && queueDatas.size() > 0&& TopicSysFlag.hasUnitSubFlag(queueDatas.get(0).getTopicSynFlag())) {topicList.getTopicList().add(topic);}}} finally {this.lock.readLock().unlock();}} catch (Exception e) {log.error("getAllTopicList Exception", e);}return topicList.encode();    }

 

说在最后

本次解析仅表明我的观点,仅供参考。

 

加入技术微信群

钉钉技术群

相关文章
相关标签/搜索