context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-provider.xml"}); // 删除了一些步骤 public void refresh() throws BeansException, IllegalStateException { synchronized (this.startupShutdownMonitor) { try { // 1. 里面的核心代码就是初始化了applicationEventMulticaster,用于后面发布事件使用 // this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory); initApplicationEventMulticaster(); // 2. 初始化非延迟加载的bean,这里就会初始化dubbo配置的一些bean,包括ServiceBean,用于服务导出 finishBeanFactoryInitialization(beanFactory); // 3. 发布容器刷新事件,这里面是服务导出的入口 finishRefresh(); } } }
// 步骤2分析 // 这里Spring容器会初始化非延迟加载的bean,包括<dubbo:service/>表示的bean // <dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"/> finishBeanFactoryInitialization(beanFactory);
// Spring容器初始化<dubbo:service/>表示的ServiceBean时会建立ServiceBean对象,因为ServiceBean实现了 // ApplicationContextAware接口,因此Spring容器会先调用setApplicationContext给其注入Spring容器 class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware { @Override public void setApplicationContext(ApplicationContext applicationContext) { this.applicationContext = applicationContext; // 给SpringExtensionFactory注入Spring容器 SpringExtensionFactory.addApplicationContext(applicationContext); if (applicationContext != null) { SPRING_CONTEXT = applicationContext; try { // method是addListener方法,调用该方法用于给applicationEventMulticaster // 添加listener method.invoke(applicationContext, new Object[]{this}); supportedApplicationListener = true; } } } }
// 步骤3分析,发布相关事件,这里会发布容器刷新事件 finishRefresh(); protected void finishRefresh() { initLifecycleProcessor(); getLifecycleProcessor().onRefresh(); // 1). 发布容器刷新事件,ServiceBean监听的就是该事件 // ServiceBean implements ApplicationListener<ContextRefreshedEvent> publishEvent(new ContextRefreshedEvent(this)); LiveBeansView.registerApplicationContext(this); } // 2). 步骤1会走到这里,这里会获取以前的applicationEventMulticaster,用于发布事件 getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType); // 3). 走到这里,listener就是以前调用 method.invoke(applicationContext, new Object[]{this}); // 加进去的ServiceBean,this表示ServiceBean,也是listener,event就是容器刷新事件 doInvokeListener(listener, event); // 4) 走到这里,最终调用ServiceBean实现的onApplicationEvent方法 listener.onApplicationEvent(event);
这样,就走到了Dubbo暴露服务的入口
的方法.这也是Dubbo官方文档中说起的入口方法,参考: 服务导出html
public void onApplicationEvent(ContextRefreshedEvent event) { // 若是服务没有被暴露而且服务没有被取消暴露,则打印日志 if (isDelay() && !isExported() && !isUnexported()) { if (logger.isInfoEnabled()) { logger.info("The service ready on spring started. service: " + getInterface()); } // 导出服务 export(); } }
接下来研究一下 Dubbo 导出服务的过程。Dubbo 服务导出过程始于Spring容器发布刷新事件,Dubbo在接收到事件后,会当即执行服务导出逻辑。整个逻辑大体可分为三个部分,第一部分是前置工做,主要用于检查参数,组装 URL。第二部分是导出服务,包含导出服务到本地 (JVM),和导出服务到远程两个过程。第三部分是向注册中心注册服务,用于服务发现。下面将会对这三个部分代码进行详细的分析。spring
服务导出的入口方法是ServiceBean的onApplicationEvent
。onApplicationEvent 是一个事件响应方法,该方法会在收到Spring上下文刷新事件后执行服务导出操做。方法代码以下apache
public void onApplicationEvent(ContextRefreshedEvent event) { // 若是服务没有被暴露而且服务没有被取消暴露,则打印日志 if (isDelay() && !isExported() && !isUnexported()) { if (logger.isInfoEnabled()) { logger.info("The service ready on spring started. service: " + getInterface()); } // 导出服务 export(); } }
这个方法首先会根据条件决定是否导出服务,好比有些服务设置了延时导出,那么此时就不该该在此处导出。还有一些服务已经被导出了,或者当前服务被取消导出了,此时也不能再次导出相关服务。注意这里的 isDelay 方法,这个方法字面意思是“是否延迟导出服务”,返回 true 表示延迟导出,false 表示不延迟导出。可是该方法真实意思却并不是如此,当方法返回 true 时,表示无需延迟导出。返回 false 时,表示须要延迟导出。与字面意思偏偏相反,这个须要你们注意一下。
前置工做主要包含两个部分,分别是配置检查,以及 URL 装配。在导出服务以前,Dubbo 须要检查用户的配置是否合理,或者为用户补充缺省配置。配置检查完成后,接下来须要根据这些配置组装 URL。在 Dubbo 中,URL 的做用十分重要。Dubbo 使用 URL 做为配置载体,全部的拓展点都是经过 URL 获取配置。这一点,官方文档中有所说明。下面的export方法会走到doExport()
方法。segmentfault
public synchronized void export() { if (provider != null) { if (export == null) { export = provider.getExport(); } if (delay == null) { delay = provider.getDelay(); } } if (export != null && !export) { return; } if (delay != null && delay > 0) { delayExportExecutor.schedule(new Runnable() { @Override public void run() { doExport(); } }, delay, TimeUnit.MILLISECONDS); } else { doExport(); } }
如下是配置检查的相关分析,代码比较多,须要你们耐心看一下。下面对配置检查的逻辑进行简单的总结,以下:
1) 检测 <dubbo:service> 标签的 interface 属性合法性,不合法则抛出异常
2) 检测 ProviderConfig、ApplicationConfig 等核心配置类对象是否为空,若为空,则尝试从其余配置类对象中获取相应的实例。
3) 检测并处理泛化服务和普通服务类
4) 检测本地存根配置,并进行相应的处理
5) 对 ApplicationConfig、RegistryConfig 等配置类进行检测,为空则尝试建立,若没法建立则抛出异常配置检查并不是本文重点,所以这里不打算对 doExport 方法所调用的方法进行分析(doExportUrls 方法除外)。在这些方法中,除了appendProperties方法稍微复杂一些,其余方法逻辑不是很复杂。所以,你们可自行分析。app
protected synchronized void doExport() { if (unexported) { throw new IllegalStateException("Already unexported!"); } if (exported) { return; } exported = true; if (interfaceName == null || interfaceName.length() == 0) { // 抛异常 } checkDefault();
if (provider != null) { if (application == null) { application = provider.getApplication(); } if (module == null) { module = provider.getModule(); } if (registries == null) { registries = provider.getRegistries(); } if (monitor == null) { monitor = provider.getMonitor(); } if (protocols == null) { protocols = provider.getProtocols(); } }
if (module != null) { if (registries == null) { registries = module.getRegistries(); } if (monitor == null) { monitor = module.getMonitor(); } } if (application != null) { if (registries == null) { registries = application.getRegistries(); } if (monitor == null) { monitor = application.getMonitor(); } }
// 检测ref是否为泛化服务类型 if (ref instanceof GenericService) { // 设置interfaceClass为GenericService interfaceClass = GenericService.class; if (StringUtils.isEmpty(generic)) { // 设置generic = true generic = Boolean.TRUE.toString(); } } else { try { // 得到接口类型 interfaceClass = Class.forName(interfaceName, true, Thread.currentThread() .getContextClassLoader()); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } // 对interfaceClass,以及<dubbo:method>标签中的必要字段进行检查 checkInterfaceAndMethods(interfaceClass, methods); // 对ref合法性进行检测 checkRef(); generic = Boolean.FALSE.toString(); }
// stub local同样都是配置本地存根 if (local != null) { if ("true".equals(local)) { local = interfaceName + "Local"; } Class<?> localClass; try { localClass = ClassHelper.forNameWithThreadContextClassLoader(local); } } if (stub != null) { if ("true".equals(stub)) { stub = interfaceName + "Stub"; } Class<?> stubClass; try { stubClass = ClassHelper.forNameWithThreadContextClassLoader(stub); } }
checkApplication(); checkRegistry(); checkProtocol(); appendProperties(this); // 本地存根、mock合法性校验 checkStubAndMock(interfaceClass); if (path == null || path.length() == 0) { path = interfaceName; } // 核心代码,暴露服务、注册逻辑就在其中 doExportUrls(); ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref); ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel); }
Dubbo 容许咱们使用不一样的协议导出服务,也容许咱们向多个注册中心注册服务。Dubbo在doExportUrls方法中对多协议,多注册中心进行了支持。相关代码以下dom
/** * 多协议多注册中心暴露服务进行支持 */ private void doExportUrls() { // 加载注册中心连接 List<URL> registryURLs = loadRegistries(true); // 遍历protocols,并在每一个协议下暴露 for (ProtocolConfig protocolConfig : protocols) { doExportUrlsFor1Protocol(protocolConfig, registryURLs); } }
上面代码首先是经过loadRegistries加载注册中心连接,而后再遍历ProtocolConfig集合导出每一个服务。并在导出服务的过程当中,将服务注册到注册中心。咱们先来看一下loadRegistries方法的逻辑。先能够打开看下该方法能够获得什么。jvm
<!-- provider's application name, used for tracing dependency relationship --> <dubbo:application name="demo-provider"/> <!-- use multicast registry center to export service --> <dubbo:registry address="zookeeper://10.101.99.127:2181"/> <!-- use dubbo protocol to export service on port 20880 --> <dubbo:protocol name="dubbo" port="20880"/> <dubbo:provider server="netty4"/> <!-- service implementation, as same as regular local bean --> <bean id="demoService" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl"/> <!-- declare the service interface to be exported --> <dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"/>
protected List<URL> loadRegistries(boolean provider) { checkRegistry(); List<URL> registryList = new ArrayList<URL>(); // 若是registries为空,直接返回空集合 if (registries != null && !registries.isEmpty()) { // 遍历注册中心配置集合registries for (RegistryConfig config : registries) { // 得到地址 String address = config.getAddress(); // 若地址为空,则设置为0.0.0.0 if (address == null || address.length() == 0) { address = Constants.ANYHOST_VALUE; } String sysaddress = System.getProperty("dubbo.registry.address"); if (sysaddress != null && sysaddress.length() > 0) { address = sysaddress; }
// 若是地址为N/A,则跳过 if (address.length() > 0 && !RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) { Map<String, String> map = new HashMap<String, String>(); // 添加ApplicationConfig中的字段信息到map中 appendParameters(map, application); // 添加RegistryConfig字段信息到map中 appendParameters(map, config); // 添加path、协议版本 map.put("path", RegistryService.class.getName()); map.put("dubbo", Version.getProtocolVersion()); map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis())); if (ConfigUtils.getPid() > 0) { map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid())); } // 若是map中没有protocol,则默认为使用dubbo协议 if (!map.containsKey("protocol")) { if (ExtensionLoader.getExtensionLoader(RegistryFactory.class). hasExtension("remote")) { map.put("protocol", "remote"); } else { map.put("protocol", "dubbo"); } }
// 解析获得URL列表,address可能包含多个注册中心ip,所以解析获得的是一个URL列表 List<URL> urls = UrlUtils.parseURLs(address, map); // 遍历URL 列表 for (URL url : urls) { // 将URL协议头设置为registry url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol()); // 这里将协议设置为了registry,这也是后面调用的是RegistryProtocol的export()方法缘由 url = url.setProtocol(Constants.REGISTRY_PROTOCOL); // 经过判断条件,决定是否添加url到registryList中,条件以下: // 若是是服务提供者,而且是注册中心服务或者是消费者端,而且是订阅服务,则加入到registryList if ((provider && url.getParameter(Constants.REGISTER_KEY, true)) || (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) { registryList.add(url); } } } } } return registryList; }
ProtocolConfig主要封装了<dubbo:protocol name="dubbo" port="20880"/>标签的信息,意思是使用Dubbo协议暴露服务。ide
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { // 获取协议名 String name = protocolConfig.getName(); // 若是为空,则是默认的dubbo if (name == null || name.length() == 0) { name = "dubbo"; } Map<String, String> map = new HashMap<String, String>(); // 设置服务提供者侧 map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE); map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion()); map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis())); if (ConfigUtils.getPid() > 0) { map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid())); } // 这段代码其实完成了子节点配置信息对父节点的覆盖 appendParameters(map, application); appendParameters(map, module); appendParameters(map, provider, Constants.DEFAULT_KEY); appendParameters(map, protocolConfig); appendParameters(map, this);
// 若是method的配置列表不为空 if (methods != null && !methods.isEmpty()) { // 遍历method配置列表 for (MethodConfig method : methods) { // 把方法名加入map appendParameters(map, method, method.getName()); // 添加 MethodConfig对象的字段信息到map中,键=方法名.属性名 // 好比存储<dubbo:method name="sayHello" retries="2">对应的MethodConfig, // 键=sayHello.retries,map = {"sayHello.retries": 2, "xxx": "yyy"} String retryKey = method.getName() + ".retry"; if (map.containsKey(retryKey)) { String retryValue = map.remove(retryKey); // 若是retryValue为false,则不重试,设置值为0 if ("false".equals(retryValue)) { map.put(method.getName() + ".retries", "0"); } }
if (ProtocolUtils.isGeneric(generic)) { map.put(Constants.GENERIC_KEY, generic); map.put(Constants.METHODS_KEY, Constants.ANY_VALUE); } else { String revision = Version.getVersion(interfaceClass, version); if (revision != null && revision.length() > 0) { map.put("revision", revision); } String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); if (methods.length == 0) { map.put(Constants.METHODS_KEY, Constants.ANY_VALUE); } else { map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); } } if (!ConfigUtils.isEmpty(token)) { if (ConfigUtils.isDefault(token)) { map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString()); } else { map.put(Constants.TOKEN_KEY, token); } }
if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) { protocolConfig.setRegister(false); map.put("notify", "false"); } // export service String contextPath = protocolConfig.getContextpath(); if ((contextPath == null || contextPath.length() == 0) && provider != null) { contextPath = provider.getContextpath(); } String host = this.findConfigedHosts(protocolConfig, registryURLs, map); Integer port = this.findConfigedPorts(protocolConfig, name, map); URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map); if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .hasExtension(url.getProtocol())) { url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .getExtension(url.getProtocol()).getConfigurator(url).configure(url); }
String scope = url.getParameter(Constants.SCOPE_KEY); // don't export when none is configured if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) { // 暴露到本地 if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) { exportLocal(url); } // 暴露到远程 if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) { // 后面分析 } } this.urls.add(url); }
前置工做作完,接下来就能够进行服务导出了。服务导出分为导出到本地(JVM)和导出到远程。ui
// 暴露到本地 if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) { exportLocal(url); } private void exportLocal(URL url) { // 若是协议不是injvm if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { // 生成本地的url,分别把协议改成injvm,设置host和port URL local = URL.valueOf(url.toFullString()) .setProtocol(Constants.LOCAL_PROTOCOL) .setHost(LOCALHOST).setPort(0); ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref)); // 经过代理工程建立invoker // 再调用export方法进行暴露服务,生成Exporter // 这里的protocol是生成的拓展代理对象,具体可看https://segmentfault.com/a/1190000020384210 // 它是在运行时才根据URL中的protocol参数去决定运行哪一个Protocol实例的export方法,这里因为前面 // setProtocol(Constants.LOCAL_PROTOCOL),因此调用的是InjvmProtocol的export方法 Exporter<?> exporter = protocol.export( proxyFactory.getInvoker(ref, (Class) interfaceClass, local)); // 把生成的暴露者加入集合 exporters.add(exporter); } }
下面两个是url和local的具体值,由于Dubbo采用自适应拓展机制,exportLocal(URL url)中用到的protocol是自适应拓展,protocol的export方法会用到URL中protocol参数从而决定具体生成protocol的哪一个实例,因此URL的protocol值能够关注下。this
Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
下面分析下面这句代码。它是核心方法,分为两步。
Exporter<?> exporter = protocol.export( proxyFactory.getInvoker(ref, (Class) interfaceClass, local)); 1) proxyFactory.getInvoker(ref, (Class) interfaceClass, local) -> 返回invoker 2) protocol.export(invoker)
// 步骤1)分析 // proxyFactory也是自适应拓展代理带,它默认使用JavassistProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension(); // 这里调用的就是JavassistProxyFactory的getInvoker方法 public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // 建立Wrapper对象 final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); // 建立匿名Invoker类对象,并实现doInvoke方法 return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { // 调用Wrapper的invokeMethod方法,invokeMethod最终会调用目标方法 return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; }
在 Dubbo 中,Invoker是一个很是重要的模型
。在服务提供端,以及服务引用端均会出现Invoker。Dubbo 官方文档中对Invoker进行了说明,这里引用一下。Invoker是实体域,它是Dubbo的核心模型,其它模型都向它靠扰,或转换成它,它表明一个可执行体,可向它发起invoke调用
,它有多是一个本地的实现,也多是一个远程的实现,也可能一个集群实现。这里面getInvoker方法建立了一个匿名Invoker对象,我理解是经过invoke实行远程调用时,会走wrapper.invokeMethod方法,而wrapper其实是一个代理类,调用wrapper.invokeMethod最终会走proxy,也就是DemoService的sayHello方法。Wrapper建立比较复杂,能够参考 Dubbo中JavaAssist的Wrapper.getWrapper生成代理分析。
// 步骤2分析,调用的是InjvmProtocol的export方法 public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { // 该方法只是建立了一个,由于暴露到本地,因此在同一个jvm中,因此不须要其余操做 return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap); }