Preface
This post looks at the default request processor and how it handles a query for the data version.
Source code analysis
Step into this method, which queries the data version: org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#queryBrokerTopicConfig
    public RemotingCommand queryBrokerTopicConfig(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(QueryDataVersionResponseHeader.class);
        final QueryDataVersionResponseHeader responseHeader = (QueryDataVersionResponseHeader) response.readCustomHeader();
        final QueryDataVersionRequestHeader requestHeader =
            (QueryDataVersionRequestHeader) request.decodeCommandCustomHeader(QueryDataVersionRequestHeader.class);
        // The request body carries the data version reported by the broker.
        DataVersion dataVersion = DataVersion.decode(request.getBody(), DataVersion.class);

        // Check whether the broker's topic configuration has changed.
        Boolean changed = this.namesrvController.getRouteInfoManager().isBrokerTopicConfigChanged(requestHeader.getBrokerAddr(), dataVersion);
        if (!changed) {
            // Unchanged: only refresh the broker's last-update timestamp.
            this.namesrvController.getRouteInfoManager().updateBrokerInfoUpdateTimestamp(requestHeader.getBrokerAddr());
        }

        // Look up the data version that the name server holds for this broker.
        DataVersion nameSeverDataVersion = this.namesrvController.getRouteInfoManager().queryBrokerTopicConfig(requestHeader.getBrokerAddr());
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);

        if (nameSeverDataVersion != null) {
            response.setBody(nameSeverDataVersion.encode());
        }
        responseHeader.setChanged(changed);
        return response;
    }
Step into this method, which checks whether the broker's topic configuration has changed: org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#isBrokerTopicConfigChanged
    public boolean isBrokerTopicConfigChanged(final String brokerAddr, final DataVersion dataVersion) {
        // Look up the topic configuration version recorded for this broker.
        DataVersion prev = queryBrokerTopicConfig(brokerAddr);
        // No recorded version, or a different one, counts as changed.
        return null == prev || !prev.equals(dataVersion);
    }
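The null-or-different check above can be tried out in isolation. The following is only a minimal sketch: `SketchVersion` and `VersionRegistry` are hypothetical names invented for illustration, and modeling the version as a timestamp plus a counter is an assumption of this sketch rather than something shown in the listing above.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for a data version (assumed fields).
final class SketchVersion {
    final long timestamp;
    final long counter;

    SketchVersion(long timestamp, long counter) {
        this.timestamp = timestamp;
        this.counter = counter;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SketchVersion)) return false;
        SketchVersion other = (SketchVersion) o;
        return timestamp == other.timestamp && counter == other.counter;
    }

    @Override
    public int hashCode() {
        return Objects.hash(timestamp, counter);
    }
}

// Hypothetical registry that mirrors the null-or-different comparison.
final class VersionRegistry {
    private final Map<String, SketchVersion> versions = new ConcurrentHashMap<>();

    // Record the version last registered for a broker address.
    void register(String brokerAddr, SketchVersion version) {
        versions.put(brokerAddr, version);
    }

    // Changed when nothing is recorded yet or the recorded version differs.
    boolean isChanged(String brokerAddr, SketchVersion reported) {
        SketchVersion prev = versions.get(brokerAddr);
        return prev == null || !prev.equals(reported);
    }
}
```

Note that an unknown broker address is reported as changed, so a broker that has never registered is treated the same way as one whose configuration differs.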
The lookup of the broker's topic configuration version is delegated to org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#queryBrokerTopicConfig, which is listed further below, where the walkthrough returns to it.
Going back up to the caller, the next step updates the broker's last-update timestamp: org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#updateBrokerInfoUpdateTimestamp
    public void updateBrokerInfoUpdateTimestamp(final String brokerAddr) {
        BrokerLiveInfo prev = this.brokerLiveTable.get(brokerAddr);
        // Unknown brokers are ignored; known ones get a fresh timestamp.
        if (prev != null) {
            prev.setLastUpdateTimestamp(System.currentTimeMillis());
        }
    }
Going back up to the caller again, the next step queries the data version held by the name server: org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#queryBrokerTopicConfig
    public DataVersion queryBrokerTopicConfig(final String brokerAddr) {
        BrokerLiveInfo prev = this.brokerLiveTable.get(brokerAddr);
        // Return the recorded version, or null when the broker is unknown.
        if (prev != null) {
            return prev.getDataVersion();
        }
        return null;
    }
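Back in org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#queryBrokerTopicConfig, the method sets the response code, attaches the name server's data version as the body when one exists, sets the changed flag on the response header, and returns. That is the end of this call path.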
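To see the whole path in one place, the flow can be condensed into a small self-contained sketch. Everything here is hypothetical and simplified: `LiveEntry`, `QueryResult`, and `VersionQueryHandler` are illustrative names, the version is reduced to a plain string, and the current time is passed in as a parameter so the behavior is easy to check. It is not the RocketMQ implementation, only an illustration of the same three steps: compare, refresh the timestamp when unchanged, and return the stored version.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical per-broker record: a version token and a last-seen time.
final class LiveEntry {
    final String version;            // stand-in for a data version
    volatile long lastUpdateMillis;  // refreshed when the version is unchanged

    LiveEntry(String version, long lastUpdateMillis) {
        this.version = version;
        this.lastUpdateMillis = lastUpdateMillis;
    }
}

// Hypothetical reply: the changed flag plus the version the server holds.
final class QueryResult {
    final boolean changed;
    final String storedVersion;      // null when the broker is unknown

    QueryResult(boolean changed, String storedVersion) {
        this.changed = changed;
        this.storedVersion = storedVersion;
    }
}

final class VersionQueryHandler {
    private final Map<String, LiveEntry> liveTable = new ConcurrentHashMap<>();

    // Record a broker with its version and registration time.
    void register(String brokerAddr, String version, long nowMillis) {
        liveTable.put(brokerAddr, new LiveEntry(version, nowMillis));
    }

    QueryResult query(String brokerAddr, String reportedVersion, long nowMillis) {
        LiveEntry entry = liveTable.get(brokerAddr);
        // Step 1: changed when the broker is unknown or the versions differ.
        boolean changed = entry == null || !entry.version.equals(reportedVersion);
        // Step 2: an unchanged version only refreshes the timestamp.
        if (!changed) {
            entry.lastUpdateMillis = nowMillis;
        }
        // Step 3: always report the stored version, if there is one.
        return new QueryResult(changed, entry == null ? null : entry.version);
    }

    // Last-seen time for a broker, or null when the broker is unknown.
    Long lastUpdate(String brokerAddr) {
        LiveEntry entry = liveTable.get(brokerAddr);
        return entry == null ? null : entry.lastUpdateMillis;
    }
}
```

One difference from the walkthrough is that the sketch performs a single table lookup per query, whereas the original code looks the broker up separately in each helper method.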
Closing remarks
This analysis reflects only my own reading of the code and is offered for reference.