说在前面linux
本次解析NamesrvController的启动过程。apache
源码解析json
进入到这个方法org.apache.rocketmq.namesrv.NamesrvStartup#main0这一行start(controller);进入这个方法org.apache.rocketmq.namesrv.NamesrvStartup#start服务器
public static NamesrvController start(final NamesrvController controller) throws Exception { if (null == controller) { throw new IllegalArgumentException("NamesrvController is null"); } // 初始化namesrv控制器 =》 boolean initResult = controller.initialize(); if (!initResult) { controller.shutdown(); System.exit(-3); } Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() { @Override public Void call() throws Exception { controller.shutdown(); return null; } })); // namesrv控制器启动 controller.start(); return controller; }
进入这个方法org.apache.rocketmq.namesrv.NamesrvController#initialize NamesrcvController初始化微信
public boolean initialize() { // 加载配置 =》 this.kvConfigManager.load(); // 初始化netty server =》 this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService); // 初始化netty执行器 8个线程 this.remotingExecutor = Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_")); // 注册处理器 =》 this.registerProcessor(); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { // 扫描非活动的broker NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS); this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.kvConfigManager.printAllPeriodically(); } }, 1, 10, TimeUnit.MINUTES); if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) { // Register a listener to reload SslContext try { fileWatchService = new FileWatchService( new String[] { TlsSystemConfig.tlsServerCertPath, TlsSystemConfig.tlsServerKeyPath, TlsSystemConfig.tlsServerTrustCertPath }, new FileWatchService.Listener() { boolean certChanged, keyChanged = false; @Override public void onChanged(String path) { if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) { log.info("The trust certificate changed, reload the ssl context"); reloadServerSslContext(); } if (path.equals(TlsSystemConfig.tlsServerCertPath)) { certChanged = true; } if (path.equals(TlsSystemConfig.tlsServerKeyPath)) { keyChanged = true; } if (certChanged && keyChanged) { log.info("The certificate and private key changed, reload the ssl context"); certChanged = keyChanged = false; reloadServerSslContext(); } } private void reloadServerSslContext() { ((NettyRemotingServer) remotingServer).loadSslContext(); } }); } catch (Exception e) { log.warn("FileWatchService created error, can't load the certificate dynamically"); } } return true; }
进入到这个方法,从持久化文件中加载配置到内存org.apache.rocketmq.namesrv.kvconfig.KVConfigManager#load并发
public void load() { String content = null; try { // 将文件读取成字符串 user.home/namesrv/kvConfig.json content = MixAll.file2String(this.namesrvController.getNamesrvConfig().getKvConfigPath()); } catch (IOException e) { log.warn("Load KV config table exception", e); } if (content != null) { KVConfigSerializeWrapper kvConfigSerializeWrapper = KVConfigSerializeWrapper.fromJson(content, KVConfigSerializeWrapper.class); if (null != kvConfigSerializeWrapper) { this.configTable.putAll(kvConfigSerializeWrapper.getConfigTable()); log.info("load KV config table OK"); } } }
进入这个方法org.apache.rocketmq.remoting.netty.NettyRemotingServer#NettyRemotingServer(org.apache.rocketmq.remoting.netty.NettyServerConfig, org.apache.rocketmq.remoting.ChannelEventListener)初始化netty服务器app
public NettyRemotingServer(final NettyServerConfig nettyServerConfig, final ChannelEventListener channelEventListener) { // 用信号量来控制并发数 super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue()); this.serverBootstrap = new ServerBootstrap(); this.nettyServerConfig = nettyServerConfig; this.channelEventListener = channelEventListener; // 默认netty server回调线程是4 int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads(); if (publicThreadNums <= 0) { publicThreadNums = 4; } // 建立核心线程数是4个的线程池 this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet()); } }); // 建立有一个线程的调度线程组 this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyBoss_%d", this.threadIndex.incrementAndGet())); } }); // 是否使用netty 的epoll组件,epoll组件在linux环境下能够使netty事件模型性能达到最优 if (useEpoll()) { this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); private int threadTotal = nettyServerConfig.getServerSelectorThreads(); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet())); } }); } else { // selctor默认是3个线程 this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); private int threadTotal = nettyServerConfig.getServerSelectorThreads(); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet())); } }); } // 加载ssl上下文 loadSslContext(); }
往上返回进入这个方法org.apache.rocketmq.namesrv.NamesrvController#registerProcessoride
注册处理器oop
private void registerProcessor() { // 是否开启集群测试 if (namesrvConfig.isClusterTest()) { // 注册集群测试请求处理器=》 this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()), this.remotingExecutor); } else { // 注册默认的请求处理器 this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.remotingExecutor); } }
进入这个方法org.apache.rocketmq.namesrv.processor.ClusterTestRequestProcessor#ClusterTestRequestProcessor建立集群测试请求处理器性能
public ClusterTestRequestProcessor(NamesrvController namesrvController, String productEnvName) { super(namesrvController); this.productEnvName = productEnvName; adminExt = new DefaultMQAdminExt(); adminExt.setInstanceName("CLUSTER_TEST_NS_INS_" + productEnvName); adminExt.setUnitName(productEnvName); try { // 启动消息队列管理服务启动=》 adminExt.start(); } catch (MQClientException e) { log.error("Failed to start processor", e); } }
进入到这个方法org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#start启动mq默认的管理服务
@Override public void start() throws MQClientException { switch (this.serviceState) { case CREATE_JUST://服务只启动,不建立 this.serviceState = ServiceState.START_FAILED; this.defaultMQAdminExt.changeInstanceNameToPID(); // 建立mqclient对象 =》 this.mqClientInstance = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQAdminExt, rpcHook); // 注册管理服务处理器=》 boolean registerOK = mqClientInstance.registerAdminExt(this.defaultMQAdminExt.getAdminExtGroup(), this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; throw new MQClientException("The adminExt group[" + this.defaultMQAdminExt.getAdminExtGroup() + "] has created already, specifed another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null); } // 启动mqclient =》 mqClientInstance.start(); log.info("the adminExt [{}] start OK", this.defaultMQAdminExt.getAdminExtGroup()); this.serviceState = ServiceState.RUNNING; break; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException("The AdminExt service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: break; } }
未完待续。
说在最后
本次解析仅表明我的观点,仅供参考。
加入技术微信群
钉钉技术群