rocketmq源码解析之NamesrvController启动②mqclient admin请求处理之更新broker配置信息

说在前面

今天开始解析管理请求处理之UPDATE_BROKER_CONFIG:更新broker配置信息。

 

源码解析

进入这个方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#updateBrokerConfig 更新broker配置信息

/**
 * Handles an admin request that carries new broker configuration in its body.
 *
 * <p>The body is decoded as a properties string and handed to the broker's
 * {@code Configuration}. When the new properties contain {@code brokerPermission},
 * the topic-config data version is bumped and the broker is force-registered again.
 * A request without a body is answered with SUCCESS and changes nothing.
 *
 * @param ctx     channel context of the caller (used for logging only)
 * @param request the admin request; its body holds the properties text
 * @return a response with SUCCESS, or SYSTEM_ERROR when the body cannot be decoded or parsed
 */
private synchronized RemotingCommand updateBrokerConfig(ChannelHandlerContext ctx, RemotingCommand request) {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        log.info("updateBrokerConfig called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));

        final byte[] payload = request.getBody();
        if (payload != null) {
            try {
                final String propertiesText = new String(payload, MixAll.DEFAULT_CHARSET);
                final Properties newConfig = MixAll.string2Properties(propertiesText);
                if (newConfig == null) {
                    // The body could not be turned into properties: reject the request.
                    log.error("string2Properties error");
                    response.setCode(ResponseCode.SYSTEM_ERROR);
                    response.setRemark("string2Properties error");
                    return response;
                }

                log.info("updateBrokerConfig, new config: [{}] client: {} ", newConfig, ctx.channel().remoteAddress());
                // Apply the new values to the broker configuration.
                this.brokerController.getConfiguration().update(newConfig);
                // A permission change is pushed out immediately through a forced registration.
                if (newConfig.containsKey("brokerPermission")) {
                    // Bump the data version first.
                    this.brokerController.getTopicConfigManager().getDataVersion().nextVersion();
                    this.brokerController.registerBrokerAll(false, false, true);
                }
            } catch (UnsupportedEncodingException e) {
                log.error("", e);
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("UnsupportedEncodingException " + e);
                return response;
            }
        }

        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

进入这个方法org.apache.rocketmq.common.Configuration#update 更新配置

/**
 * Applies the given properties to the registered configuration and persists the result.
 *
 * <p>Under the write lock, the properties are merged into {@code allConfigs} via
 * {@code mergeIfExist}, copied onto every registered config object, and the data
 * version is advanced. Persisting happens after the lock has been released.
 *
 * <p>If the calling thread is interrupted while waiting for the lock, nothing is
 * updated or persisted; the interrupt status is restored so callers can still see it.
 *
 * @param properties the new configuration values
 */
public void update(Properties properties) {
        try {
            // Acquire the write lock; this wait can be interrupted.
            readWriteLock.writeLock().lockInterruptibly();
            try {
                // the property must be exist when update
                mergeIfExist(properties, this.allConfigs);
                for (Object configObject : configObjectList) {
                    // not allConfigs to update...
                    MixAll.properties2Object(properties, configObject);
                }

                // Advance the data version.
                this.dataVersion.nextVersion();
            } finally {
                readWriteLock.writeLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("update lock error, {}", properties, e);
            // Do not swallow the interruption: restore the flag for the caller.
            Thread.currentThread().interrupt();
            return;
        }

        // Write the updated configuration to storage.
        persist();
    }

进入这个方法org.apache.rocketmq.common.Configuration#persist broker配置持久化,从这个类org.apache.rocketmq.broker.BrokerPathConfigHelper的brokerConfigPath变量值可以知道broker的配置文件路径是:

// Initial path of the broker configuration file:
// <user.home>/store/config/broker.properties, joined with the platform file separator.
private static String brokerConfigPath = System.getProperty("user.home") + File.separator + "store"
    + File.separator + "config" + File.separator + "broker.properties";

进入这个方法org.apache.rocketmq.common.MixAll#string2File 文件存储

/**
 * Replaces the content of {@code fileName} with {@code str}, keeping a backup.
 *
 * <p>Steps: write the new content to {@code fileName + ".tmp"}, copy the current
 * content (if {@code file2String} returns any) to {@code fileName + ".bak"}, delete
 * the current file and rename the temporary file into place.
 *
 * <p>A failed delete or rename is reported as an {@link IOException} rather than
 * being ignored, so a caller never believes the file was replaced when it was not.
 *
 * @param str      the content to store
 * @param fileName the path of the file to replace
 * @throws IOException if writing fails, or the old file cannot be removed, or the
 *                     temporary file cannot be renamed to {@code fileName}
 */
public static void string2File(final String str, final String fileName) throws IOException {

        // Stage the new content in a temporary file first.
        String tmpFile = fileName + ".tmp";
        string2FileNotSafe(str, tmpFile);
        // Back up the previous content, when there is any.
        String bakFile = fileName + ".bak";
        String prevContent = file2String(fileName);
        if (prevContent != null) {
            string2FileNotSafe(prevContent, bakFile);
        }

        // Remove the current file; a missing file (first write) is not an error.
        File target = new File(fileName);
        if (!target.delete() && target.exists()) {
            throw new IOException("Failed to delete " + fileName + " before replacing it; new content is in " + tmpFile);
        }
        // Move the staged content into place.
        File staged = new File(tmpFile);
        if (!staged.renameTo(target)) {
            throw new IOException("Failed to rename " + tmpFile + " to " + fileName);
        }
    }

往上返回到这个方法org.apache.rocketmq.broker.BrokerController#registerBrokerAll 如果broker配置信息中有权限设置,则向所有namesrv重新注册broker信息

/**
 * Registers this broker, together with its topic configuration.
 *
 * <p>When the broker permission is not both readable and writable, every topic is
 * re-wrapped with the broker's permission before being sent. The registration itself
 * runs when {@code forceRegister} is set or when {@code needRegister} reports a change.
 *
 * @param checkOrderConfig passed through to {@code doRegisterBrokerAll}
 * @param oneway           passed through to {@code doRegisterBrokerAll}
 * @param forceRegister    register without asking {@code needRegister} first
 */
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {
        TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();

        // The broker has full access only when it is both writable and readable.
        final boolean fullAccess = PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
            && PermName.isReadable(this.getBrokerConfig().getBrokerPermission());
        if (!fullAccess) {
            // Restricted broker: publish each topic with the broker-level permission instead.
            ConcurrentHashMap<String, TopicConfig> restrictedTable = new ConcurrentHashMap<String, TopicConfig>();
            for (TopicConfig source : topicConfigWrapper.getTopicConfigTable().values()) {
                TopicConfig restricted = new TopicConfig(
                    source.getTopicName(),
                    source.getReadQueueNums(),
                    source.getWriteQueueNums(),
                    this.brokerConfig.getBrokerPermission());
                restrictedTable.put(source.getTopicName(), restricted);
            }
            topicConfigWrapper.setTopicConfigTable(restrictedTable);
        }

        // A forced registration skips the remote check entirely.
        final boolean shouldRegister = forceRegister || needRegister(
            this.brokerConfig.getBrokerClusterName(),
            this.getBrokerAddr(),
            this.brokerConfig.getBrokerName(),
            this.brokerConfig.getBrokerId(),
            this.brokerConfig.getRegisterBrokerTimeoutMills());
        if (!shouldRegister) {
            return;
        }
        doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
    }

进入这个方法org.apache.rocketmq.broker.BrokerController#needRegister 判断broker是否需要向namesrv注册

/**
 * Tells whether this broker has to register again.
 *
 * <p>Builds the current topic-config wrapper, passes it to
 * {@code brokerOuterAPI.needRegister} and returns {@code true} as soon as one entry
 * of the returned list is {@code true}.
 *
 * @param clusterName  cluster name of this broker
 * @param brokerAddr   address of this broker
 * @param brokerName   name of this broker
 * @param brokerId     id of this broker
 * @param timeoutMills timeout handed to the remote check, in milliseconds
 * @return {@code true} if at least one reported flag is set, otherwise {@code false}
 */
private boolean needRegister(final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final int timeoutMills) {

        TopicConfigSerializeWrapper currentTopicConfig = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
        List<Boolean> changeFlags =
            brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, currentTopicConfig, timeoutMills);
        for (Boolean flag : changeFlags) {
            // One positive answer is enough to require a registration.
            if (flag) {
                return true;
            }
        }
        return false;
    }

进入这个方法org.apache.rocketmq.broker.out.BrokerOuterAPI#needRegister 判断是否需要注册

/**
 * Queries every known name server for its data version and collects a {@code TRUE}
 * for each server that needs a fresh registration.
 *
 * <p>One task per name server is submitted to {@code brokerOuterExecutor}. A
 * {@code TRUE} is recorded when a successful response reports a change (or reports
 * nothing), when the returned data version differs from the local one, or when the
 * query throws. The caller waits for the tasks for at most {@code timeoutMills}.
 *
 * @param clusterName        cluster name put into the request header
 * @param brokerAddr         broker address put into the request header
 * @param brokerName         broker name put into the request header
 * @param brokerId           broker id put into the request header
 * @param topicConfigWrapper supplies the local data version
 * @param timeoutMills       timeout for each query and for the overall wait, in milliseconds
 * @return the collected flags; empty when no name server is configured or none needs an update
 */
public List<Boolean> needRegister(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final int timeoutMills) {
        // Filled by the executor tasks, read by the caller.
        final List<Boolean> changedList = new CopyOnWriteArrayList<>();
        final List<String> nameServers = this.remotingClient.getNameServerAddressList();
        if (nameServers == null || nameServers.isEmpty()) {
            return changedList;
        }

        final CountDownLatch pending = new CountDownLatch(nameServers.size());
        // One task per name server.
        for (final String namesrvAddr : nameServers) {
            brokerOuterExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        QueryDataVersionRequestHeader header = new QueryDataVersionRequestHeader();
                        header.setBrokerAddr(brokerAddr);
                        header.setBrokerId(brokerId);
                        header.setBrokerName(brokerName);
                        header.setClusterName(clusterName);
                        RemotingCommand query = RemotingCommand.createRequestCommand(RequestCode.QUERY_DATA_VERSION, header);
                        query.setBody(topicConfigWrapper.getDataVersion().encode());
                        // Synchronous data-version query against this name server.
                        RemotingCommand response = remotingClient.invokeSync(namesrvAddr, query, timeoutMills);
                        DataVersion remoteVersion = null;
                        Boolean changed = false;
                        if (response.getCode() == ResponseCode.SUCCESS) {
                            QueryDataVersionResponseHeader responseHeader =
                                (QueryDataVersionResponseHeader) response.decodeCommandCustomHeader(QueryDataVersionResponseHeader.class);
                            // What the name server itself reports.
                            changed = responseHeader.getChanged();
                            byte[] versionBytes = response.getBody();
                            if (versionBytes != null) {
                                remoteVersion = DataVersion.decode(versionBytes, DataVersion.class);
                                // Differing versions always count as a change.
                                if (!topicConfigWrapper.getDataVersion().equals(remoteVersion)) {
                                    changed = true;
                                }
                            }
                            if (changed == null || changed) {
                                changedList.add(Boolean.TRUE);
                            }
                        }
                        log.warn("Query data version from name server {} OK,changed {}, broker {},name server {}", namesrvAddr, changed, topicConfigWrapper.getDataVersion(), remoteVersion == null ? "" : remoteVersion);
                    } catch (Exception e) {
                        // A failed query is treated as "needs registration".
                        changedList.add(Boolean.TRUE);
                        log.error("Query data version from name server {}  Exception, {}", namesrvAddr, e);
                    } finally {
                        // Always release the waiting caller, even on failure.
                        pending.countDown();
                    }
                }
            });
        }

        try {
            // Wait for all tasks, but no longer than the timeout.
            pending.await(timeoutMills, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.error("query dataversion from nameserver countDownLatch await Exception", e);
        }
        return changedList;
    }

org.apache.rocketmq.remoting.RemotingClient#invokeSync同步请求处理方法前面介绍过了。

进入到这个方法org.apache.rocketmq.broker.BrokerController#doRegisterBrokerAll 向所有的namesrv注册broker配置信息

/**
 * Performs the registration through {@code brokerOuterAPI.registerBrokerAll} and
 * applies the first returned result locally.
 *
 * <p>From the first result: the HA master address is handed to the message store
 * (only when {@code updateMasterHAServerAddrPeriodically} is set and the address is
 * non-null), the master address is handed to {@code slaveSynchronize}, and, if
 * requested, the order-topic configuration is refreshed from the returned KV table.
 *
 * @param checkOrderConfig   whether to refresh the order-topic configuration from the result
 * @param oneway             passed through to the outer API
 * @param topicConfigWrapper topic configuration to send with the registration
 */
private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
        TopicConfigSerializeWrapper topicConfigWrapper) {
        List<RegisterBrokerResult> results = this.brokerOuterAPI.registerBrokerAll(
            this.brokerConfig.getBrokerClusterName(),
            this.getBrokerAddr(),
            this.brokerConfig.getBrokerName(),
            this.brokerConfig.getBrokerId(),
            this.getHAServerAddr(),
            topicConfigWrapper,
            // Filter server list.
            this.filterServerManager.buildNewFilterServerList(),
            // One-way flag.
            oneway,
            this.brokerConfig.getRegisterBrokerTimeoutMills(),
            this.brokerConfig.isCompressedRegister());
        if (results.isEmpty()) {
            return;
        }

        // Only the first result is consulted.
        RegisterBrokerResult first = results.get(0);
        if (first == null) {
            return;
        }

        if (this.updateMasterHAServerAddrPeriodically && first.getHaServerAddr() != null) {
            // Refresh the HA master address held by the message store.
            this.messageStore.updateHaMasterAddress(first.getHaServerAddr());
        }

        // Tell the slave synchronizer which master to use.
        this.slaveSynchronize.setMasterAddr(first.getMasterAddr());
        if (checkOrderConfig) {
            // Refresh the order flags of topics from the returned KV table.
            this.getTopicConfigManager().updateOrderTopicConfig(first.getKvTable());
        }
    }

进入这个方法org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll

/**
 * Registers the broker with every known name server concurrently and returns the
 * results that arrived in time.
 *
 * <p>The request body is encoded once (optionally compressed) and its CRC32 is put
 * into the header. One task per name server is submitted to
 * {@code brokerOuterExecutor}; the caller waits for them for at most
 * {@code timeoutMills}.
 *
 * <p>The result list is a {@link CopyOnWriteArrayList} because several executor
 * threads add to it, and because tasks that outlive the timeout may still add to it
 * while the caller is already reading the returned list.
 *
 * @param clusterName        cluster name put into the request header
 * @param brokerAddr         broker address put into the request header
 * @param brokerName         broker name put into the request header
 * @param brokerId           broker id put into the request header
 * @param haServerAddr       HA server address put into the request header
 * @param topicConfigWrapper topic configuration sent in the body
 * @param filterServerList   filter servers sent in the body
 * @param oneway             whether each registration is sent one-way
 * @param timeoutMills       timeout for each request and for the overall wait, in milliseconds
 * @param compressed         whether the body is compressed
 * @return the non-null results collected so far; empty when no name server is configured
 */
public List<RegisterBrokerResult> registerBrokerAll(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final boolean oneway,
        final int timeoutMills,
        final boolean compressed) {

        // Thread-safe: written concurrently by the registration tasks below.
        final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();
        // Addresses of all known name servers.
        List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
        if (nameServerAddressList != null && nameServerAddressList.size() > 0) {

            final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
            requestHeader.setBrokerAddr(brokerAddr);
            requestHeader.setBrokerId(brokerId);
            requestHeader.setBrokerName(brokerName);
            requestHeader.setClusterName(clusterName);
            requestHeader.setHaServerAddr(haServerAddr);
            requestHeader.setCompressed(compressed);
            RegisterBrokerBody requestBody = new RegisterBrokerBody();
            requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
            requestBody.setFilterServerList(filterServerList);
            // Encode the body once; it is shared by all tasks.
            final byte[] body = requestBody.encode(compressed);
            // Checksum of the encoded body, carried in the header.
            final int bodyCrc32 = UtilAll.crc32(body);
            requestHeader.setBodyCrc32(bodyCrc32);
            final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
            // Register with all name servers concurrently.
            for (final String namesrvAddr : nameServerAddressList) {
                brokerOuterExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
                            if (result != null) {
                                registerBrokerResultList.add(result);
                            }

                            log.info("register broker to name server {} OK", namesrvAddr);
                        } catch (Exception e) {
                            log.warn("registerBroker Exception, {}", namesrvAddr, e);
                        } finally {
                            countDownLatch.countDown();
                        }
                    }
                });
            }

            try {
                countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // Stop waiting, but keep the interruption visible to the caller.
                Thread.currentThread().interrupt();
            }
        }

        return registerBrokerResultList;
    }

进入这个方法org.apache.rocketmq.common.protocol.body.RegisterBrokerBody#encode 对topic配置信息进行编码

/**
 * Encodes this body, optionally as a deflate-compressed binary layout.
 *
 * <p>Without compression the superclass encoding is returned. With compression the
 * stream contains, each prefixed by its length as written by
 * {@code convertIntToByteArray}: the encoded data version, the number of topic
 * configs followed by each encoded topic config, and the filter server list as JSON.
 *
 * <p>The {@link Deflater} is created by this method and passed to the stream, so the
 * stream does not release it; it is ended explicitly in a {@code finally} block
 * so its native memory is released promptly instead of waiting for garbage collection.
 *
 * @param compress whether to produce the compressed layout
 * @return the encoded bytes, or {@code null} if compression failed with an {@link IOException}
 */
public byte[] encode(boolean compress) {

        if (!compress) {
            // Plain encoding provided by the superclass.
            return super.encode();
        }
        long start = System.currentTimeMillis();
        DataVersion dataVersion = topicConfigSerializeWrapper.getDataVersion();
        ConcurrentMap<String, TopicConfig> topicConfigTable = cloneTopicConfigTable(topicConfigSerializeWrapper.getTopicConfigTable());
        assert topicConfigTable != null;
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        // Created right before the try so that every path below releases it.
        Deflater deflater = new Deflater(Deflater.BEST_COMPRESSION);
        DeflaterOutputStream outputStream = new DeflaterOutputStream(byteArrayOutputStream, deflater);
        try {
            byte[] buffer = dataVersion.encode();
            // write data version
            outputStream.write(convertIntToByteArray(buffer.length));
            outputStream.write(buffer);
            int topicNumber = topicConfigTable.size();
            // write number of topic configs
            outputStream.write(convertIntToByteArray(topicNumber));
            // write topic config entry one by one.
            for (ConcurrentMap.Entry<String, TopicConfig> next : topicConfigTable.entrySet()) {
                buffer = next.getValue().encode().getBytes(MixAll.DEFAULT_CHARSET);
                outputStream.write(convertIntToByteArray(buffer.length));
                outputStream.write(buffer);
            }

            buffer = JSON.toJSONString(filterServerList).getBytes(MixAll.DEFAULT_CHARSET);
            // write filter server list json length
            outputStream.write(convertIntToByteArray(buffer.length));
            // write filter server list json
            outputStream.write(buffer);
            // Flush the remaining compressed data into the byte array.
            outputStream.finish();
            long interval = System.currentTimeMillis() - start;
            if (interval > 50) {
                LOGGER.info("Compressing takes {}ms", interval);
            }
            return byteArrayOutputStream.toByteArray();
        } catch (IOException e) {
            LOGGER.error("Failed to compress RegisterBrokerBody object", e);
        } finally {
            // The stream does not end a caller-supplied deflater; do it here.
            deflater.end();
        }

        return null;
    }

往上返回到这个方法org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBroker 注册broker

/**
 * Sends one REGISTER_BROKER request to a single name server.
 *
 * <p>In one-way mode the request is sent without waiting for an answer and
 * {@code null} is returned; a {@code RemotingTooMuchRequestException} is ignored
 * there. Otherwise the call is synchronous and a SUCCESS response is converted into
 * a {@code RegisterBrokerResult}.
 *
 * @param namesrvAddr   address of the name server
 * @param oneway        whether to send without waiting for a response
 * @param timeoutMills  request timeout in milliseconds
 * @param requestHeader header of the registration request
 * @param body          encoded request body
 * @return the registration result, or {@code null} in one-way mode
 * @throws MQBrokerException if the response code is not SUCCESS
 */
private RegisterBrokerResult registerBroker(
        final String namesrvAddr,
        final boolean oneway,
        final int timeoutMills,
        final RegisterBrokerRequestHeader requestHeader,
        final byte[] body
    ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
        InterruptedException {
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
        request.setBody(body);

        if (oneway) {
            try {
                // Fire and forget: no result is expected.
                this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
            } catch (RemotingTooMuchRequestException e) {
                // Ignore
            }
            return null;
        }

        // Synchronous registration with the name server.
        RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
        assert response != null;
        if (response.getCode() != ResponseCode.SUCCESS) {
            throw new MQBrokerException(response.getCode(), response.getRemark());
        }

        RegisterBrokerResponseHeader responseHeader =
            (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
        RegisterBrokerResult registered = new RegisterBrokerResult();
        registered.setMasterAddr(responseHeader.getMasterAddr());
        registered.setHaServerAddr(responseHeader.getHaServerAddr());
        if (response.getBody() != null) {
            // The body, when present, carries a KV table.
            registered.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
        }
        return registered;
    }

进入这个方法org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeOneway 执行单途请求

/**
 * Sends a request to {@code addr} without waiting for a response.
 *
 * <p>If no active channel is available the channel is closed and a
 * {@code RemotingConnectException} is thrown. If sending fails with a
 * {@code RemotingSendRequestException} the channel is closed and the exception is rethrown.
 */
@Override
    public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
        RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        // Look up (or create) the channel for this address.
        final Channel channel = this.getAndCreateChannel(addr);
        if (channel == null || !channel.isActive()) {
            // No usable connection: clean up and report it.
            this.closeChannel(addr, channel);
            throw new RemotingConnectException(addr);
        }

        try {
            if (this.rpcHook != null) {
                // Give the hook a chance to act before the request is sent.
                this.rpcHook.doBeforeRequest(addr, request);
            }
            this.invokeOnewayImpl(channel, request, timeoutMillis);
        } catch (RemotingSendRequestException e) {
            log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
            // A failed send closes the channel before the error is propagated.
            this.closeChannel(addr, channel);
            throw e;
        }
    }

前面介绍过了。

往上返回到这个方法org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeSync broker同步向namesrv注册,前面介绍过了。

 

进入这个方法org.apache.rocketmq.broker.topic.TopicConfigManager#updateOrderTopicConfig 更新topic配置信息

/**
 * Aligns the {@code order} flag of the local topic configs with the given KV table.
 *
 * <p>Topics whose name is a key of the table are marked as ordered; all other local
 * topics are marked as not ordered. If any flag changed, the data version is
 * advanced and the configuration is persisted. A {@code null} argument or a
 * {@code null} table leaves everything untouched.
 *
 * @param orderKVTableFromNs KV table whose keys name the ordered topics
 */
public void updateOrderTopicConfig(final KVTable orderKVTableFromNs) {

        if (orderKVTableFromNs == null || orderKVTableFromNs.getTable() == null) {
            return;
        }

        boolean modified = false;
        final Set<String> orderedNames = orderKVTableFromNs.getTable().keySet();

        // Pass 1: switch the flag on for every listed topic that is known locally.
        for (String name : orderedNames) {
            TopicConfig known = this.topicConfigTable.get(name);
            if (known != null && !known.isOrder()) {
                known.setOrder(true);
                modified = true;
                log.info("update order topic config, topic={}, order={}", name, true);
            }
        }

        // Pass 2: switch the flag off for every local topic that is not listed.
        for (Map.Entry<String, TopicConfig> local : this.topicConfigTable.entrySet()) {
            String name = local.getKey();
            if (orderedNames.contains(name)) {
                continue;
            }
            TopicConfig config = local.getValue();
            if (config.isOrder()) {
                config.setOrder(false);
                modified = true;
                log.info("update order topic config, topic={}, order={}", name, false);
            }
        }

        if (modified) {
            // Advance the version, then store the new state.
            this.dataVersion.nextVersion();
            this.persist();
        }
    }

进入这个方法org.apache.rocketmq.common.ConfigManager#persist 持久化

/**
 * Writes the encoded configuration to the file named by {@code configFilePath()}.
 *
 * <p>Nothing is written when {@code encode(true)} returns {@code null}. An
 * {@code IOException} from the write is logged and not propagated.
 */
public synchronized void persist() {
        final String encoded = this.encode(true);
        if (encoded == null) {
            return;
        }

        // Target file of this configuration.
        final String fileName = this.configFilePath();
        try {
            MixAll.string2File(encoded, fileName);
        } catch (IOException e) {
            log.error("persist file " + fileName + " exception", e);
        }
    }

往上返回到这个方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#updateBrokerConfig 解析结束。

 

说在最后

本次解析仅代表个人观点,仅供参考。

 

加入技术微信群

钉钉技术群

相关文章
相关标签/搜索