RocketMQ Source Code Analysis: Namesrv Startup (Part 1)

 

Preface

This post walks through the namesrv startup path: loading the namesrv configuration, creating the Netty server, and registering the request processor.

 

Main text

Source analysis: namesrv startup

Start at the entry method org.apache.rocketmq.namesrv.NamesrvStartup#main0 and find this line:

    start(controller);

Step into this method:

org.apache.rocketmq.namesrv.NamesrvStartup#start

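    public static NamesrvController start(final NamesrvController controller) throws Exception {

        if (null == controller) {
            throw new IllegalArgumentException("NamesrvController is null");
        }

        // Initialize first; on failure, release resources and exit.
        boolean initResult = controller.initialize();
        if (!initResult) {
            controller.shutdown();
            System.exit(-3);
        }

        // Shut the controller down cleanly when the JVM exits.
        Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                controller.shutdown();
                return null;
            }
        }));

        controller.start();

        return controller;
    }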
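The order of operations in this method (null check, initialize, shut down on failure, register a shutdown hook, start) can be sketched with the JDK alone. This is a minimal sketch, not RocketMQ code: `Lifecycle`, `RecordingLifecycle` and `StartupSketch` are hypothetical names, and the sketch throws an exception where the original calls `System.exit`, purely so the failure path can be observed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for NamesrvController; only the three lifecycle calls matter here.
interface Lifecycle {
    boolean initialize();
    void start();
    void shutdown();
}

// Records the order of lifecycle calls so the sequence can be inspected.
final class RecordingLifecycle implements Lifecycle {
    final List<String> events = new ArrayList<>();
    private final boolean initResult;

    RecordingLifecycle(boolean initResult) {
        this.initResult = initResult;
    }

    @Override
    public boolean initialize() {
        events.add("initialize");
        return initResult;
    }

    @Override
    public void start() {
        events.add("start");
    }

    @Override
    public void shutdown() {
        events.add("shutdown");
    }
}

final class StartupSketch {
    static Lifecycle start(final Lifecycle controller) {
        if (controller == null) {
            throw new IllegalArgumentException("controller is null");
        }
        if (!controller.initialize()) {
            controller.shutdown();
            // Assumption: throw instead of exiting the JVM, so callers can see the failure.
            throw new IllegalStateException("initialize failed");
        }
        // The hook runs controller.shutdown() when the JVM terminates.
        Runtime.getRuntime().addShutdownHook(new Thread(controller::shutdown, "shutdown-hook"));
        controller.start();
        return controller;
    }

    public static void main(String[] args) {
        RecordingLifecycle controller = new RecordingLifecycle(true);
        start(controller);
        System.out.println(controller.events);
    }
}
```

Registering the hook only after a successful initialize means a failed startup is cleaned up exactly once, by the explicit shutdown call.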

Look at this line first:

    boolean initResult = controller.initialize();

Step into this method:

org.apache.rocketmq.namesrv.NamesrvController#initialize

    this.kvConfigManager.load();

That call leads to org.apache.rocketmq.namesrv.kvconfig.KVConfigManager#load:

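    public void load() {
        String content = null;
        try {
            // Read the KV config file from the configured path.
            content = MixAll.file2String(this.namesrvController.getNamesrvConfig().getKvConfigPath());
        } catch (IOException e) {
            log.warn("Load KV config table exception", e);
        }
        if (content != null) {
            KVConfigSerializeWrapper kvConfigSerializeWrapper =
                KVConfigSerializeWrapper.fromJson(content, KVConfigSerializeWrapper.class);
            if (null != kvConfigSerializeWrapper) {
                // Merge the deserialized entries into the in-memory table.
                this.configTable.putAll(kvConfigSerializeWrapper.getConfigTable());
                log.info("load KV config table OK");
            }
        }
    }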
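The shape of this load step is: try to read the file, tolerate a read failure, and merge whatever was parsed into an in-memory table. Below is a minimal JDK-only sketch of that pattern. It is not the RocketMQ implementation: `KvLoadSketch` is a hypothetical class, the table is flat rather than grouped by namespace, and `java.util.Properties` key=value text stands in for the JSON wrapper used by the original.

```java
import java.io.IOException;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

final class KvLoadSketch {
    final Map<String, String> configTable = new ConcurrentHashMap<>();

    // Reads the file if possible; a read failure is reported and otherwise ignored.
    void load(Path path) {
        String content = null;
        try {
            content = new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
        } catch (IOException e) {
            System.err.println("KV config not loaded: " + e);
        }
        if (content != null) {
            merge(content);
        }
    }

    // Parses key=value lines (assumption: a stand-in for the JSON format) and merges them in.
    void merge(String content) {
        Properties parsed = new Properties();
        try {
            parsed.load(new StringReader(content));
        } catch (IOException e) {
            return; // not expected for an in-memory reader
        }
        for (String key : parsed.stringPropertyNames()) {
            configTable.put(key, parsed.getProperty(key));
        }
    }

    public static void main(String[] args) {
        KvLoadSketch sketch = new KvLoadSketch();
        // Hypothetical default file name, used only when no argument is given.
        sketch.load(Paths.get(args.length > 0 ? args[0] : "kvConfig.properties"));
        System.out.println(sketch.configTable);
    }
}
```

Treating a missing or unreadable file as a warning rather than an error lets a fresh node start with an empty table.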

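Next, look at this line:

    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);

Step into the constructor org.apache.rocketmq.remoting.netty.NettyRemotingServer#NettyRemotingServer(org.apache.rocketmq.remoting.netty.NettyServerConfig, org.apache.rocketmq.remoting.ChannelEventListener), which creates the Netty server: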

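    public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
        final ChannelEventListener channelEventListener) {
        super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
        this.serverBootstrap = new ServerBootstrap();
        this.nettyServerConfig = nettyServerConfig;
        this.channelEventListener = channelEventListener;

        // Fall back to a default size when the callback pool is not configured.
        int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
        if (publicThreadNums <= 0) {
            publicThreadNums = 4;
        }

        this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
            }
        });

        // Boss group: accepts incoming connections.
        this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyBoss_%d", this.threadIndex.incrementAndGet()));
            }
        });

        // Selector group: epoll when available, NIO otherwise.
        if (useEpoll()) {
            this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = nettyServerConfig.getServerSelectorThreads();

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });
        } else {
            this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = nettyServerConfig.getServerSelectorThreads();

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });
        }

        loadSslContext();
    }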
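The constructor repeats one small pattern several times: a `ThreadFactory` that names each thread with a prefix and an `AtomicInteger` counter, combined with a fallback when the configured pool size is not positive. A minimal JDK-only sketch of that pattern follows; `NamedThreadFactorySketch`, `normalizeThreads` and the `DemoExecutor_` prefix are hypothetical names chosen for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class NamedThreadFactorySketch implements ThreadFactory {
    private final AtomicInteger threadIndex = new AtomicInteger(0);
    private final String prefix;

    NamedThreadFactorySketch(String prefix) {
        this.prefix = prefix;
    }

    // Each new thread gets the prefix plus a 1-based, monotonically increasing index.
    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, prefix + threadIndex.incrementAndGet());
    }

    // Uses the fallback when the configured value is zero or negative.
    static int normalizeThreads(int configured, int fallback) {
        return configured <= 0 ? fallback : configured;
    }

    public static void main(String[] args) throws InterruptedException {
        int threads = normalizeThreads(0, 4);
        ExecutorService pool =
            Executors.newFixedThreadPool(threads, new NamedThreadFactorySketch("DemoExecutor_"));
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> System.out.println(Thread.currentThread().getName()));
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Named threads make thread dumps and logs much easier to read, which is likely why every pool in the constructor gets its own prefix.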

Back in NamesrvController#initialize, the next line is:

    this.registerProcessor();

Step into this method, org.apache.rocketmq.namesrv.NamesrvController#registerProcessor:

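    private void registerProcessor() {
        if (namesrvConfig.isClusterTest()) {
            // Cluster-test mode uses a dedicated processor.
            this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
                this.remotingExecutor);
        } else {
            this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor);
        }
    }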

Step into the constructor org.apache.rocketmq.namesrv.processor.ClusterTestRequestProcessor#ClusterTestRequestProcessor, which creates the cluster-test request processor:

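    public ClusterTestRequestProcessor(NamesrvController namesrvController, String productEnvName) {
        super(namesrvController);
        this.productEnvName = productEnvName;
        adminExt = new DefaultMQAdminExt();
        adminExt.setInstanceName("CLUSTER_TEST_NS_INS_" + productEnvName);
        adminExt.setUnitName(productEnvName);
        try {
            adminExt.start();
        } catch (MQClientException e) {
            log.error("Failed to start processor", e);
        }
    }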

Step into this method, org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#start:

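    public void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                // Assume failure until every step below has succeeded.
                this.serviceState = ServiceState.START_FAILED;

                this.defaultMQAdminExt.changeInstanceNameToPID();

                this.mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQAdminExt, rpcHook);

                boolean registerOK = mqClientInstance.registerAdminExt(this.defaultMQAdminExt.getAdminExtGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The adminExt group[" + this.defaultMQAdminExt.getAdminExtGroup()
                        + "] has created already, specifed another name please."
                        + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null);
                }

                mqClientInstance.start();

                log.info("the adminExt [{}] start OK", this.defaultMQAdminExt.getAdminExtGroup());

                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The AdminExt service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null);
            default:
                break;
        }
    }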
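The start method is a small state machine: it only proceeds from CREATE_JUST, marks the state as START_FAILED before doing any work, resets it if registration is rejected, and refuses to start from any other state. A minimal sketch of that guard is shown below. `StartGuardSketch` is a hypothetical class, the `BooleanSupplier` stands in for the registration call, and `IllegalStateException` replaces `MQClientException`.

```java
import java.util.function.BooleanSupplier;

final class StartGuardSketch {
    enum State { CREATE_JUST, RUNNING, START_FAILED, SHUTDOWN_ALREADY }

    private State state = State.CREATE_JUST;
    private final BooleanSupplier register; // hypothetical stand-in for the registration step

    StartGuardSketch(BooleanSupplier register) {
        this.register = register;
    }

    State state() {
        return state;
    }

    void start() {
        switch (state) {
            case CREATE_JUST:
                // Pessimistic marker: if anything below throws, the state stays START_FAILED.
                state = State.START_FAILED;
                if (!register.getAsBoolean()) {
                    // Registration was rejected, so allow another attempt later.
                    state = State.CREATE_JUST;
                    throw new IllegalStateException("group already registered");
                }
                state = State.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new IllegalStateException("service state not OK: " + state);
            default:
                break;
        }
    }

    public static void main(String[] args) {
        StartGuardSketch service = new StartGuardSketch(() -> true);
        service.start();
        System.out.println(service.state());
    }
}
```

Setting the failure state up front means an unexpected exception halfway through startup cannot leave the service looking startable again.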

Look at this line:

    this.mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQAdminExt, rpcHook);

Step into this method, org.apache.rocketmq.client.impl.MQClientManager#getAndCreateMQClientInstance(org.apache.rocketmq.client.ClientConfig, org.apache.rocketmq.remoting.RPCHook):

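    public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
        String clientId = clientConfig.buildMQClientId();
        MQClientInstance instance = this.factoryTable.get(clientId);
        if (null == instance) {
            instance =
                new MQClientInstance(clientConfig.cloneClientConfig(),
                    this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
            // Another thread may have won the race; keep whichever instance was stored first.
            MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev;
                log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
            } else {
                log.info("Created new MQClientInstance for clientId:[{}]", clientId);
            }
        }

        return instance;
    }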

The instance is looked up in the cache first; if it is not there, a new one is created and then put into the cache.
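This get-then-putIfAbsent idiom can be sketched with a plain `ConcurrentHashMap`. The code below is an illustration only: `InstanceCacheSketch` and its nested `Instance` are hypothetical stand-ins for the manager and the client instance.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

final class InstanceCacheSketch {
    // Hypothetical stand-in for the cached client object.
    static final class Instance {
        final int index;
        final String clientId;

        Instance(int index, String clientId) {
            this.index = index;
            this.clientId = clientId;
        }
    }

    private final ConcurrentMap<String, Instance> table = new ConcurrentHashMap<>();
    private final AtomicInteger indexGenerator = new AtomicInteger();

    Instance getOrCreate(String clientId) {
        Instance instance = table.get(clientId);
        if (instance == null) {
            instance = new Instance(indexGenerator.getAndIncrement(), clientId);
            // putIfAbsent returns the existing value if another thread stored one first.
            Instance prev = table.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev;
            }
        }
        return instance;
    }

    public static void main(String[] args) {
        InstanceCacheSketch cache = new InstanceCacheSketch();
        Instance first = cache.getOrCreate("client-a");
        Instance again = cache.getOrCreate("client-a");
        System.out.println(first == again);
    }
}
```

With this idiom a thread that loses the race still constructs an instance and consumes an index before discarding it. `ConcurrentHashMap.computeIfAbsent` avoids the wasted construction, at the cost of running the constructor inside the map's update.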

Step into this constructor, which creates the MQClientInstance object:

org.apache.rocketmq.client.impl.factory.MQClientInstance#MQClientInstance(org.apache.rocketmq.client.ClientConfig, int, java.lang.String, org.apache.rocketmq.remoting.RPCHook)

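    public MQClientInstance(ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) {
        this.clientConfig = clientConfig;
        this.instanceIndex = instanceIndex;
        this.nettyClientConfig = new NettyClientConfig();
        this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
        this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
        this.clientRemotingProcessor = new ClientRemotingProcessor(this);
        this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig);

        // Use the name server address from the client config when one is given.
        if (this.clientConfig.getNamesrvAddr() != null) {
            this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
            log.info("user specified name server address: {}", this.clientConfig.getNamesrvAddr());
        }

        this.clientId = clientId;

        this.mQAdminImpl = new MQAdminImpl(this);

        this.pullMessageService = new PullMessageService(this);

        this.rebalanceService = new RebalanceService(this);

        this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
        this.defaultMQProducer.resetClientConfig(clientConfig);

        this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);

        log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}",
            this.instanceIndex,
            this.clientId,
            this.clientConfig,
            MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION), RemotingCommand.getSerializeTypeConfigInThisServer());
    }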

 

Closing notes

This post covered the namesrv startup source code. It reflects only my own understanding; discussion is welcome.

Follow the WeChat official account "天河聊技术".

 

 
