说在前面apache
今天开始解析管理请求处理之UPDATE_BROKER_CONFIG更新broker配置信息json
源码解析缓存
进入这个方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#updateBrokerConfig 更新broker配置信息微信
private synchronized RemotingCommand updateBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) { final RemotingCommand response = RemotingCommand.createResponseCommand(null); log.info("updateBrokerConfig called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel())); byte[] body = request.getBody(); if (body != null) { try { String bodyStr = new String(body, MixAll.DEFAULT_CHARSET); Properties properties = MixAll.string2Properties(bodyStr); if (properties != null) { log.info("updateBrokerConfig, new config: [{}] client: {} ", properties, ctx.channel().remoteAddress()); // 更新持久存储 =》 this.brokerController.getConfiguration().update(properties); // 若是有权限设置强制更新 if (properties.containsKey("brokerPermission")) { // 更新数据的版本号 this.brokerController.getTopicConfigManager().getDataVersion().nextVersion(); // 注册全部的broker =》 this.brokerController.registerBrokerAll(false, false, true); } } else { log.error("string2Properties error"); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("string2Properties error"); return response; } } catch (UnsupportedEncodingException e) { log.error("", e); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("UnsupportedEncodingException " + e); return response; } } response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; }
进入这个方法org.apache.rocketmq.common.Configuration#update 跟更新配置多线程
public void update(Properties properties) { try { // 读写锁同步实现 readWriteLock.writeLock().lockInterruptibly(); try { // the property must be exist when update mergeIfExist(properties, this.allConfigs); for (Object configObject : configObjectList) { // not allConfigs to update... MixAll.properties2Object(properties, configObject); } // 更新数据的版本号 this.dataVersion.nextVersion(); } finally { readWriteLock.writeLock().unlock(); } } catch (InterruptedException e) { log.error("update lock error, {}", properties); return; } // 持久化=》 persist(); }
进入这个方法org.apache.rocketmq.common.Configuration#persist broker配置持久化,从这个类org.apache.rocketmq.broker.BrokerPathConfigHelper的brokerConfigPath变量值是能够知道broker的配置文件路径是并发
private static String brokerConfigPath = System.getProperty("user.home") + File.separator + "store" + File.separator + "config" + File.separator + "broker.properties";
进入这个方法org.apache.rocketmq.common.MixAll#string2File文件存储app
public static void string2File(final String str, final String fileName) throws IOException { // 要保存的内容存储在临时文件中 String tmpFile = fileName + ".tmp"; string2FileNotSafe(str, tmpFile); // 把原来的数据进行备份 String bakFile = fileName + ".bak"; String prevContent = file2String(fileName); if (prevContent != null) { string2FileNotSafe(prevContent, bakFile); } // 删掉源文件 File file = new File(fileName); file.delete(); // 临时文件重命名 file = new File(tmpFile); file.renameTo(new File(fileName)); }
往上返回到这个方法org.apache.rocketmq.broker.BrokerController#registerBrokerAll 若是broker配置信息中有权限设置更新全部的broker信息ide
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) { TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper(); // 若是没有读写权限 if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission()) || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) { ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>(); for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) { TopicConfig tmp = new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(), this.brokerConfig.getBrokerPermission()); topicConfigTable.put(topicConfig.getTopicName(), tmp); } topicConfigWrapper.setTopicConfigTable(topicConfigTable); } // 判断broker集群是否须要注册=》 if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(), this.getBrokerAddr(), this.brokerConfig.getBrokerName(), this.brokerConfig.getBrokerId(), this.brokerConfig.getRegisterBrokerTimeoutMills())) { // 向全部的broker注册topic配置信息 =》 doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper); } }
进入这个方法org.apache.rocketmq.broker.BrokerController#needRegister 判断broker集群是否须要注册ui
private boolean needRegister(final String clusterName, final String brokerAddr, final String brokerName, final long brokerId, final int timeoutMills) { TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper(); // 判断是否须要注册 =》 List<Boolean> changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigWrapper, timeoutMills); boolean needRegister = false; for (Boolean changed : changeList) { // 只要一个namesrv须要更新就所有须要更新 if (changed) { needRegister = true; break; } } return needRegister; }
进入这个方法org.apache.rocketmq.broker.out.BrokerOuterAPI#needRegister 判断是否须要注册 this
public List<Boolean> needRegister( final String clusterName, final String brokerAddr, final String brokerName, final long brokerId, final TopicConfigSerializeWrapper topicConfigWrapper, final int timeoutMills) { // 多线程更新是否须要变化状态的集合 final List<Boolean> changedList = new CopyOnWriteArrayList<>(); // 获取namesrv地址 List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); if (nameServerAddressList != null && nameServerAddressList.size() > 0) { final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size()); // 多线程分发执行,一个namesrv一个线程 for (final String namesrvAddr : nameServerAddressList) { brokerOuterExecutor.execute(new Runnable() { @Override public void run() { try { QueryDataVersionRequestHeader requestHeader = new QueryDataVersionRequestHeader(); requestHeader.setBrokerAddr(brokerAddr); requestHeader.setBrokerId(brokerId); requestHeader.setBrokerName(brokerName); requestHeader.setClusterName(clusterName); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader); request.setBody(topicConfigWrapper.getDataVersion().encode()); // broker向namesrv同步查询数据版本 =》 RemotingCommand response = remotingClient.invokeSync(namesrvAddr, request, timeoutMills); DataVersion nameServerDataVersion = null; Boolean changed = false; switch (response.getCode()) { case ResponseCode.SUCCESS: { QueryDataVersionResponseHeader queryDataVersionResponseHeader = (QueryDataVersionResponseHeader) response.decodeCommandCustomHeader(QueryDataVersionResponseHeader.class); // 根据版本号判断数据有没有发生变化 changed = queryDataVersionResponseHeader.getChanged(); byte[] body = response.getBody(); if (body != null) { nameServerDataVersion = DataVersion.decode(body, DataVersion.class); if (!topicConfigWrapper.getDataVersion().equals(nameServerDataVersion)) { // 数据版本不一致须要更新 changed = true; } } if (changed == null || changed) { changedList.add(Boolean.TRUE); } } default: break; } log.warn("Query data version from name server {} OK,changed {}, broker {},name server {}", namesrvAddr, changed, topicConfigWrapper.getDataVersion(), nameServerDataVersion == null ? "" : nameServerDataVersion); } catch (Exception e) { changedList.add(Boolean.TRUE); log.error("Query data version from name server {} Exception, {}", namesrvAddr, e); } finally { // 异常状况下解除线程阻塞 countDownLatch.countDown(); } } }); } try { // 等到超时解除线程阻塞 countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { log.error("query dataversion from nameserver countDownLatch await Exception", e); } } return changedList; }
org.apache.rocketmq.remoting.RemotingClient#invokeSync同步请求处理方法前面介绍过了。
进入到这个方法org.apache.rocketmq.broker.BrokerController#doRegisterBrokerAll 注册全部的broker broker配置信息
private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway, TopicConfigSerializeWrapper topicConfigWrapper) { // 向全部的broker进行注册=》 List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll( this.brokerConfig.getBrokerClusterName(), this.getBrokerAddr(), this.brokerConfig.getBrokerName(), this.brokerConfig.getBrokerId(), this.getHAServerAddr(), topicConfigWrapper, // 过滤的服务 this.filterServerManager.buildNewFilterServerList(), // 单途 oneway, this.brokerConfig.getRegisterBrokerTimeoutMills(), this.brokerConfig.isCompressedRegister()); if (registerBrokerResultList.size() > 0) { RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0); if (registerBrokerResult != null) { if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) { // 更新master地址本地缓存 this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr()); } // 同步设置slave的master地址 this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr()); if (checkOrderConfig) { // 更新订阅的topic配置 =》 this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable()); } } } }
进入这个方法org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll
public List<RegisterBrokerResult> registerBrokerAll( final String clusterName, final String brokerAddr, final String brokerName, final long brokerId, final String haServerAddr, final TopicConfigSerializeWrapper topicConfigWrapper, final List<String> filterServerList, final boolean oneway, final int timeoutMills, final boolean compressed) { final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList(); // 获取namesrv地址集合 List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); if (nameServerAddressList != null && nameServerAddressList.size() > 0) { final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader(); requestHeader.setBrokerAddr(brokerAddr); requestHeader.setBrokerId(brokerId); requestHeader.setBrokerName(brokerName); requestHeader.setClusterName(clusterName); requestHeader.setHaServerAddr(haServerAddr); requestHeader.setCompressed(compressed); RegisterBrokerBody requestBody = new RegisterBrokerBody(); requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper); requestBody.setFilterServerList(filterServerList); // 对请求体进行编码=》 final byte[] body = requestBody.encode(compressed); // 压缩处理 final int bodyCrc32 = UtilAll.crc32(body); requestHeader.setBodyCrc32(bodyCrc32); final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size()); // 并发向namesrv集群注册broker for (final String namesrvAddr : nameServerAddressList) { brokerOuterExecutor.execute(new Runnable() { @Override public void run() { try { // 注册broker服务任务分发=》 RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body); if (result != null) { registerBrokerResultList.add(result); } log.info("register broker to name server {} OK", namesrvAddr); } catch (Exception e) { log.warn("registerBroker Exception, {}", namesrvAddr, e); } finally { countDownLatch.countDown(); } } }); } try { countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { } } return registerBrokerResultList; }
进入这个方法org.apache.rocketmq.common.protocol.body.RegisterBrokerBody#encode 对topic配置信息进行编码
public byte[] encode(boolean compress) { if (!compress) { // json编码 return super.encode(); } long start = System.currentTimeMillis(); ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); DeflaterOutputStream outputStream = new DeflaterOutputStream(byteArrayOutputStream, new Deflater(Deflater.BEST_COMPRESSION)); DataVersion dataVersion = topicConfigSerializeWrapper.getDataVersion(); ConcurrentMap<String, TopicConfig> topicConfigTable = cloneTopicConfigTable(topicConfigSerializeWrapper.getTopicConfigTable()); assert topicConfigTable != null; try { // 版本json编码 byte[] buffer = dataVersion.encode(); // write data version outputStream.write(convertIntToByteArray(buffer.length)); outputStream.write(buffer); int topicNumber = topicConfigTable.size(); // write number of topic configs outputStream.write(convertIntToByteArray(topicNumber)); // write topic config entry one by one. for (ConcurrentMap.Entry<String, TopicConfig> next : topicConfigTable.entrySet()) { buffer = next.getValue().encode().getBytes(MixAll.DEFAULT_CHARSET); outputStream.write(convertIntToByteArray(buffer.length)); outputStream.write(buffer); } buffer = JSON.toJSONString(filterServerList).getBytes(MixAll.DEFAULT_CHARSET); // write filter server list json length outputStream.write(convertIntToByteArray(buffer.length)); // write filter server list json outputStream.write(buffer); outputStream.finish(); long interval = System.currentTimeMillis() - start; if (interval > 50) { LOGGER.info("Compressing takes {}ms", interval); } return byteArrayOutputStream.toByteArray(); } catch (IOException e) { LOGGER.error("Failed to compress RegisterBrokerBody object", e); } return null; }
往上返回到这个方法org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBroker 注册broker
private RegisterBrokerResult registerBroker( final String namesrvAddr, final boolean oneway, final int timeoutMills, final RegisterBrokerRequestHeader requestHeader, final byte[] body ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader); request.setBody(body); if (oneway) { try { // 单线请求,不关心结果 =》 this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills); } catch (RemotingTooMuchRequestException e) { // Ignore } return null; } // broker同步向namesrv注册broker=》 RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class); RegisterBrokerResult result = new RegisterBrokerResult(); result.setMasterAddr(responseHeader.getMasterAddr()); result.setHaServerAddr(responseHeader.getHaServerAddr()); if (response.getBody() != null) { result.setKvTable(KVTable.decode(response.getBody(), KVTable.class)); } return result; } default: break; } throw new MQBrokerException(response.getCode(), response.getRemark()); }
进入这个方法org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeOneway 执行单途请求
@Override public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { // 获取channel=》 final Channel channel = this.getAndCreateChannel(addr); if (channel != null && channel.isActive()) { try { if (this.rpcHook != null) { // 执行请求执行前的钩子方法 this.rpcHook.doBeforeRequest(addr, request); } // 执行单线请求 =》 this.invokeOnewayImpl(channel, request, timeoutMillis); } catch (RemotingSendRequestException e) { log.warn("invokeOneway: send request exception, so close the channel[{}]", addr); // 异常关闭channel=》 this.closeChannel(addr, channel); throw e; } } else { this.closeChannel(addr, channel); throw new RemotingConnectException(addr); } }
前面介绍过了。
往上返回到这个方法org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeSync broker同步向namrsrv注册,前面介绍过了。
进入这个方法org.apache.rocketmq.broker.topic.TopicConfigManager#updateOrderTopicConfig 更新topic配置信息
public void updateOrderTopicConfig(final KVTable orderKVTableFromNs) { if (orderKVTableFromNs != null && orderKVTableFromNs.getTable() != null) { boolean isChange = false; Set<String> orderTopics = orderKVTableFromNs.getTable().keySet(); for (String topic : orderTopics) { TopicConfig topicConfig = this.topicConfigTable.get(topic); if (topicConfig != null && !topicConfig.isOrder()) { topicConfig.setOrder(true); isChange = true; log.info("update order topic config, topic={}, order={}", topic, true); } } for (Map.Entry<String, TopicConfig> entry : this.topicConfigTable.entrySet()) { String topic = entry.getKey(); if (!orderTopics.contains(topic)) { TopicConfig topicConfig = entry.getValue(); if (topicConfig.isOrder()) { topicConfig.setOrder(false); isChange = true; log.info("update order topic config, topic={}, order={}", topic, false); } } } if (isChange) { // 更新数据版本号 this.dataVersion.nextVersion(); // 持久化=》 this.persist(); } } }
进入这个方法org.apache.rocketmq.common.ConfigManager#persist 持久化
public synchronized void persist() { // 持久化的是json存储,序列化的时候按版本号维护的数据 =》 String jsonString = this.encode(true); if (jsonString != null) { // user.home/store/config/topics.json 文件存储 =》 String fileName = this.configFilePath(); try { // 保存文件 =》 MixAll.string2File(jsonString, fileName); } catch (IOException e) { log.error("persist file " + fileName + " exception", e); } } }
往上返回到这个方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#updateBrokerConfig 解析结束。
说在最后
本次解析仅表明我的观点,仅供参考。
加入技术微信群
钉钉技术群