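Preface
This post covers the default request processor (DefaultRequestProcessor) of the namesrv: getting the broker cluster info, getting the full topic list from the namesrv, deleting a topic in the namesrv, getting all topics of a cluster, getting the system topic list from the namesrv, getting unit-mode topics that do not start with %RETRY%, and getting unit-mode consume-failure retry topics that start with %RETRY%.
Source code analysis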
Step into this method, which gets the broker cluster info: org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getBrokerClusterInfo
    private RemotingCommand getBrokerClusterInfo(ChannelHandlerContext ctx, RemotingCommand request) {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        byte[] content = this.namesrvController.getRouteInfoManager().getAllClusterInfo();
        response.setBody(content);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }
Step into this method: org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getAllClusterInfo
    public byte[] getAllClusterInfo() {
        ClusterInfo clusterInfoSerializeWrapper = new ClusterInfo();
        // broker data
        clusterInfoSerializeWrapper.setBrokerAddrTable(this.brokerAddrTable);
        // broker clusters
        clusterInfoSerializeWrapper.setClusterAddrTable(this.clusterAddrTable);
        return clusterInfoSerializeWrapper.encode();
    }
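To make the shape of the returned data easier to picture, here is a minimal sketch of the two tables that the wrapper carries. It is not RocketMQ code: the class ClusterInfoSketch and its methods register and addressesOf are hypothetical, and the real brokerAddrTable stores BrokerData objects rather than a plain map of broker id to address.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, simplified stand-in for the two tables carried by ClusterInfo.
class ClusterInfoSketch {
    // brokerName -> (brokerId -> address); RocketMQ wraps the inner map in BrokerData
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();
    // clusterName -> names of the brokers that belong to the cluster
    final Map<String, Set<String>> clusterAddrTable = new HashMap<>();

    void register(String cluster, String brokerName, long brokerId, String addr) {
        clusterAddrTable.computeIfAbsent(cluster, k -> new HashSet<>()).add(brokerName);
        brokerAddrTable.computeIfAbsent(brokerName, k -> new HashMap<>()).put(brokerId, addr);
    }

    // Join the two tables: every broker address that belongs to the given cluster.
    Set<String> addressesOf(String cluster) {
        Set<String> result = new TreeSet<>();
        for (String brokerName : clusterAddrTable.getOrDefault(cluster, Collections.emptySet())) {
            result.addAll(brokerAddrTable.getOrDefault(brokerName, Collections.emptyMap()).values());
        }
        return result;
    }

    public static void main(String[] args) {
        ClusterInfoSketch info = new ClusterInfoSketch();
        info.register("DefaultCluster", "broker-a", 0L, "10.0.0.1:10911");
        info.register("DefaultCluster", "broker-a", 1L, "10.0.0.2:10911");
        System.out.println(info.addressesOf("DefaultCluster"));
    }
}
```

A client that receives both tables can resolve a cluster name to broker names and then to addresses without another round trip, which is presumably why the two are shipped together.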
Go back up to this method, which gets the full topic list from the namesrv: org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getAllTopicListFromNameserver
    private RemotingCommand getAllTopicListFromNameserver(ChannelHandlerContext ctx, RemotingCommand request) {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        // collect every topic from the topic queue table
        byte[] body = this.namesrvController.getRouteInfoManager().getAllTopicList();
        response.setBody(body);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }
Step into this method, which collects every topic from the topic queue table: org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getAllTopicList
    public byte[] getAllTopicList() {
        TopicList topicList = new TopicList();
        try {
            try {
                this.lock.readLock().lockInterruptibly();
                topicList.getTopicList().addAll(this.topicQueueTable.keySet());
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (Exception e) {
            log.error("getAllTopicList Exception", e);
        }
        return topicList.encode();
    }
Go back up to this method, which deletes a topic in the namesrv: org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#deleteTopicInNamesrv
    private RemotingCommand deleteTopicInNamesrv(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final DeleteTopicInNamesrvRequestHeader requestHeader =
            (DeleteTopicInNamesrvRequestHeader) request.decodeCommandCustomHeader(DeleteTopicInNamesrvRequestHeader.class);
        // remove the topic's entry from the topic queue table
        this.namesrvController.getRouteInfoManager().deleteTopic(requestHeader.getTopic());
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }
Step into this method, which removes the topic's entry from the topic queue table: org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#deleteTopic
    public void deleteTopic(final String topic) {
        try {
            try {
                this.lock.writeLock().lockInterruptibly();
                this.topicQueueTable.remove(topic);
            } finally {
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("deleteTopic Exception", e);
        }
    }
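The listing and deleting methods share one pattern: readers take the read lock, writers take the write lock, and both guard the same topic table. A minimal sketch of that pattern follows. The class TopicTableSketch and its method names are hypothetical, the queue data is reduced to a list of strings, and plain lock() is used instead of lockInterruptibly() to keep the example free of checked exceptions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch of a topic table guarded by a read-write lock.
class TopicTableSketch {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    // topic -> queue descriptions (a stand-in for List<QueueData>)
    private final Map<String, List<String>> topicQueueTable = new HashMap<>();

    void put(String topic, List<String> queues) {
        lock.writeLock().lock();
        try {
            topicQueueTable.put(topic, queues);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // Many readers may hold the read lock at once; the copy is safe to use after unlock.
    List<String> allTopics() {
        lock.readLock().lock();
        try {
            return new ArrayList<>(topicQueueTable.keySet());
        } finally {
            lock.readLock().unlock();
        }
    }

    // The write lock excludes both readers and other writers while the entry is removed.
    boolean delete(String topic) {
        lock.writeLock().lock();
        try {
            return topicQueueTable.remove(topic) != null;
        } finally {
            lock.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        TopicTableSketch table = new TopicTableSketch();
        table.put("TopicA", new ArrayList<>());
        System.out.println(table.allTopics());
        System.out.println(table.delete("TopicA"));
    }
}
```

One difference is worth noting: the sketch acquires the lock before entering try, so unlock() only runs when the lock is actually held. In the quoted code the acquire sits inside the try block, so an interrupted lockInterruptibly() still reaches unlock(), and the exception that raises is absorbed by the outer catch.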
Go back up to this method, which gets all topics of a cluster: org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getTopicsByCluster
    private RemotingCommand getTopicsByCluster(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        final GetTopicsByClusterRequestHeader requestHeader =
            (GetTopicsByClusterRequestHeader) request.decodeCommandCustomHeader(GetTopicsByClusterRequestHeader.class);
        // look up the topics by cluster name
        byte[] body = this.namesrvController.getRouteInfoManager().getTopicsByCluster(requestHeader.getCluster());
        response.setBody(body);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }
Step into this method, which looks up the topics by cluster name: org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getTopicsByCluster
    public byte[] getTopicsByCluster(String cluster) {
        TopicList topicList = new TopicList();
        try {
            try {
                this.lock.readLock().lockInterruptibly();
                // broker names that belong to the cluster
                Set<String> brokerNameSet = this.clusterAddrTable.get(cluster);
                for (String brokerName : brokerNameSet) {
                    Iterator<Entry<String, List<QueueData>>> topicTableIt =
                        this.topicQueueTable.entrySet().iterator();
                    while (topicTableIt.hasNext()) {
                        Entry<String, List<QueueData>> topicEntry = topicTableIt.next();
                        String topic = topicEntry.getKey();
                        List<QueueData> queueDatas = topicEntry.getValue();
                        for (QueueData queueData : queueDatas) {
                            if (brokerName.equals(queueData.getBrokerName())) {
                                // the topic has queues on this broker, so record it
                                topicList.getTopicList().add(topic);
                                break;
                            }
                        }
                    }
                }
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (Exception e) {
            log.error("getAllTopicList Exception", e);
        }
        return topicList.encode();
    }
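The lookup above boils down to: a topic belongs to a cluster if at least one of its queues lives on a broker of that cluster. A minimal sketch of that filter follows, with hypothetical names (TopicsByClusterSketch, topicsByCluster) and with QueueData reduced to a bare broker name. Unlike the quoted code, it iterates the topics once and tests membership in the broker set, and it treats an unknown cluster as empty. In the quoted code an unknown cluster makes clusterAddrTable.get return null, so the for loop throws a NullPointerException that the outer catch logs before an empty list is returned.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of the "topics of a cluster" filter.
class TopicsByClusterSketch {
    static Set<String> topicsByCluster(Map<String, Set<String>> clusterAddrTable,
                                       Map<String, List<String>> topicToBrokerNames,
                                       String cluster) {
        // Unknown clusters are treated as having no brokers.
        Set<String> brokerNames = clusterAddrTable.getOrDefault(cluster, Collections.emptySet());
        Set<String> topics = new TreeSet<>();
        for (Map.Entry<String, List<String>> entry : topicToBrokerNames.entrySet()) {
            for (String brokerName : entry.getValue()) {
                if (brokerNames.contains(brokerName)) {
                    topics.add(entry.getKey());
                    break; // one matching broker is enough
                }
            }
        }
        return topics;
    }
}
```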
Go back up to this method, which gets the system topic list from the namesrv: org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getSystemTopicListFromNs
    private RemotingCommand getSystemTopicListFromNs(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        byte[] body = this.namesrvController.getRouteInfoManager().getSystemTopicList();
        response.setBody(body);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }
Step into this method: org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getSystemTopicList
    public byte[] getSystemTopicList() {
        TopicList topicList = new TopicList();
        try {
            try {
                this.lock.readLock().lockInterruptibly();
                for (Map.Entry<String, Set<String>> entry : clusterAddrTable.entrySet()) {
                    // Open question: why are the clusterName and brokerName values added to the
                    // topic list? Are the system topics simply these names?
                    topicList.getTopicList().add(entry.getKey());
                    topicList.getTopicList().addAll(entry.getValue());
                }
                if (brokerAddrTable != null && !brokerAddrTable.isEmpty()) {
                    Iterator<String> it = brokerAddrTable.keySet().iterator();
                    while (it.hasNext()) {
                        BrokerData bd = brokerAddrTable.get(it.next());
                        HashMap<Long, String> brokerAddrs = bd.getBrokerAddrs();
                        if (brokerAddrs != null && !brokerAddrs.isEmpty()) {
                            Iterator<Long> it2 = brokerAddrs.keySet().iterator();
                            // take the first broker address found
                            topicList.setBrokerAddr(brokerAddrs.get(it2.next()));
                            break;
                        }
                    }
                }
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (Exception e) {
            log.error("getAllTopicList Exception", e);
        }
        return topicList.encode();
    }
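On the question raised in the comment: as far as I can tell, the broker side (TopicConfigManager) registers topics named after its own cluster name and broker name as system topics, which would explain why the namesrv reports exactly those names here. Treat that as my reading rather than a confirmed fact. The sketch below only restates what the method returns, using hypothetical names (SystemTopicSketch, systemTopics, firstBrokerAddr) and plain maps in place of BrokerData.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of what getSystemTopicList assembles.
class SystemTopicSketch {
    // Every cluster name plus every broker name is reported as a "system topic".
    static Set<String> systemTopics(Map<String, Set<String>> clusterAddrTable) {
        Set<String> names = new TreeSet<>();
        for (Map.Entry<String, Set<String>> entry : clusterAddrTable.entrySet()) {
            names.add(entry.getKey());
            names.addAll(entry.getValue());
        }
        return names;
    }

    // The first broker that has at least one address supplies the returned address.
    static String firstBrokerAddr(Map<String, Map<Long, String>> brokerAddrTable) {
        for (Map<Long, String> addrs : brokerAddrTable.values()) {
            if (addrs != null && !addrs.isEmpty()) {
                return addrs.values().iterator().next();
            }
        }
        return null; // no broker has registered an address yet
    }
}
```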
Go back up to this method, which gets the unit-mode topics that do not start with %RETRY%: org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getUnitTopicList
    private RemotingCommand getUnitTopicList(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        // delegates to RouteInfoManager#getUnitTopics
        byte[] body = this.namesrvController.getRouteInfoManager().getUnitTopics();
        response.setBody(body);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }
Step into this method, which keeps the topics whose first queue entry has the unit flag set: org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getUnitTopics
    public byte[] getUnitTopics() {
        TopicList topicList = new TopicList();
        try {
            try {
                this.lock.readLock().lockInterruptibly();
                Iterator<Entry<String, List<QueueData>>> topicTableIt =
                    this.topicQueueTable.entrySet().iterator();
                while (topicTableIt.hasNext()) {
                    Entry<String, List<QueueData>> topicEntry = topicTableIt.next();
                    String topic = topicEntry.getKey();
                    List<QueueData> queueDatas = topicEntry.getValue();
                    if (queueDatas != null && queueDatas.size() > 0
                        && TopicSysFlag.hasUnitFlag(queueDatas.get(0).getTopicSynFlag())) {
                        topicList.getTopicList().add(topic);
                    }
                }
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (Exception e) {
            log.error("getAllTopicList Exception", e);
        }
        return topicList.encode();
    }
Go back up to this method, which gets the unit-mode consume-failure retry topics that start with %RETRY%: org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getHasUnitSubTopicList
    private RemotingCommand getHasUnitSubTopicList(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        byte[] body = this.namesrvController.getRouteInfoManager().getHasUnitSubTopicList();
        response.setBody(body);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }
Step into this method, which keeps the topics whose first queue entry has the unit-sub flag set: org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getHasUnitSubTopicList
    public byte[] getHasUnitSubTopicList() {
        TopicList topicList = new TopicList();
        try {
            try {
                this.lock.readLock().lockInterruptibly();
                Iterator<Entry<String, List<QueueData>>> topicTableIt =
                    this.topicQueueTable.entrySet().iterator();
                while (topicTableIt.hasNext()) {
                    Entry<String, List<QueueData>> topicEntry = topicTableIt.next();
                    String topic = topicEntry.getKey();
                    List<QueueData> queueDatas = topicEntry.getValue();
                    if (queueDatas != null && queueDatas.size() > 0
                        && TopicSysFlag.hasUnitSubFlag(queueDatas.get(0).getTopicSynFlag())) {
                        topicList.getTopicList().add(topic);
                    }
                }
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (Exception e) {
            log.error("getAllTopicList Exception", e);
        }
        return topicList.encode();
    }
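The two unit-mode lookups differ only in which bit of the topic system flag they test. The sketch below shows the bitmask idea with a hypothetical class UnitFlagSketch. The bit positions (bit 0 for unit, bit 1 for unit-sub) are my assumption about how TopicSysFlag lays them out and should be checked against the RocketMQ source.

```java
// Hypothetical sketch of a two-bit topic system flag; bit positions are assumed.
class UnitFlagSketch {
    static final int FLAG_UNIT = 0x1;          // assumed: bit 0 marks a unit-mode topic
    static final int FLAG_UNIT_SUB = 0x1 << 1; // assumed: bit 1 marks a unit-mode subscription

    static int build(boolean unit, boolean unitSub) {
        int flag = 0;
        if (unit) {
            flag |= FLAG_UNIT;
        }
        if (unitSub) {
            flag |= FLAG_UNIT_SUB;
        }
        return flag;
    }

    static boolean hasUnit(int flag) {
        return (flag & FLAG_UNIT) == FLAG_UNIT;
    }

    static boolean hasUnitSub(int flag) {
        return (flag & FLAG_UNIT_SUB) == FLAG_UNIT_SUB;
    }

    public static void main(String[] args) {
        int retryFlag = build(false, true);
        System.out.println(hasUnit(retryFlag));    // false
        System.out.println(hasUnitSub(retryFlag)); // true
    }
}
```

Because each property occupies its own bit, a topic can carry both flags at once and would then show up in both lists.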
Closing remarks
This analysis reflects only my own understanding and is offered for reference.