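RocketMQ Source Code Analysis: Default Request Processor (1)

Preface

This installment covers the name server's default request processor, specifically how it handles a broker's request to query the data version.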

 

Source Code Analysis

Enter this method, which queries the data version: org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#queryBrokerTopicConfig



    public RemotingCommand queryBrokerTopicConfig(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response =
            RemotingCommand.createResponseCommand(QueryDataVersionResponseHeader.class);
        final QueryDataVersionResponseHeader responseHeader =
            (QueryDataVersionResponseHeader) response.readCustomHeader();
        final QueryDataVersionRequestHeader requestHeader =
            (QueryDataVersionRequestHeader) request.decodeCommandCustomHeader(QueryDataVersionRequestHeader.class);
        // The request body carries the DataVersion reported by the broker.
        DataVersion dataVersion = DataVersion.decode(request.getBody(), DataVersion.class);

        // Has the broker's topic config changed? =>
        Boolean changed = this.namesrvController.getRouteInfoManager()
            .isBrokerTopicConfigChanged(requestHeader.getBrokerAddr(), dataVersion);
        if (!changed) {
            // Unchanged: refresh the broker's last-update timestamp =>
            this.namesrvController.getRouteInfoManager()
                .updateBrokerInfoUpdateTimestamp(requestHeader.getBrokerAddr());
        }

        // Query the data version held by the name server =>
        DataVersion nameSeverDataVersion = this.namesrvController.getRouteInfoManager()
            .queryBrokerTopicConfig(requestHeader.getBrokerAddr());
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);

        if (nameSeverDataVersion != null) {
            response.setBody(nameSeverDataVersion.encode());
        }
        responseHeader.setChanged(changed);
        return response;
    }

Enter this method, which checks whether the broker's topic config has changed: org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#isBrokerTopicConfigChanged

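    public boolean isBrokerTopicConfigChanged(final String brokerAddr, final DataVersion dataVersion) {
        // Look up the topic config version recorded for this broker =>
        DataVersion prev = queryBrokerTopicConfig(brokerAddr);
        // An unknown broker, or a version that differs from the recorded one, counts as changed.
        return null == prev || !prev.equals(dataVersion);
    }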
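The comparison rule above can be tried in isolation. The following is a minimal sketch, not RocketMQ code: SketchDataVersion and SketchVersionTable are hypothetical stand-ins invented for illustration, and the two fields (timestamp, counter) are an assumption about what a version needs in order to be compared by value.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified stand-in for a data version (not the real RocketMQ class).
final class SketchDataVersion {
    final long timestamp;
    final long counter;

    SketchDataVersion(long timestamp, long counter) {
        this.timestamp = timestamp;
        this.counter = counter;
    }

    // Value equality: two versions match only if both fields match.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SketchDataVersion)) return false;
        SketchDataVersion other = (SketchDataVersion) o;
        return timestamp == other.timestamp && counter == other.counter;
    }

    @Override
    public int hashCode() {
        return Objects.hash(timestamp, counter);
    }
}

// Hypothetical stand-in for the name server's per-broker version records.
final class SketchVersionTable {
    private final Map<String, SketchDataVersion> versions = new HashMap<>();

    void register(String brokerAddr, SketchDataVersion version) {
        versions.put(brokerAddr, version);
    }

    // Same rule as isBrokerTopicConfigChanged: no record, or a different version, means "changed".
    boolean isChanged(String brokerAddr, SketchDataVersion reported) {
        SketchDataVersion prev = versions.get(brokerAddr);
        return prev == null || !prev.equals(reported);
    }
}
```

Note that a broker the table has never seen is reported as changed, which is the same outcome the null check in the original method produces.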

Enter this method, which looks up the broker's topic config version: org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#queryBrokerTopicConfig

It reads the broker's BrokerLiveInfo from brokerLiveTable and returns its DataVersion, or null if the broker address has no entry. The request handler calls the same method again as its last step, where the method body is listed.

Return up to this method, which refreshes the broker's last-update timestamp: org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#updateBrokerInfoUpdateTimestamp

    public void updateBrokerInfoUpdateTimestamp(final String brokerAddr) {
        BrokerLiveInfo prev = this.brokerLiveTable.get(brokerAddr);
        // Only brokers already present in the live table are refreshed; unknown addresses are ignored.
        if (prev != null) {
            prev.setLastUpdateTimestamp(System.currentTimeMillis());
        }
    }
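To see why refreshing the timestamp matters, here is a minimal sketch of a live table whose entries go stale unless they are touched. SketchLiveTable, touch, isExpired, and the timeout parameter are all hypothetical names invented for this illustration; the assumption that a stale timestamp is later used to treat a broker as inactive is not shown in the code walked through here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a broker live table keyed by broker address.
final class SketchLiveTable {
    private final Map<String, Long> lastUpdateMillis = new HashMap<>();

    void register(String brokerAddr, long nowMillis) {
        lastUpdateMillis.put(brokerAddr, nowMillis);
    }

    // Mirrors the null check in updateBrokerInfoUpdateTimestamp:
    // only known brokers are refreshed. Returns whether a refresh happened.
    boolean touch(String brokerAddr, long nowMillis) {
        if (!lastUpdateMillis.containsKey(brokerAddr)) {
            return false;
        }
        lastUpdateMillis.put(brokerAddr, nowMillis);
        return true;
    }

    // Assumed liveness rule for illustration: unknown brokers, or brokers
    // not refreshed within timeoutMillis, are considered expired.
    boolean isExpired(String brokerAddr, long nowMillis, long timeoutMillis) {
        Long last = lastUpdateMillis.get(brokerAddr);
        return last == null || nowMillis - last > timeoutMillis;
    }
}
```

The clock value is passed in rather than read from System.currentTimeMillis() so the behavior can be checked deterministically.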

Return up to this method, which queries the data version held by the name server: org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#queryBrokerTopicConfig

    public DataVersion queryBrokerTopicConfig(final String brokerAddr) {
        BrokerLiveInfo prev = this.brokerLiveTable.get(brokerAddr);
        if (prev != null) {
            return prev.getDataVersion();
        }
        // No live info recorded for this broker address.
        return null;
    }

Return up to org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#queryBrokerTopicConfig, which ends here.

 

Closing Remarks

This analysis reflects only my own understanding and is offered for reference.

 
