说在前面java
主要解析namrsrv启动部分,namesrv配置加载、netty server建立、注册出处理器。apache
正文缓存
源码解析namesrv启动app
入口找到这个方法org.apache.rocketmq.namesrv.NamesrvStartup#main0这行代码oop
(controller)
进入到这个方法ui
org.apache.rocketmq.namesrv.NamesrvStartup#startspa
NamesrvController (NamesrvController controller) Exception { (== controller) { IllegalArgumentException()} initResult = controller.initialize()(!initResult) { controller.shutdown()System.(-)} Runtime.().addShutdownHook(ShutdownHookThread(Callable<Void>() { Void () Exception { .shutdown()} }))controller.start()controller}
先看这行代码.net
initResult = controller.initialize()
进入这个方法netty
org.apache.rocketmq.namesrv.NamesrvController#initializeserver
..load()
进入这个方法org.apache.rocketmq.namesrv.kvconfig.KVConfigManager#load
() { String content = { content = MixAll.(..getNamesrvConfig().getKvConfigPath())} (IOException e) { .warn(e)} (content != ) { KVConfigSerializeWrapper kvConfigSerializeWrapper = KVConfigSerializeWrapper.(contentKVConfigSerializeWrapper.)(!= kvConfigSerializeWrapper) { ..putAll(kvConfigSerializeWrapper.getConfigTable()).info()} } }
进入这个方法
.= NettyRemotingServer(..)
进入这个方法org.apache.rocketmq.remoting.netty.NettyRemotingServer#NettyRemotingServer(org.apache.rocketmq.remoting.netty.NettyServerConfig, org.apache.rocketmq.remoting.ChannelEventListener)建立netty server
(NettyServerConfig nettyServerConfigChannelEventListener channelEventListener) { (nettyServerConfig.getServerOnewaySemaphoreValue()nettyServerConfig.getServerAsyncSemaphoreValue()).= ServerBootstrap().= nettyServerConfig.= channelEventListenerpublicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads()(publicThreadNums <= ) { publicThreadNums = } .= Executors.(publicThreadNumsThreadFactory() { AtomicInteger = AtomicInteger()Thread (Runnable r) { Thread(r+ ..incrementAndGet())} }).= NioEventLoopGroup(ThreadFactory() { AtomicInteger = AtomicInteger()Thread (Runnable r) { Thread(rString.(..incrementAndGet()))} })(useEpoll()) { .= EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads()ThreadFactory() { AtomicInteger = AtomicInteger()= .getServerSelectorThreads()Thread (Runnable r) { Thread(rString.(..incrementAndGet()))} })} { .= NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads()ThreadFactory() { AtomicInteger = AtomicInteger()= .getServerSelectorThreads()Thread (Runnable r) { Thread(rString.(..incrementAndGet()))} })} loadSslContext()}
.registerProcessor()
进入这个方法org.apache.rocketmq.namesrv.NamesrvController#registerProcessor
() { (.isClusterTest()) { ..registerDefaultProcessor(ClusterTestRequestProcessor(.getProductEnvName()).)} { ..registerDefaultProcessor(DefaultRequestProcessor().)} }
进入这个方法org.apache.rocketmq.namesrv.processor.ClusterTestRequestProcessor#ClusterTestRequestProcessor建立集群请求控制器
(NamesrvController namesrvControllerString productEnvName) { (namesrvController).= productEnvName= DefaultMQAdminExt().setInstanceName(+ productEnvName).setUnitName(productEnvName){ .start()} (MQClientException e) { .error(e)} }
进入这个方法org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#start
() MQClientException { (.) { :.= ServiceState...changeInstanceNameToPID().= MQClientManager.().getAndCreateMQClientInstance(.)registerOK = .registerAdminExt(..getAdminExtGroup())(!registerOK) { .= ServiceState.MQClientException(+ ..getAdminExtGroup() + + FAQUrl.(FAQUrl.))} .start().info(..getAdminExtGroup()).= ServiceState.: : : MQClientException(+ .+ FAQUrl.(FAQUrl.)): } }
.= MQClientManager.().getAndCreateMQClientInstance(.)
进入这个方法org.apache.rocketmq.client.impl.MQClientManager#getAndCreateMQClientInstance(org.apache.rocketmq.client.ClientConfig, org.apache.rocketmq.remoting.RPCHook)
MQClientInstance (ClientConfig clientConfigRPCHook rpcHook) { String clientId = clientConfig.buildMQClientId()MQClientInstance instance = ..get(clientId)(== instance) { instance = MQClientInstance(clientConfig.cloneClientConfig()..getAndIncrement()clientIdrpcHook)MQClientInstance prev = ..putIfAbsent(clientIdinstance)(prev != ) { instance = prev.warn(clientId)} { .info(clientId)} } instance}
从缓存中获取,若是缓存中没有就建立后在放到缓存中。
进入这个方法建立mqclient对象
org.apache.rocketmq.client.impl.factory.MQClientInstance#MQClientInstance(org.apache.rocketmq.client.ClientConfig, int, java.lang.String, org.apache.rocketmq.remoting.RPCHook)
(ClientConfig clientConfiginstanceIndexString clientIdRPCHook rpcHook) { .= clientConfig.= instanceIndex.= NettyClientConfig()..setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads())..setUseTLS(clientConfig.isUseTLS()).= ClientRemotingProcessor().= MQClientAPIImpl(..rpcHookclientConfig)(..getNamesrvAddr() != ) { ..updateNameServerAddressList(..getNamesrvAddr()).info(..getNamesrvAddr())} .= clientId.= MQAdminImpl().= PullMessageService().= RebalanceService().= DefaultMQProducer(MixAll.)..resetClientConfig(clientConfig).= ConsumerStatsManager(.).info(...MQVersion.(MQVersion.)RemotingCommand.())}
说在最后
本篇介绍namesrv启动源码解析,仅表明我的观点,欢迎一块儿讨论交流。
关注“天河聊技术”公众号
加群讨论