rocketmq源码解析默认请求处理器⑥

说在前面apache

默认请求处理器,获取单元化模式消费失败重试的%RETRY%开头的topic微信

 

源码解析this

进入这个方法,获取单元化模式消费失败重试的%RETRY%开头的topic,org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getHasUnitSubUnUnitTopicListcode


private RemotingCommand getHasUnitSubUnUnitTopicList(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);byte[] body = this.namesrvController.getRouteInfoManager().getHasUnitSubUnUnitTopicList();response.setBody(body);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;    }

进入这个方法,org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#getHasUnitSubUnUnitTopicListorm

public byte[] getHasUnitSubUnUnitTopicList() {TopicList topicList = new TopicList();try {try {this.lock.readLock().lockInterruptibly();Iterator<Entry<String, List<QueueData>>> topicTableIt =this.topicQueueTable.entrySet().iterator();while (topicTableIt.hasNext()) {Entry<String, List<QueueData>> topicEntry = topicTableIt.next();String topic = topicEntry.getKey();List<QueueData> queueDatas = topicEntry.getValue();if (queueDatas != null && queueDatas.size() > 0&& !TopicSysFlag.hasUnitFlag(queueDatas.get(0).getTopicSynFlag())&& TopicSysFlag.hasUnitSubFlag(queueDatas.get(0).getTopicSynFlag())) {topicList.getTopicList().add(topic);}}} finally {this.lock.readLock().unlock();}} catch (Exception e) {log.error("getAllTopicList Exception", e);}return topicList.encode();}}

往上返回到这个方法,更新namesrv配置,org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#updateConfigblog






private RemotingCommand updateConfig(ChannelHandlerContext ctx, RemotingCommand request) {log.info("updateConfig called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));final RemotingCommand response = RemotingCommand.createResponseCommand(null);byte[] body = request.getBody();if (body != null) {String bodyStr;try {bodyStr = new String(body, MixAll.DEFAULT_CHARSET);} catch (UnsupportedEncodingException e) {log.error("updateConfig byte array to string error: ", e);response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("UnsupportedEncodingException " + e);return response;}if (bodyStr == null) {log.error("updateConfig get null body!");response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("string2Properties error");return response;}Properties properties = MixAll.string2Properties(bodyStr);if (properties == null) {log.error("updateConfig MixAll.string2Properties error {}", bodyStr);response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("string2Properties error");return response;}this.namesrvController.getConfiguration().update(properties);}response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;    }

往上返回到这个方法,获取namesrv配置,org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getConfigget


private RemotingCommand getConfig(ChannelHandlerContext ctx, RemotingCommand request) {final RemotingCommand response = RemotingCommand.createResponseCommand(null);String content = this.namesrvController.getConfiguration().getAllConfigsFormatString();if (content != null && content.length() > 0) {try {response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET));} catch (UnsupportedEncodingException e) {log.error("getConfig error, ", e);response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("UnsupportedEncodingException " + e);return response;}}response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;    }

说在最后源码

本次解析仅表明我的观点,仅供参考。string

 

加入技术微信群it

钉钉技术群

相关文章
相关标签/搜索