RocketMQ Source Code Analysis: Default Request Processor ④

Preface

This part covers how the name server's default request processor returns the route information for a topic.

 

Source Code Analysis

Start with the method that handles the request for a topic's route information: org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getRouteInfoByTopic




public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final GetRouteInfoRequestHeader requestHeader =
        (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);

    // => look up the route data for the requested topic (analysed in the next step)
    TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());

    if (topicRouteData != null) {
        // Attach the order-topic configuration only when ordered messages are enabled
        if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
            String orderTopicConf =
                this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                    requestHeader.getTopic());
            topicRouteData.setOrderTopicConf(orderTopicConf);
        }

        // Serialise the route data into the response body
        byte[] content = topicRouteData.encode();
        response.setBody(content);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

    // No route data found: tell the caller the topic does not exist
    response.setCode(ResponseCode.TOPIC_NOT_EXIST);
    response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
        + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
    return response;
}
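The branching in this handler can be isolated in a small, self-contained sketch. Everything in it is a hypothetical stand-in rather than RocketMQ API: `RouteHandlerSketch`, the `Code` enum, the `Response` class, and the two in-memory maps replace `RemotingCommand`, `ResponseCode`, `RouteInfoManager` and `KVConfigManager`, and the string payload replaces the encoded `TopicRouteData`. It only illustrates the two outcomes: a body with a success code when route data exists, and a remark with a "topic does not exist" code otherwise.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class RouteHandlerSketch {
    /** Hypothetical stand-in for the ResponseCode constants used by the handler. */
    public enum Code { SUCCESS, TOPIC_NOT_EXIST }

    /** Hypothetical, minimal stand-in for RemotingCommand. */
    public static final class Response {
        public Code code;
        public String remark;
        public byte[] body;
    }

    // topic -> route description (stand-in for RouteInfoManager)
    private final Map<String, String> routes = new HashMap<>();
    // topic -> order configuration (stand-in for KVConfigManager)
    private final Map<String, String> orderConf = new HashMap<>();
    private final boolean orderMessageEnable;

    public RouteHandlerSketch(boolean orderMessageEnable) {
        this.orderMessageEnable = orderMessageEnable;
    }

    public void putRoute(String topic, String route) {
        routes.put(topic, route);
    }

    public void putOrderConf(String topic, String conf) {
        orderConf.put(topic, conf);
    }

    public Response getRouteInfoByTopic(String topic) {
        Response response = new Response();
        String route = routes.get(topic);
        if (route != null) {
            String payload = route;
            // The order configuration is attached only when the switch is on
            if (orderMessageEnable) {
                payload = route + "|order=" + orderConf.get(topic);
            }
            response.body = payload.getBytes(StandardCharsets.UTF_8);
            response.code = Code.SUCCESS;
            response.remark = null;
            return response;
        }
        // No route data: no body, only a code and an explanatory remark
        response.code = Code.TOPIC_NOT_EXIST;
        response.remark = "No topic route info in name server for the topic: " + topic;
        return response;
    }

    public static void main(String[] args) {
        RouteHandlerSketch handler = new RouteHandlerSketch(true);
        handler.putRoute("TopicA", "broker-a");
        handler.putOrderConf("TopicA", "broker-a:4");
        System.out.println(handler.getRouteInfoByTopic("TopicA").code);
        System.out.println(handler.getRouteInfoByTopic("Missing").remark);
    }
}
```

In the sketch, as in the original method, the order configuration lookup runs only after route data has been found, so an unknown topic never touches the configuration store.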

Next, step into the method that looks up the route information by topic: org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#pickupTopicRouteData







public TopicRouteData pickupTopicRouteData(final String topic) {
    TopicRouteData topicRouteData = new TopicRouteData();
    boolean foundQueueData = false;
    boolean foundBrokerData = false;
    Set<String> brokerNameSet = new HashSet<String>();
    List<BrokerData> brokerDataList = new LinkedList<BrokerData>();
    topicRouteData.setBrokerDatas(brokerDataList);

    HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();
    topicRouteData.setFilterServerTable(filterServerMap);

    try {
        try {
            this.lock.readLock().lockInterruptibly();
            // Get the queue data registered for the topic
            List<QueueData> queueDataList = this.topicQueueTable.get(topic);
            if (queueDataList != null) {
                topicRouteData.setQueueDatas(queueDataList);
                foundQueueData = true;

                Iterator<QueueData> it = queueDataList.iterator();
                while (it.hasNext()) {
                    QueueData qd = it.next();
                    // Collect the broker name from each queue data entry
                    brokerNameSet.add(qd.getBrokerName());
                }

                for (String brokerName : brokerNameSet) {
                    // Get the broker data for this broker name
                    BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                    if (null != brokerData) {
                        // Copy the address map so the caller does not share the live table
                        BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(),
                            (HashMap<Long, String>) brokerData.getBrokerAddrs().clone());
                        brokerDataList.add(brokerDataClone);
                        foundBrokerData = true;
                        for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
                            // Get the filter server list registered for this broker address
                            List<String> filterServerList = this.filterServerTable.get(brokerAddr);
                            filterServerMap.put(brokerAddr, filterServerList);
                        }
                    }
                }
            }
        } finally {
            this.lock.readLock().unlock();
        }
    } catch (Exception e) {
        log.error("pickupTopicRouteData Exception", e);
    }

    log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);

    // Route data is returned only when both queue data and broker data were found
    if (foundBrokerData && foundQueueData) {
        return topicRouteData;
    }

    return null;
}
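The lookup pattern in this method (read lock, two table lookups, a copy of the broker addresses, and `null` unless both queue data and broker data were found) can be tried out in isolation. The following is a minimal sketch under simplifying assumptions: `RouteTableSketch`, `register`, `registerQueueOnly` and `pickup` are hypothetical names, queue data is reduced to a list of broker names, filter servers are left out, and the lock is taken with `lock()` before the `try` block rather than with `lockInterruptibly()` as in the original.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class RouteTableSketch {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    // topic -> names of the brokers that host queues for it (simplified QueueData)
    private final Map<String, List<String>> topicQueueTable = new HashMap<>();
    // brokerName -> (brokerId -> address), a simplified BrokerData
    private final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();

    /** Registers both queue data and a broker address under the write lock. */
    public void register(String topic, String brokerName, long brokerId, String addr) {
        lock.writeLock().lock();
        try {
            topicQueueTable.computeIfAbsent(topic, k -> new ArrayList<>()).add(brokerName);
            brokerAddrTable.computeIfAbsent(brokerName, k -> new HashMap<>()).put(brokerId, addr);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Registers queue data only, to reproduce the "queue found, broker missing" case. */
    public void registerQueueOnly(String topic, String brokerName) {
        lock.writeLock().lock();
        try {
            topicQueueTable.computeIfAbsent(topic, k -> new ArrayList<>()).add(brokerName);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Returns brokerName -> copied address map, or null if queue or broker data is missing. */
    public Map<String, Map<Long, String>> pickup(String topic) {
        Map<String, Map<Long, String>> result = new HashMap<>();
        boolean foundQueueData = false;
        boolean foundBrokerData = false;
        lock.readLock().lock();
        try {
            List<String> brokersForTopic = topicQueueTable.get(topic);
            if (brokersForTopic != null) {
                foundQueueData = true;
                // De-duplicate broker names, as brokerNameSet does in the original
                Set<String> brokerNames = new LinkedHashSet<>(brokersForTopic);
                for (String name : brokerNames) {
                    Map<Long, String> addrs = brokerAddrTable.get(name);
                    if (addrs != null) {
                        // Defensive copy: callers must not see or mutate the live table
                        result.put(name, new HashMap<>(addrs));
                        foundBrokerData = true;
                    }
                }
            }
        } finally {
            lock.readLock().unlock();
        }
        return (foundQueueData && foundBrokerData) ? result : null;
    }

    public static void main(String[] args) {
        RouteTableSketch table = new RouteTableSketch();
        table.register("TopicA", "broker-a", 0L, "10.0.0.1:10911");
        System.out.println(table.pickup("TopicA"));
        System.out.println(table.pickup("Missing"));
    }
}
```

Because the address map is copied while the read lock is held, a caller can keep or modify the returned data after the lock is released without affecting later lookups.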

Control then returns to org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getRouteInfoByTopic, which builds the response as shown earlier, and the request handling ends.

 

Closing Remarks

This analysis reflects only my own reading of the code and is offered for reference only.

 
