spring的finishRefresh中,有个步骤是回调监听事件。因为实现了ApplicationListener
接口,因此回调onApplicationEvent
方法。spring
没有暴露过且没有取消暴露apache
public void onApplicationEvent(ContextRefreshedEvent event) { if (!isExported() && !isUnexported()) { if (logger.isInfoEnabled()) { logger.info("The service ready on spring started. service: " + getInterface()); } export(); } } public void export() { super.export(); // Publish ServiceBeanExportedEvent publishExportEvent(); } private void publishExportEvent() { ServiceBeanExportedEvent exportEvent = new ServiceBeanExportedEvent(this); applicationEventPublisher.publishEvent(exportEvent); }
服务的暴露,在ServiceConfig中segmentfault
public synchronized void export() { // 各类配置的更新 checkAndUpdateSubConfigs(); if (!shouldExport()) { return; } // 延迟暴露 if (shouldDelay()) { DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS); } else { // 服务暴露 doExport(); } }
public void checkAndUpdateSubConfigs() { // Use default configs defined explicitly on global configs // 设置registries、monitor的优先级,优先级application<module<provider // 除了上面两个,还有的设置configCenter、protocols completeCompoundConfigs(); // Config Center should always being started first. // 设置配置中心,而且把CompositeConfiguration的配置设置到Application、Monitor、Module、Protocols、Registries、Providers、Consumers中 startConfigCenter(); // 检查provider是否存在,若是不存在,就默认一个,并调用refresh方法 checkDefault(); // 若是没有设置protocols,会从provider中取protocols,并设置ProtocolConfig及调用refresh方法 checkProtocol(); // 检查provider是否存在,若是不存在,就默认一个,并调用refresh方法。再根据SHUTDOWN_WAIT_KEY和SHUTDOWN_WAIT_SECONDS_KEY用于停机 checkApplication(); // if protocol is not injvm checkRegistry // 若是没有injvm协议,就去检查Registry if (!isOnlyInJvm()) { // 若是registries为空,就从dubbo.registry.address获取地址,并设置RegistryConfig及调用refresh方法 // 这里会把注册中心的配置,赋值给配置中心,并调用refresh方法 // 刷新后会调用prepareEnvironment方法,这里经过SPI的方式获取注册中心 checkRegistry(); } // 刷新ServiceConfig this.refresh(); // 检查metadataReportConfig是否存在,若是不存在,就默认MetadataReportConfig,并调用refresh方法。 checkMetadataReport(); // 必须有interfaceName if (StringUtils.isEmpty(interfaceName)) { throw new IllegalStateException("<dubbo:service interface=\"\" /> interface not allow null!"); } // 泛化 if (ref instanceof GenericService) { interfaceClass = GenericService.class; if (StringUtils.isEmpty(generic)) { generic = Boolean.TRUE.toString(); } } else { try { interfaceClass = Class.forName(interfaceName, true, Thread.currentThread() .getContextClassLoader()); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } // 检查接口和方法 checkInterfaceAndMethods(interfaceClass, methods); // 检查ref是否有继承关系 checkRef(); generic = Boolean.FALSE.toString(); } if (local != null) { if (Boolean.TRUE.toString().equals(local)) { local = interfaceName + "Local"; } Class<?> localClass; try { localClass = ClassUtils.forNameWithThreadContextClassLoader(local); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } if (!interfaceClass.isAssignableFrom(localClass)) { throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceName); } } if (stub != null) { if (Boolean.TRUE.toString().equals(stub)) { stub = interfaceName + "Stub"; } Class<?> stubClass; try { stubClass = ClassUtils.forNameWithThreadContextClassLoader(stub); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } if (!interfaceClass.isAssignableFrom(stubClass)) { throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + interfaceName); } } checkStubAndLocal(interfaceClass); checkMock(interfaceClass); } protected synchronized void doExport() { // 已经暴露了,就再也不暴露 if (unexported) { throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!"); } if (exported) { return; } exported = true; if (StringUtils.isEmpty(path)) { path = interfaceName; } doExportUrls(); }
设置registries、monitor的优先级,优先级application<module<providerapp
private void completeCompoundConfigs() { if (provider != null) { if (application == null) { setApplication(provider.getApplication()); } if (module == null) { setModule(provider.getModule()); } if (registries == null) { setRegistries(provider.getRegistries()); } if (monitor == null) { setMonitor(provider.getMonitor()); } if (protocols == null) { setProtocols(provider.getProtocols()); } if (configCenter == null) { setConfigCenter(provider.getConfigCenter()); } } if (module != null) { // 为空说明上面还没设置,这里能够设置。若是不为空,说明上面设置了,就不能设置。 if (registries == null) { setRegistries(module.getRegistries()); } if (monitor == null) { setMonitor(module.getMonitor()); } } if (application != null) { if (registries == null) { setRegistries(application.getRegistries()); } if (monitor == null) { setMonitor(application.getMonitor()); } } }
void startConfigCenter() { // 为空,从饿汉单例获取 if (configCenter == null) { ConfigManager.getInstance().getConfigCenter().ifPresent(cc -> this.configCenter = cc); } // 不为空,刷新并环境准备、检查 if (this.configCenter != null) { // TODO there may have duplicate refresh this.configCenter.refresh(); prepareEnvironment(); } // 刷新ApplicationConfig、MonitorConfig、ModuleConfig、ProtocolConfig、RegistryConfig、ProviderConfig、ConsumerConfig ConfigManager.getInstance().refreshAll(); }
private void prepareEnvironment() { // 检查address、protocol的合法性 if (configCenter.isValid()) { // 已经初始化的就再也不初始化 if (!configCenter.checkOrUpdateInited()) { return; } // 获取配置中心(这里就有用到SPI) DynamicConfiguration dynamicConfiguration = getDynamicConfiguration(configCenter.toUrl()); // 从配置中心获取配置 String configContent = dynamicConfiguration.getProperties(configCenter.getConfigFile(), configCenter.getGroup()); String appGroup = application != null ? application.getName() : null; String appConfigContent = null; if (StringUtils.isNotEmpty(appGroup)) { appConfigContent = dynamicConfiguration.getProperties (StringUtils.isNotEmpty(configCenter.getAppConfigFile()) ? configCenter.getAppConfigFile() : configCenter.getConfigFile(), appGroup ); } try { // 是否最高优先级 Environment.getInstance().setConfigCenterFirst(configCenter.isHighestPriority()); // 把configContent的Properties格式转为map并存入externalConfigurationMap Environment.getInstance().updateExternalConfigurationMap(parseProperties(configContent)); // 把appConfigContent的Properties格式转为map并存入appExternalConfigurationMap Environment.getInstance().updateAppExternalConfigurationMap(parseProperties(appConfigContent)); } catch (IOException e) { throw new IllegalStateException("Failed to parse configurations from Config Center.", e); } } }
刷新ApplicationConfig、MonitorConfig、ModuleConfig、ProtocolConfig、RegistryConfig、ProviderConfig、ConsumerConfigjvm
public void refreshAll() { // refresh all configs here, getApplication().ifPresent(ApplicationConfig::refresh); getMonitor().ifPresent(MonitorConfig::refresh); getModule().ifPresent(ModuleConfig::refresh); getProtocols().values().forEach(ProtocolConfig::refresh); getRegistries().values().forEach(RegistryConfig::refresh); getProviders().values().forEach(ProviderConfig::refresh); getConsumers().values().forEach(ConsumerConfig::refresh); }
若是没有provider,则建立一个,并refreshide
private void checkDefault() { createProviderIfAbsent(); } private void createProviderIfAbsent() { if (provider != null) { return; } setProvider( ConfigManager.getInstance() .getDefaultProvider() .orElseGet(() -> { ProviderConfig providerConfig = new ProviderConfig(); providerConfig.refresh(); return providerConfig; }) ); }
设置Protocolthis
private void checkProtocol() { // 若是protocols为空,从provider取 if (CollectionUtils.isEmpty(protocols) && provider != null) { setProtocols(provider.getProtocols()); } convertProtocolIdsToProtocols(); } private void convertProtocolIdsToProtocols() { if (StringUtils.isEmpty(protocolIds) && CollectionUtils.isEmpty(protocols)) { List<String> configedProtocols = new ArrayList<>(); configedProtocols.addAll(getSubProperties(Environment.getInstance() .getExternalConfigurationMap(), PROTOCOLS_SUFFIX)); configedProtocols.addAll(getSubProperties(Environment.getInstance() .getAppExternalConfigurationMap(), PROTOCOLS_SUFFIX)); protocolIds = String.join(",", configedProtocols); } // 若是仍是没有ProtocolConfig,则建立一个并refresh if (StringUtils.isEmpty(protocolIds)) { if (CollectionUtils.isEmpty(protocols)) { setProtocols( ConfigManager.getInstance().getDefaultProtocols() .filter(CollectionUtils::isNotEmpty) .orElseGet(() -> { ProtocolConfig protocolConfig = new ProtocolConfig(); protocolConfig.refresh(); return new ArrayList<>(Arrays.asList(protocolConfig)); }) ); } } else { // id赋值后refresh String[] arr = COMMA_SPLIT_PATTERN.split(protocolIds); List<ProtocolConfig> tmpProtocols = CollectionUtils.isNotEmpty(protocols) ? protocols : new ArrayList<>(); Arrays.stream(arr).forEach(id -> { if (tmpProtocols.stream().noneMatch(prot -> prot.getId().equals(id))) { tmpProtocols.add(ConfigManager.getInstance().getProtocol(id).orElseGet(() -> { ProtocolConfig protocolConfig = new ProtocolConfig(); protocolConfig.setId(id); protocolConfig.refresh(); return protocolConfig; })); } }); if (tmpProtocols.size() > arr.length) { throw new IllegalStateException("Too much protocols found, the protocols comply to this service are :" + protocolIds + " but got " + protocols .size() + " registries!"); } setProtocols(tmpProtocols); } }
protected void checkApplication() { // for backward compatibility // 若是没有Application,则建立一个并refresh createApplicationIfAbsent(); // 验证name是否为空 if (!application.isValid()) { throw new IllegalStateException("No application config found or it's not a valid config! " + "Please add <dubbo:application name=\"...\" /> to your spring config."); } // 把name赋值给ApplicationModel的Application ApplicationModel.setApplication(application.getName()); // backward compatibility // 用于优雅停机 String wait = ConfigUtils.getProperty(SHUTDOWN_WAIT_KEY); if (wait != null && wait.trim().length() > 0) { System.setProperty(SHUTDOWN_WAIT_KEY, wait.trim()); } else { wait = ConfigUtils.getProperty(SHUTDOWN_WAIT_SECONDS_KEY); if (wait != null && wait.trim().length() > 0) { System.setProperty(SHUTDOWN_WAIT_SECONDS_KEY, wait.trim()); } } }
protected void checkRegistry() { //加载RegistryConfig loadRegistriesFromBackwardConfig(); //加载RegistryConfig convertRegistryIdsToRegistries(); // 验证address for (RegistryConfig registryConfig : registries) { if (!registryConfig.isValid()) { throw new IllegalStateException("No registry config found or it's not a valid config! " + "The registry config is: " + registryConfig); } } useRegistryForConfigIfNecessary(); } private void useRegistryForConfigIfNecessary() { registries.stream().filter(RegistryConfig::isZookeeperProtocol).findFirst().ifPresent(rc -> { // we use the loading status of DynamicConfiguration to decide whether ConfigCenter has been initiated. Environment.getInstance().getDynamicConfiguration().orElseGet(() -> { ConfigManager configManager = ConfigManager.getInstance(); // 建立ConfigCenterConfig ConfigCenterConfig cc = configManager.getConfigCenter().orElse(new ConfigCenterConfig()); // 设置参数 if (cc.getParameters() == null) { cc.setParameters(new HashMap<>()); } if (rc.getParameters() != null) { cc.getParameters().putAll(rc.getParameters()); } cc.getParameters().put(org.apache.dubbo.remoting.Constants.CLIENT_KEY,rc.getClient()); cc.setProtocol(rc.getProtocol()); cc.setAddress(rc.getAddress()); cc.setUsername(rc.getUsername()); cc.setPassword(rc.getPassword()); cc.setHighestPriority(false); //设置ConfigCenterConfig setConfigCenter(cc); //刷新配置 startConfigCenter(); return null; }); }); }
配置信息都好了,后面看看怎么暴露的code