本篇文章,咱们来研究一下 Dubbo 导出服务的过程。Dubbo 服务导出过程始于 Spring 容器发布刷新事件,Dubbo 在接收到事件后,会当即执行服务导出逻辑。整个逻辑大体可分为三个部分,第一是前置工做,主要用于检查参数,组装 URL。第二是导出服务,包含导出服务到本地 (JVM),和导出服务到远程两个过程。第三是向注册中心注册服务,用于服务发现。本篇文章将会对这三个部分代码进行详细的分析,在分析以前,咱们先来了解一下服务的导出过程。java
Dubbo 支持两种服务导出方式,分别延迟导出和当即导出。延迟导出的入口是 ServiceBean 的 afterPropertiesSet 方法,当即导出的入口是 ServiceBean 的 onApplicationEvent 方法。本文打算分析服务延迟导出过程,所以不会分析 afterPropertiesSet 方法。下面从 onApplicationEvent 方法提及,该方法收到 Spring 容器的刷新事件后,会调用 export 方法执行服务导出操做。服务导出以前,要进行对一系列的配置进行检查,以及生成 URL。准备工做作完,随后开始导出服务。首先导出到本地,而后再导出到远程。导出到本地就是将服务导出到 JVM 中,此过程比较简单。导出到远程的过程则要复杂的多,以 dubbo 协议为例,DubboProtocol 类的 export 方法将会被调用。该方法主要用于建立 Exporter 和 ExchangeServer。ExchangeServer 自己并不具有通讯能力,须要借助更底层的 Server 实现通讯功能。所以,在建立 ExchangeServer 实例时,须要先建立 NettyServer 或者 MinaServer 实例,并将实例做为参数传给 ExchangeServer 实现类的构造方法。ExchangeServer 实例建立完成后,导出服务到远程的过程也就接近尾声了。服务导出结束后,服务消费者便可经过直联的方式消费服务。固然,通常咱们不会使用直联的方式消费服务。因此,在服务导出结束后,紧接着要作的事情是向注册中心注册服务。此时,客户端便可从注册中心发现服务。git
以上就是 Dubbo 服务导出的过程,比较复杂。下面开始分析源码,从源码的角度展示整个过程。github
一场 Dubbo 源码分析的马拉松比赛即将开始,如今咱们站在赛道的起点进行热身准备。本次比赛的起点位置位于 ServiceBean 的 onApplicationEvent 方法处。好了,发令枪响了,我将和一些朋友从 onApplicationEvent 方法处出发,探索 Dubbo 服务导出的全过程。下面咱们来看一下 onApplicationEvent 方法的源码。正则表达式
public void onApplicationEvent(ContextRefreshedEvent event) { // 是否有延迟导出 && 是否已导出 && 是否是已被取消导出 if (isDelay() && !isExported() && !isUnexported()) { // 导出服务 export(); } }
onApplicationEvent 是一个事件响应方法,该方法会在收到 Spring 上下文刷新事件后执行。这个方法首先会根据条件决定是否导出服务,好比有些服务设置了延时导出,那么此时就不该该在此处导出。还有一些服务已经被导出了,或者当前服务被取消导出了,此时也不能再次导出相关服务。注意这里的 isDelay 方法,这个方法字面意思是“是否延迟导出服务”,返回 true 表示延迟导出,false 表示不延迟导出。可是该方法真实意思却并不是如此,当方法返回 true 时,表示无需延迟导出。返回 false 时,表示须要延迟导出。与字面意思偏偏相反,让人以为很奇怪。下面咱们来看一下这个方法的逻辑。apache
// -☆- ServiceBean private boolean isDelay() { // 获取 delay Integer delay = getDelay(); ProviderConfig provider = getProvider(); if (delay == null && provider != null) { // 若是前面获取的 delay 为空,这里继续获取 delay = provider.getDelay(); } // 判断 delay 是否为空,或者等于 -1 return supportedApplicationListener && (delay == null || delay == -1); }
暂时忽略 supportedApplicationListener 这个条件,当 delay 为空,或者等于-1时,该方法返回 true,而不是 false。这个方法的返回值让人有点困惑,所以我重构了该方法的代码,并给 Dubbo 提了一个 Pull Request,最终这个 PR 被合到了 Dubbo 主分支中。详细请参见 Dubbo #2686。bootstrap
如今解释一下 supportedApplicationListener 变量含义,该变量用于表示当前的 Spring 容器是否支持 ApplicationListener,这个值初始为 false。在 Spring 容器将本身设置到 ServiceBean 中时,ServiceBean 的 setApplicationContext 方法会检测 Spring 容器是否支持 ApplicationListener。若支持,则将 supportedApplicationListener 置为 true。代码就不分析了,你们自行查阅了解。数组
ServiceBean 是 Dubbo 与 Spring 框架进行整合的关键,能够看作是两个框架之间的桥梁。具备一样做用的类还有 ReferenceBean。ServiceBean 实现了 Spring 的一些拓展接口,有 FactoryBean、ApplicationContextAware、ApplicationListener、DisposableBean 和 BeanNameAware。这些接口我在 Spring 源码分析系列文章中介绍过,你们能够参考一下,这里就不赘述了。缓存
如今咱们知道了 Dubbo 服务导出过程的起点。那么接下来,咱们马不停蹄,继续进行比赛。赛程预告,下一站是“服务导出的前置工做”。服务器
前置工做主要包含两个部分,分别是配置检查,以及 URL 装配。在导出服务以前,Dubbo 须要检查用户的配置是否合理,或者为用户补充缺省配置。配置检查完成后,接下来须要根据这些配置组装 URL。在 Dubbo 中,URL 的做用十分重要。Dubbo 使用 URL 做为配置载体,全部的拓展点都是经过 URL 获取配置。这一点,官方文档中有所说明。app
采用 URL 做为配置信息的统一格式,全部扩展点都经过传递 URL 携带配置信息。
接下来,咱们先来分析配置检查部分的源码,随后再来分析 URL 组装部分的源码。
本节咱们接着前面的源码向下分析,前面说过 onApplicationEvent 方法在通过一些判断后,会决定是否调用 export 方法导出服务。那么下面咱们从 export 方法开始进行分析,以下:
public synchronized void export() { if (provider != null) { // 获取 export 和 delay 配置 if (export == null) { export = provider.getExport(); } if (delay == null) { delay = provider.getDelay(); } } // 若是 export 为 false,则不导出服务 if (export != null && !export) { return; } if (delay != null && delay > 0) { // delay > 0,延时导出服务 delayExportExecutor.schedule(new Runnable() { @Override public void run() { doExport(); } }, delay, TimeUnit.MILLISECONDS); } else { // 当即导出服务 doExport(); } }
export 对两个配置进行了检查,并配置执行相应的动做。首先是 export,这个配置决定了是否导出服务。有时候咱们只是想本地启动服务进行一些调试工做,这个时候咱们并不但愿把本地启动的服务暴露出去给别人调用。此时,咱们就能够经过配置 export 禁止服务导出,好比:
<dubbo:provider export="false" />
delay 见名知意了,用于延迟导出服务。下面,咱们继续分析源码,此次要分析的是 doExport 方法。
protected synchronized void doExport() { if (unexported) { throw new IllegalStateException("Already unexported!"); } if (exported) { return; } exported = true; // 检测 interfaceName 是否合法 if (interfaceName == null || interfaceName.length() == 0) { throw new IllegalStateException("interface not allow null!"); } // 检测 provider 是否为空,为空则新建一个,并经过系统变量为其初始化 checkDefault(); // 下面几个 if 语句用于检测 provider、application 等核心配置类对象是否为空, // 若为空,则尝试从其余配置类对象中获取相应的实例。 if (provider != null) { if (application == null) { application = provider.getApplication(); } if (module == null) { module = provider.getModule(); } if (registries == null) {...} if (monitor == null) {...} if (protocols == null) {...} } if (module != null) { if (registries == null) { registries = module.getRegistries(); } if (monitor == null) {...} } if (application != null) { if (registries == null) { registries = application.getRegistries(); } if (monitor == null) {...} } // 检测 ref 是否泛化服务类型 if (ref instanceof GenericService) { // 设置 interfaceClass 为 GenericService.class interfaceClass = GenericService.class; if (StringUtils.isEmpty(generic)) { // 设置 generic = "true" generic = Boolean.TRUE.toString(); } } else { // ref 非 GenericService 类型 try { interfaceClass = Class.forName(interfaceName, true, Thread.currentThread() .getContextClassLoader()); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } // 对 interfaceClass,以及 <dubbo:method> 必要字段进行检查 checkInterfaceAndMethods(interfaceClass, methods); // 对 ref 合法性进行检测 checkRef(); // 设置 generic = "false" generic = Boolean.FALSE.toString(); } // local 属性 Dubbo 官方文档中没有说明,不过 local 和 stub 在功能应该是一致的,用于配置本地存根 if (local != null) { if ("true".equals(local)) { local = interfaceName + "Local"; } Class<?> localClass; try { // 获取本地存根类 localClass = ClassHelper.forNameWithThreadContextClassLoader(local); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } // 检测本地存根类是否可赋值给接口类,若不可赋值则会抛出异常,提醒使用者本地存根类类型不合法 if (!interfaceClass.isAssignableFrom(localClass)) { throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceName); } } // stub 和 local 均用于配置本地存根 if (stub != null) { // 此处的代码和上一个 if 分支的代码基本一致,这里省略了 } // 检测各类对象是否为空,为空则新建,或者抛出异常 checkApplication(); checkRegistry(); checkProtocol(); appendProperties(this); checkStubAndMock(interfaceClass); if (path == null || path.length() == 0) { path = interfaceName; } // 导出服务 doExportUrls(); // ProviderModel 表示服务提供者模型,此对象中存储了和服务提供者相关的信息。 // 好比服务的配置信息,服务实例等。每一个被导出的服务对应一个 ProviderModel。 // ApplicationModel 持有全部的 ProviderModel。 ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref); ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel); }
以上就是配置检查的相关分析,代码比较多,须要你们耐心看一下。下面对配置检查的逻辑进行简单的总结,以下:
配置检查并不是本文重点,所以我不打算对 doExport 方法所调用的方法进行分析(doExportUrls 方法除外)。在这些方法中,除了 appendProperties 方法稍微复杂一些,其余方法都还好。所以,你们可自行进行分析。好了,其余的就很少说了,继续向下分析。
Dubbo 容许咱们使用不一样的协议导出服务,也容许咱们向多个注册中心注册服务。Dubbo 在 doExportUrls 方法中对多协议,多注册中心进行了支持。相关代码以下:
private void doExportUrls() { // 加载注册中心连接 List<URL> registryURLs = loadRegistries(true); // 遍历 protocols,导出每一个服务 for (ProtocolConfig protocolConfig : protocols) { doExportUrlsFor1Protocol(protocolConfig, registryURLs); } }
上面代码比较简单,首先是经过 loadRegistries 加载注册中心连接,而后再遍历 ProtocolConfig 集合导出每一个服务。并在导出服务的过程当中,将服务注册到注册中心处。下面,咱们先来看一下 loadRegistries 方法的逻辑。
protected List<URL> loadRegistries(boolean provider) { // 检测是否存在注册中心配置类,不存在则抛出异常 checkRegistry(); List<URL> registryList = new ArrayList<URL>(); if (registries != null && !registries.isEmpty()) { for (RegistryConfig config : registries) { String address = config.getAddress(); if (address == null || address.length() == 0) { // 若 address 为空,则将其设为 0.0.0.0 address = Constants.ANYHOST_VALUE; } // 从系统属性中加载注册中心地址 String sysaddress = System.getProperty("dubbo.registry.address"); if (sysaddress != null && sysaddress.length() > 0) { address = sysaddress; } // 判断 address 是否合法 if (address.length() > 0 && !RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) { Map<String, String> map = new HashMap<String, String>(); // 添加 ApplicationConfig 中的字段信息到 map 中 appendParameters(map, application); // 添加 RegistryConfig 字段信息到 map 中 appendParameters(map, config); map.put("path", RegistryService.class.getName()); map.put("dubbo", Version.getProtocolVersion()); map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis())); if (ConfigUtils.getPid() > 0) { map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid())); } if (!map.containsKey("protocol")) { if (ExtensionLoader.getExtensionLoader(RegistryFactory.class).hasExtension("remote")) { map.put("protocol", "remote"); } else { map.put("protocol", "dubbo"); } } // 解析获得 URL 列表,address 可能包含多个注册中心 ip, // 所以解析获得的是一个 URL 列表 List<URL> urls = UrlUtils.parseURLs(address, map); for (URL url : urls) { url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol()); // 将 URL 协议头设置为 registry url = url.setProtocol(Constants.REGISTRY_PROTOCOL); // 经过判断条件,决定是否添加 url 到 registryList 中,条件以下: // (服务提供者 && register = true 或 null) // || (非服务提供者 && subscribe = true 或 null) if ((provider && url.getParameter(Constants.REGISTER_KEY, true)) || (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) { registryList.add(url); } } } } } return registryList; }
上面代码不是很复杂,包含以下逻辑:
关于多协议多注册中心导出服务就先分析到这,代码不是不少,就不过多叙述了。接下来分析 URL 组装过程。
配置检查完毕后,紧接着要作的事情是根据配置,以及其余一些信息组装 URL。前面说过,URL 是 Dubbo 配置的载体,经过 URL 可以让 Dubbo 的各类配置在各个模块之间传递。URL 之于 Dubbo,犹如水之于鱼,很是重要。你们在阅读 Dubbo 服务导出相关源码的过程当中,要注意 URL 内容的变化。既然 URL 如此重要,那么下面咱们来了解一下 URL 组装的过程。
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { String name = protocolConfig.getName(); // 若是协议名为空,或空串,则将协议名变量设置为 dubbo if (name == null || name.length() == 0) { name = "dubbo"; } Map<String, String> map = new HashMap<String, String>(); // 添加 side、版本、时间戳以及进程号等信息到 map 中 map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE); map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion()); map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis())); if (ConfigUtils.getPid() > 0) { map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid())); } // 经过反射将对象的字段信息到 map 中 appendParameters(map, application); appendParameters(map, module); appendParameters(map, provider, Constants.DEFAULT_KEY); appendParameters(map, protocolConfig); appendParameters(map, this); // methods 为 MethodConfig 集合,MethodConfig 中存储了 <dubbo:method> 标签的配置信息 if (methods != null && !methods.isEmpty()) { // 这段代码用于添加 Callback 配置到 map 中,代码太长,待会单独分析 } // 检测 generic 是否为 "true",并根据检测结果向 map 中添加不一样的信息 if (ProtocolUtils.isGeneric(generic)) { map.put(Constants.GENERIC_KEY, generic); map.put(Constants.METHODS_KEY, Constants.ANY_VALUE); } else { String revision = Version.getVersion(interfaceClass, version); if (revision != null && revision.length() > 0) { map.put("revision", revision); } // 为接口生成包裹类 Wrapper,Wrapper 中包含了接口的详细信息,好比接口方法名数组,字段信息等 String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); // 添加方法名到 map 中,若是包含多个方法名,则用逗号隔开,好比 method = init,destroy if (methods.length == 0) { logger.warn("NO method found in service interface ..."); map.put(Constants.METHODS_KEY, Constants.ANY_VALUE); } else { // 将逗号做为分隔符链接方法名,并将链接后的字符串放入 map 中 map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); } } // 添加 token 到 map 中 if (!ConfigUtils.isEmpty(token)) { if (ConfigUtils.isDefault(token)) { map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString()); } else { map.put(Constants.TOKEN_KEY, token); } } // 判断协议名是否为 injvm if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) { protocolConfig.setRegister(false); map.put("notify", "false"); } // 获取上下文路径 String contextPath = protocolConfig.getContextpath(); if ((contextPath == null || contextPath.length() == 0) && provider != null) { contextPath = provider.getContextpath(); } // 获取 host 和 port String host = this.findConfigedHosts(protocolConfig, registryURLs, map); Integer port = this.findConfigedPorts(protocolConfig, name, map); // 组装 URL URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map); // 省略无关代码 }
上面的代码首先是将一些信息,好比版本、时间戳、方法名以及各类配置对象的字段信息放入到 map 中,map 中的内容将做为 URL 的查询字符串。构建好 map 后,紧接着是获取上下文路径、主机名以及端口号等信息。最后将 map 和主机名等数据传给 URL 构造方法建立 URL 对象。须要注意的是,这里出现的 URL 并不是 java.net.URL,而是 com.alibaba.dubbo.common.URL。
上面省略了一段代码,这里简单分析一下。这段代码用于检测 <dubbo:argument> 标签中的配置信息,并将相关配置添加到 map 中。代码以下:
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { // ... // methods 为 MethodConfig 集合,MethodConfig 中存储了 <dubbo:method> 标签的配置信息 if (methods != null && !methods.isEmpty()) { for (MethodConfig method : methods) { // 添加 MethodConfig 对象的字段信息到 map 中,键 = 方法名.属性名。 // 好比存储 <dubbo:method name="sayHello" retries="2"> 对应的 MethodConfig, // 键 = sayHello.retries,map = {"sayHello.retries": 2, "xxx": "yyy"} appendParameters(map, method, method.getName()); String retryKey = method.getName() + ".retry"; if (map.containsKey(retryKey)) { String retryValue = map.remove(retryKey); // 检测 MethodConfig retry 是否为 false,如果,则设置重试次数为0 if ("false".equals(retryValue)) { map.put(method.getName() + ".retries", "0"); } } // 获取 ArgumentConfig 列表 List<ArgumentConfig> arguments = method.getArguments(); if (arguments != null && !arguments.isEmpty()) { for (ArgumentConfig argument : arguments) { // 检测 type 属性是否为空,或者空串(分支1 ⭐️) if (argument.getType() != null && argument.getType().length() > 0) { Method[] methods = interfaceClass.getMethods(); if (methods != null && methods.length > 0) { for (int i = 0; i < methods.length; i++) { String methodName = methods[i].getName(); // 比对方法名,查找目标方法 if (methodName.equals(method.getName())) { Class<?>[] argtypes = methods[i].getParameterTypes(); if (argument.getIndex() != -1) { // 检测 ArgumentConfig 中的 type 属性与方法参数列表 // 中的参数名称是否一致,不一致则抛出异常(分支2 ⭐️) if (argtypes[argument.getIndex()].getName().equals(argument.getType())) { // 添加 ArgumentConfig 字段信息到 map 中, // 键前缀 = 方法名.index,好比: // map = {"sayHello.3": true} appendParameters(map, argument, method.getName() + "." + argument.getIndex()); } else { throw new IllegalArgumentException("argument config error: ..."); } } else { // 分支3 ⭐️ for (int j = 0; j < argtypes.length; j++) { Class<?> argclazz = argtypes[j]; // 从参数类型列表中查找类型名称为 argument.type 的参数 if (argclazz.getName().equals(argument.getType())) { appendParameters(map, argument, method.getName() + "." + j); if (argument.getIndex() != -1 && argument.getIndex() != j) { throw new IllegalArgumentException("argument config error: ..."); } } } } } } } // 用户未配置 type 属性,但配置了 index 属性,且 index != -1 } else if (argument.getIndex() != -1) { // 分支4 ⭐️ // 添加 ArgumentConfig 字段信息到 map 中 appendParameters(map, argument, method.getName() + "." + argument.getIndex()); } else { throw new IllegalArgumentException("argument config must set index or type"); } } } } } // ... }
上面这段代码 for 循环和 if else 分支嵌套太多,致使层次太深,不利于阅读,须要耐心看一下。你们在看这段代码时,注意把几个重要的条件分支找出来。只要理解了这几个分支的意图,就能够弄懂这段代码。我在上面代码中用⭐️符号标识出了4个重要的分支,下面用伪代码解释一下这几个分支的含义。
// 获取 ArgumentConfig 列表 for (遍历 ArgumentConfig 列表) { if (type 不为 null,也不为空串) { // 分支1 1. 经过反射获取 interfaceClass 的方法列表 for (遍历方法列表) { 1. 比对方法名,查找目标方法 2. 经过反射获取目标方法的参数类型数组 argtypes if (index != -1) { // 分支2 1. 从 argtypes 数组中获取下标 index 处的元素 argType 2. 检测 argType 的名称与 ArgumentConfig 中的 type 属性是否一致 3. 添加 ArgumentConfig 字段信息到 map 中,或抛出异常 } else { // 分支3 1. 遍历参数类型数组 argtypes,查找 argument.type 类型的参数 2. 添加 ArgumentConfig 字段信息到 map 中 } } } else if (index != -1) { // 分支4 1. 添加 ArgumentConfig 字段信息到 map 中 } }
在本节分析的源码中,appendParameters 这个方法出现的次数比较多,该方法用于将对象字段信息添加到 map 中。实现上则是经过反射获取目标对象的 getter 方法,并调用该方法获取属性值。而后再经过 getter 方法名解析出属性名,好比从方法名 getName 中可解析出属性 name。若是用户传入了属性名前缀,此时须要将属性名加入前缀内容。最后将 <属性名,属性值> 键值对存入到 map 中就好了。限于篇幅缘由,这里就不分析 appendParameters 方法的源码了,你们请自行分析。
前置工做作完,接下来就能够进行服务导出工做。服务导出,分为导出到本地 (JVM),和导出到远程。在深刻分析服务导出源码前,咱们先来从宏观层面上看一下服务导出逻辑。以下:
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { // 省略无关代码 if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .hasExtension(url.getProtocol())) { // 加载 ConfiguratorFactory,并生成 Configurator 配置 url url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .getExtension(url.getProtocol()).getConfigurator(url).configure(url); } String scope = url.getParameter(Constants.SCOPE_KEY); // 若是 scope = none,则什么都不作 if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) { // scope != remote,导出到本地 if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) { exportLocal(url); } // scope != local,导出到远程 if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) { if (registryURLs != null && !registryURLs.isEmpty()) { for (URL registryURL : registryURLs) { url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY)); // 加载监视器连接 URL monitorUrl = loadMonitor(registryURL); if (monitorUrl != null) { // 将监视器连接做为参数添加到 url 中 url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString()); } String proxy = url.getParameter(Constants.PROXY_KEY); if (StringUtils.isNotEmpty(proxy)) { registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy); } // 为服务提供类(ref)生成 Invoker Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); // DelegateProviderMetaDataInvoker 仅用于持有 Invoker 和 ServiceConfig DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); // 导出服务,并生成 Exporter Exporter<?> exporter = protocol.export(wrapperInvoker); exporters.add(exporter); } } else { // 不存在注册中心,仅导出服务 Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); Exporter<?> exporter = protocol.export(wrapperInvoker); exporters.add(exporter); } } } this.urls.add(url); }
上面代码根据 url 中的 scope 参数决定服务导出方式,分别以下:
无论是导出到本地,仍是远程。进行服务导出以前,均须要先建立 Invoker。这是一个很重要的步骤,所以接下来我会先分析 Invoker 的建立过程。
在 Dubbo 中,Invoker 是一个很是重要的模型。在服务提供端,以及服务引用端均会出现 Invoker。Dubbo 官方文档中对 Invoker 进行了说明,这里引用一下。
Invoker 是实体域,它是 Dubbo 的核心模型,其它模型都向它靠扰,或转换成它,它表明一个可执行体,可向它发起 invoke 调用,它有多是一个本地的实现,也多是一个远程的实现,也可能一个集群实现。
既然 Invoker 如此重要,那么咱们颇有必要搞清楚 Invoker 的用途。Invoker 是由 ProxyFactory 建立而来,Dubbo 默认的 ProxyFactory 实现类是 JavassistProxyFactory。下面咱们到 JavassistProxyFactory 代码中,探索 Invoker 的建立过程。以下:
-- JavassistProxyFactory public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // 为目标类建立 Wrapper final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); // 建立匿名 Invoker 类对象,并实现 doInvoke 方法。 return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { // 调用 Wrapper 的 invokeMethod 方法,invokeMethod 最终会调用目标方法 return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; }
如上,JavassistProxyFactory 建立了一个继承自 AbstractProxyInvoker 类的匿名对象,并覆写了抽象方法 doInvoke。覆写后的 doInvoke 逻辑比较简单,仅是将调用请求转发给了 Wrapper 类的 invokeMethod 方法。Wrapper 用于“包裹”目标类,Wrapper 是一个抽象类,仅可经过 getWrapper(Class) 方法建立子类。在建立 Wrapper 子类的过程当中,子类代码生成逻辑会对 getWrapper 方法传入的 Class 对象进行解析,拿到诸如类方法,类成员变量等信息。以及生成 invokeMethod 方法代码,和其余一些方法代码。代码生成完毕后,经过 Javassist 生成 Class 对象,最后再经过反射建立 Wrapper 实例。相关的代码以下:
public static Wrapper getWrapper(Class<?> c) { while (ClassGenerator.isDynamicClass(c)) c = c.getSuperclass(); if (c == Object.class) return OBJECT_WRAPPER; // 访存 Wrapper ret = WRAPPER_MAP.get(c); if (ret == null) { // 缓存未命中,建立 Wrapper ret = makeWrapper(c); // 写入缓存 WRAPPER_MAP.put(c, ret); } return ret; }
getWrapper 方法只是包含了一些缓存操做逻辑,非重点。下面咱们重点关注 makeWrapper 方法。
private static Wrapper makeWrapper(Class<?> c) { // 检测 c 是否为私有类型,如果则抛出异常 if (c.isPrimitive()) throw new IllegalArgumentException("Can not create wrapper for primitive type: " + c); String name = c.getName(); ClassLoader cl = ClassHelper.getClassLoader(c); // c1 用于存储 setPropertyValue 方法代码 StringBuilder c1 = new StringBuilder("public void setPropertyValue(Object o, String n, Object v){ "); // c2 用于存储 getPropertyValue 方法代码 StringBuilder c2 = new StringBuilder("public Object getPropertyValue(Object o, String n){ "); // c3 用于存储 invokeMethod 方法代码 StringBuilder c3 = new StringBuilder("public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws " + InvocationTargetException.class.getName() + "{ "); // 生成类型转换代码及异常捕捉代码,好比: // DemoService w; try { w = ((DemoServcie) $1); }}catch(Throwable e){ throw new IllegalArgumentException(e); } c1.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }"); c2.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }"); c3.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }"); // pts 用于存储成员变量名和类型 Map<String, Class<?>> pts = new HashMap<String, Class<?>>(); // ms 用于存储方法描述信息(可理解为方法签名)及 Method 实例 Map<String, Method> ms = new LinkedHashMap<String, Method>(); // mns 为方法名列表 List<String> mns = new ArrayList<String>(); // dmns 用于存储定义在当前类中的方法的名称 List<String> dmns = new ArrayList<String>(); // --------------------------------✨ 分割线1 ✨------------------------------------- // 获取 public 访问级别的字段,并为全部字段生成条件判断语句 for (Field f : c.getFields()) { String fn = f.getName(); Class<?> ft = f.getType(); if (Modifier.isStatic(f.getModifiers()) || Modifier.isTransient(f.getModifiers())) // 忽略关键字 static 或 transient 修饰的变量 continue; // 生成条件判断及赋值语句,好比: // if( $2.equals("name") ) { w.name = (java.lang.String) $3; return;} // if( $2.equals("age") ) { w.age = ((Number) $3).intValue(); return;} c1.append(" if( $2.equals(\"").append(fn).append("\") ){ w.").append(fn).append("=").append(arg(ft, "$3")).append("; return; }"); // 生成条件判断及返回语句,好比: // if( $2.equals("name") ) { return ($w)w.name; } c2.append(" if( $2.equals(\"").append(fn).append("\") ){ return ($w)w.").append(fn).append("; }"); // 存储 <字段名, 字段类型> 键值对到 pts 中 pts.put(fn, ft); } // --------------------------------✨ 分割线2 ✨------------------------------------- Method[] methods = c.getMethods(); // 检测 c 中是否包含在当前类中声明的方法 boolean hasMethod = hasMethods(methods); if (hasMethod) { c3.append(" try{"); } for (Method m : methods) { if (m.getDeclaringClass() == Object.class) // 忽略 Object 中定义的方法 continue; String mn = m.getName(); // 生成方法名判断语句,示例以下: // if ( "sayHello".equals( $2 ) c3.append(" if( \"").append(mn).append("\".equals( $2 ) "); int len = m.getParameterTypes().length; // 生成运行时传入参数的数量与方法的参数列表长度判断语句,示例以下: // && $3.length == 2 c3.append(" && ").append(" $3.length == ").append(len); boolean override = false; for (Method m2 : methods) { // 检测方法是否存在重载状况,条件为:方法对象不一样 && 方法名相同 if (m != m2 && m.getName().equals(m2.getName())) { override = true; break; } } // 对重载方法进行处理,考虑下面的方法: // 1. void sayHello(Integer, String) // 2. void sayHello(Integer, Integer) // 方法名相同,参数列表长度也相同,所以不能仅经过这两项判断两个方法是否相等。 // 须要进一步判断方法的参数类型 if (override) { if (len > 0) { for (int l = 0; l < len; l++) { // && $3[0].getName().equals("java.lang.Integer") // && $3[1].getName().equals("java.lang.String") c3.append(" && ").append(" $3[").append(l).append("].getName().equals(\"") .append(m.getParameterTypes()[l].getName()).append("\")"); } } } // 添加 ) {,完成方法判断语句,此时生成的方法可能以下(已格式化): // if ("sayHello".equals($2) // && $3.length == 2 // && $3[0].getName().equals("java.lang.Integer") // && $3[1].getName().equals("java.lang.String")) { c3.append(" ) { "); // 根据返回值类型生成目标方法调用语句 if (m.getReturnType() == Void.TYPE) // w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]); return null; c3.append(" w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");").append(" return null;"); else // return w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]); c3.append(" return ($w)w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");"); // 添加 }, 当前”方法判断条件“代码生成完毕,示例代码以下(已格式化): // if ("sayHello".equals($2) // && $3.length == 2 // && $3[0].getName().equals("java.lang.Integer") // && $3[1].getName().equals("java.lang.String")) { // // w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]); // return null; // } c3.append(" }"); // 添加方法名到 mns 集合中 mns.add(mn); // 检测当前方法是否在 c 中被声明的 if (m.getDeclaringClass() == c) // 如果,则将当前方法名添加到 dmns 中 dmns.add(mn); ms.put(ReflectUtils.getDesc(m), m); } if (hasMethod) { // 添加异常捕捉语句 c3.append(" } catch(Throwable e) { "); c3.append(" throw new java.lang.reflect.InvocationTargetException(e); "); c3.append(" }"); } // 添加 NoSuchMethodException 异常抛出代码 c3.append(" throw new " + NoSuchMethodException.class.getName() + "(\"Not found method \\\"\"+$2+\"\\\" in class " + c.getName() + ".\"); }"); // --------------------------------✨ 分割线3 ✨------------------------------------- Matcher matcher; // 处理 get/set 方法 for (Map.Entry<String, Method> entry : ms.entrySet()) { String md = entry.getKey(); Method method = (Method) entry.getValue(); // 匹配以 get 开头的方法 if ((matcher = ReflectUtils.GETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) { // 获取属性名 String pn = propertyName(matcher.group(1)); // 生成属性判断以及返回语句,示例以下: // if( $2.equals("name") ) { return ($w).w.getName(); } c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName()).append("(); }"); pts.put(pn, method.getReturnType()); // 匹配以 is/has/can 开头的方法 } else if ((matcher = ReflectUtils.IS_HAS_CAN_METHOD_DESC_PATTERN.matcher(md)).matches()) { String pn = propertyName(matcher.group(1)); // 生成属性判断以及返回语句,示例以下: // if( $2.equals("dream") ) { return ($w).w.hasDream(); } c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName()).append("(); }"); pts.put(pn, method.getReturnType()); // 匹配以 set 开头的方法 } else if ((matcher = ReflectUtils.SETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) { Class<?> pt = method.getParameterTypes()[0]; String pn = propertyName(matcher.group(1)); // 生成属性判断以及 setter 调用语句,示例以下: // if( $2.equals("name") ) { w.setName((java.lang.String)$3); return; } c1.append(" if( $2.equals(\"").append(pn).append("\") ){ w.").append(method.getName()).append("(").append(arg(pt, "$3")).append("); return; }"); pts.put(pn, pt); } } // 添加 NoSuchPropertyException 异常抛出代码 c1.append(" throw new " + NoSuchPropertyException.class.getName() + "(\"Not found property \\\"\"+$2+\"\\\" filed or setter method in class " + c.getName() + ".\"); }"); c2.append(" throw new " + NoSuchPropertyException.class.getName() + "(\"Not found property \\\"\"+$2+\"\\\" filed or setter method in class " + c.getName() + ".\"); }"); // --------------------------------✨ 分割线4 ✨------------------------------------- long id = WRAPPER_CLASS_COUNTER.getAndIncrement(); // 建立类生成器 ClassGenerator cc = ClassGenerator.newInstance(cl); // 设置类名及超类 cc.setClassName((Modifier.isPublic(c.getModifiers()) ? Wrapper.class.getName() : c.getName() + "$sw") + id); cc.setSuperClass(Wrapper.class); // 添加默认构造方法 cc.addDefaultConstructor(); // 添加字段 cc.addField("public static String[] pns;"); cc.addField("public static " + Map.class.getName() + " pts;"); cc.addField("public static String[] mns;"); cc.addField("public static String[] dmns;"); for (int i = 0, len = ms.size(); i < len; i++) cc.addField("public static Class[] mts" + i + ";"); // 添加方法代码 cc.addMethod("public String[] getPropertyNames(){ return pns; }"); cc.addMethod("public boolean hasProperty(String n){ return pts.containsKey($1); }"); cc.addMethod("public Class getPropertyType(String n){ return (Class)pts.get($1); }"); cc.addMethod("public String[] getMethodNames(){ return mns; }"); cc.addMethod("public String[] getDeclaredMethodNames(){ return dmns; }"); cc.addMethod(c1.toString()); cc.addMethod(c2.toString()); cc.addMethod(c3.toString()); try { // 生成类 Class<?> wc = cc.toClass(); // 设置字段值 wc.getField("pts").set(null, pts); wc.getField("pns").set(null, pts.keySet().toArray(new String[0])); wc.getField("mns").set(null, mns.toArray(new String[0])); wc.getField("dmns").set(null, dmns.toArray(new String[0])); int ix = 0; for (Method m : ms.values()) wc.getField("mts" + ix++).set(null, m.getParameterTypes()); // 建立 Wrapper 实例 return (Wrapper) wc.newInstance(); } catch (RuntimeException e) { throw e; } catch (Throwable e) { throw new RuntimeException(e.getMessage(), e); } finally { cc.release(); ms.clear(); mns.clear(); dmns.clear(); } }
上面代码很长,你们耐心看一下。我在上面代码中作了大量的注释,并按功能对代码进行了分块,以帮助你们理解代码逻辑。下面对这段代码进行讲解。首先咱们把目光移到分割线1之上的代码,这段代码主要用于进行一些初始化操做。好比建立 c一、c二、c3 以及 pts、ms、mns 等变量,以及向 c一、c二、c3 中添加方法定义和类型类型转换代码。接下来是分割线1到分割线2之间的代码,这段代码用于为 public 级别的字段生成条件判断取值与赋值代码。这段代码不是很难看懂,就很少说了。继续向下看,分割线2和分隔线3之间的代码用于为定义在当前类中的方法生成判断语句,和方法调用语句。由于须要对方法重载进行校验,所以到这这段代码看起来有点复杂。不过耐心开一下,也不是很难理解。接下来是分割线3和分隔线4之间的代码,这段代码用于处理 getter、setter 以及以 is/has/can 开头的方法。处理方式是经过正则表达式获取方法类型(get/set/is/...),以及属性名。以后为属性名生成判断语句,而后为方法生成调用语句。最后咱们再来看一下分隔线4如下的代码,这段代码经过 ClassGenerator 为刚刚生成的代码构建 Class 类,并经过反射建立对象。ClassGenerator 是 Dubbo 本身封装的,该类的核心是 toClass() 的重载方法 toClass(ClassLoader, ProtectionDomain),该方法经过 javassist 构建 Class。这里就不分析 toClass 方法了,你们请自行分析。
阅读 Wrapper 类代码须要对 javassist 框架有所了解。关于 javassist,你们若是不熟悉,请自行查阅资料,本节不打算介绍 javassist 相关内容。
好了,关于 Wrapper 类生成过程就分析到这。若是你们看的不是很明白,能够单独为 Wrapper 建立单元测试,而后单步调试。并将生成的代码拷贝出来,格式化后再进行观察和理解。好了,本节先到这。
本节咱们来看一下服务导出相关的代码,按照代码执行顺序,本节先来分析导出服务到本地的过程。相关代码以下:
private void exportLocal(URL url) { // 若是 URL 的协议头等于 injvm,说明已经导出到本地了,无需再次导出 if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { URL local = URL.valueOf(url.toFullString()) .setProtocol(Constants.LOCAL_PROTOCOL) // 设置协议头为 injvm .setHost(LOCALHOST) .setPort(0); ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref)); // 建立 Invoker,并导出服务,这里的 protocol 会在运行时调用 InjvmProtocol 的 export 方法 Exporter<?> exporter = protocol.export( proxyFactory.getInvoker(ref, (Class) interfaceClass, local)); exporters.add(exporter); } }
exportLocal 方法比较简单,首先根据 URL 协议头决定是否导出服务。若需导出,则建立一个新的 URL 并将协议头、主机名以及端口设置成新的值。而后建立 Invoker,并调用 InjvmProtocol 的 export 方法导出服务。下面咱们来看一下 InjvmProtocol 的 export 方法都作了哪些事情。
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { // 建立 InjvmExporter return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap); }
如上,InjvmProtocol 的 export 方法仅建立了一个 InjvmExporter,无其余逻辑。到此导出服务到本地就分析完了,接下来,咱们继续分析导出服务到远程的过程。
与导出服务到本地相比,导出服务到远程的过程要复杂很多,其包含了服务导出与服务注册两个过程。这两个过程涉及到了大量的调用,所以比较复杂。不过无论再难,咱们都要看一下,万一看懂了呢。按照代码执行顺序,本节先来分析服务导出逻辑,服务注册逻辑将在下一节进行分析。下面开始分析,咱们把目光移动到 RegistryProtocol 的 export 方法上。
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { // 导出服务 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); // 获取注册中心 URL,以 zookeeper 注册中心为例,获得的示例 URL 以下: // zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F172.17.48.52%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider URL registryUrl = getRegistryUrl(originInvoker); // 根据 URL 加载 Registry 实现类,好比 ZookeeperRegistry final Registry registry = getRegistry(originInvoker); // 获取已注册的服务提供者 URL,好比: // dubbo://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker); // 获取 register 参数 boolean register = registeredProviderUrl.getParameter("register", true); // 向服务提供者与消费者注册表中注册服务提供者 ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); // 根据 register 的值决定是否注册服务 if (register) { // 向注册中心注册服务 register(registryUrl, registeredProviderUrl); ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true); } // 获取订阅 URL,好比: // provider://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?category=configurators&check=false&anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl); // 建立监听器 final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); // 向注册中心进行订阅 override 数据 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); // 建立并返回 DestroyableExporter return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl); }
上面代码看起来比较复杂,主要作以下一些操做:
在以上操做中,除了建立并返回 DestroyableExporter 没啥难度外,其余几步操做都不是很简单。这其中,导出服务和注册服务是本章要重点分析的逻辑。 订阅 override 数据这个是非重点内容,后面会简单介绍一下。下面开始本节的分析,先来分析 doLocalExport 方法的逻辑,以下:
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) { String key = getCacheKey(originInvoker); // 访问缓存 ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { synchronized (bounds) { exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { // 建立 Invoker 为委托类对象 final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker)); // 调用 protocol 的 export 方法导出服务 exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker); // 写缓存 bounds.put(key, exporter); } } } return exporter; }
上面的代码是典型的双重检查,这个你们应该都知道。接下来,咱们把重点放在 Protocol 的 export 方法上。假设运行时协议为 dubbo,此处的 protocol 会在运行时加载 DubboProtocol,并调用 DubboProtocol 的 export 方法。咱们目光转移到 DubboProtocol 的 export 方法上,相关分析以下:
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); // 获取服务标识,理解成服务坐标也行。由服务组名,服务名,服务版本号以及端口组成。好比: // demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880 String key = serviceKey(url); // 建立 DubboExporter DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); // 将 <key, exporter> 键值对放入缓存中 exporterMap.put(key, exporter); // 如下代码应该和本地存根有关,代码不难看懂,但具体用途暂时不清楚,先忽略 Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice) { String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0) { // 省略日志打印代码 } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } // 启动服务器 openServer(url); // 优化序列化 optimizeSerialization(url); return exporter; }
如上,咱们重点关注 DubboExporter 的建立以及 openServer 方法,其余逻辑看不懂也不要紧,不影响理解服务导出过程。另外,DubboExporter 的代码比较简单,就不分析了。下面分析 openServer 方法。
private void openServer(URL url) { // 获取 host:port,并将其做为服务器实例的 key,用于标识当前的服务器实例 String key = url.getAddress(); boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); if (isServer) { // 访问缓存 ExchangeServer server = serverMap.get(key); if (server == null) { // 建立服务器实例 serverMap.put(key, createServer(url)); } else { // 服务器已建立,则根据 url 中的配置重置服务器 server.reset(url); } } }
如上,在同一台机器上(单网卡),同一个端口上仅容许启动一个服务器实例。若某个端口上已有服务器实例,此时则调用 reset 方法重置服务器的一些配置。考虑到篇幅问题,关于服务器实例重置的代码就不分析了。接下来分析服务器实例的建立过程。以下:
private ExchangeServer createServer(URL url) { url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, // 添加心跳检测配置到 url 中 url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); // 获取 server 参数,默认为 netty String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); // 经过 SPI 检测是否存在 server 参数所表明的 Transporter 拓展,不存在则抛出异常 if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) throw new RpcException("Unsupported server type: " + str + ", url: " + url); // 添加编码解码器参数 url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME); ExchangeServer server; try { // 建立 ExchangeServer server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server..."); } // 获取 client 参数,可指定 netty,mina str = url.getParameter(Constants.CLIENT_KEY); if (str != null && str.length() > 0) { // 获取全部的 Transporter 实现类名称集合,好比 supportedTypes = [netty, mina] Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); // 检测当前 Dubbo 所支持的 Transporter 实现类名称列表中, // 是否包含 client 所表示的 Transporter,若不包含,则抛出异常 if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type..."); } } return server; }
如上,createServer 包含三个核心的操做。第一是检测是否存在 server 参数所表明的 Transporter 拓展,不存在则抛出异常。第二是建立服务器实例。第三是检测是否支持 client 参数所表示的 Transporter 拓展,不存在也是抛出异常。两次检测操做所对应的代码比较直白了,无需多说。但建立服务器的操做目前还不是很清晰,咱们继续往下看。
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handler == null) { throw new IllegalArgumentException("handler == null"); } url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); // 获取 Exchanger,默认为 HeaderExchanger。 // 紧接着调用 HeaderExchanger 的 bind 方法建立 ExchangeServer 实例 return getExchanger(url).bind(url, handler); }
上面代码比较简单,就很少说了。下面看一下 HeaderExchanger 的 bind 方法。
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { // 建立 HeaderExchangeServer 实例,该方法包含了多步操做,本别以下: // 1. new HeaderExchangeHandler(handler) // 2. new DecodeHandler(new HeaderExchangeHandler(handler)) // 3. Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))) return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); }
HeaderExchanger 的 bind 方法包含的逻辑比较多,但目前咱们仅需关心 Transporters 的 bind 方法逻辑便可。该方法的代码以下:
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handlers == null || handlers.length == 0) { throw new IllegalArgumentException("handlers == null"); } ChannelHandler handler; if (handlers.length == 1) { handler = handlers[0]; } else { // 若是 handlers 元素数量大于1,则建立 ChannelHandler 分发器 handler = new ChannelHandlerDispatcher(handlers); } // 获取自适应 Transporter 实例,并调用实例方法 return getTransporter().bind(url, handler); }
如上,getTransporter() 方法获取的 Transporter 是在运行时动态建立的,类名为 Transporter$Adaptive,也就是自适应拓展类。我在上一篇文章中详细分析了自适应拓展类的生成过程,对自适应拓展类不了解的同窗能够参考我以前的文章,这里再也不赘述。Transporter$Adaptive 会在运行时根据传入的 URL 参数决定加载什么类型的 Transporter,默认为 NettyTransporter。下面咱们继续跟下去,此次分析的是 NettyTransporter 的 bind 方法。
public Server bind(URL url, ChannelHandler listener) throws RemotingException { // 建立 NettyServer return new NettyServer(url, listener); }
这里仅有一句建立 NettyServer 的代码,没啥好讲的,咱们继续向下看。
public class NettyServer extends AbstractServer implements Server { public NettyServer(URL url, ChannelHandler handler) throws RemotingException { // 调用父类构造方法 super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); } } public abstract class AbstractServer extends AbstractEndpoint implements Server { public AbstractServer(URL url, ChannelHandler handler) throws RemotingException { // 调用父类构造方法,这里就不用跟进去了,没什么复杂逻辑 super(url, handler); localAddress = getUrl().toInetSocketAddress(); // 获取 ip 和端口 String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost()); int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort()); if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) { // 设置 ip 为 0.0.0.0 bindIp = NetUtils.ANYHOST; } bindAddress = new InetSocketAddress(bindIp, bindPort); // 获取最大可接受链接数 this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS); this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT); try { // 调用模板方法 doOpen 启动服务器 doOpen(); } catch (Throwable t) { throw new RemotingException("Failed to bind "); } DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension(); executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort())); } protected abstract void doOpen() throws Throwable; protected abstract void doClose() throws Throwable; }
上面多数代码为赋值代码,不须要多讲。咱们重点关注 doOpen 抽象方法,该方法须要子类实现。下面回到 NettyServer 中。
protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); // 建立 boss 和 worker 线程池 ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); // 建立 ServerBootstrap bootstrap = new ServerBootstrap(channelFactory); final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); channels = nettyHandler.getChannels(); bootstrap.setOption("child.tcpNoDelay", true); // 设置 PipelineFactory bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("decoder", adapter.getDecoder()); pipeline.addLast("encoder", adapter.getEncoder()); pipeline.addLast("handler", nettyHandler); return pipeline; } }); // 绑定到指定的 ip 和端口上 channel = bootstrap.bind(getBindAddress()); }
以上就是 NettyServer 建立的过程,dubbo 默认使用的 NettyServer 是基于 netty 3.x 版本实现的,比较老了。所以 Dubbo 中另外提供了 netty 4.x 版本的 NettyServer,你们可在使用 Dubbo 的过程当中按需进行配置。
到此,关于服务导出的过程就分析完了。整个过程比较复杂,你们在分析的过程当中耐心一些。而且多写 Demo 进行进行调试,以便可以更好的理解代码逻辑。好了,本节内容先到这里,接下来分析服务导出的另外一块逻辑 -- 服务注册。
本节咱们来分析服务注册过程,服务注册操做对于 Dubbo 来讲不是必需的,经过服务直连的方式就能够绕过注册中心。但一般咱们不会这么作,直连方式不利于服务治理,仅推荐在测试环境测试服务时使用。对于 Dubbo 来讲,注册中心虽不是必需,但倒是必要的。所以,关于注册中心以及服务注册相关逻辑,咱们也须要搞懂。
本节内容以 Zookeeper 注册中心做为分析目标,其余类型注册中心你们可自行分析。下面从服务注册的入口方法开始分析,咱们把目光再次移到 RegistryProtocol 的 export 方法上。以下:
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { // ${导出服务} // 省略其余代码 boolean register = registeredProviderUrl.getParameter("register", true); if (register) { // 注册服务 register(registryUrl, registeredProviderUrl); ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true); } final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); // 订阅 override 数据 registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); // 省略部分代码 }
RegistryProtocol 的 export 方法包含了服务导出,注册,以及数据订阅等逻辑。其中服务导出逻辑上一节已经分析过了,本节将分析服务注册逻辑,数据订阅逻辑将在下一节进行分析。下面开始本节的分析,相关代码以下:
public void register(URL registryUrl, URL registedProviderUrl) { // 获取 Registry Registry registry = registryFactory.getRegistry(registryUrl); // 注册服务 registry.register(registedProviderUrl); }
register 方法包含两步操做,第一步是获取注册中心实例,第二步是向注册中心注册服务。接下来,我分两节内容对这两步操做进行分析。按照顺序,先来分析获取注册中心的逻辑。
本节内容以 Zookeeper 注册中心为例进行分析。下面先来看一下 getRegistry 方法的源码,这个方法由 AbstractRegistryFactory 实现。以下:
public Registry getRegistry(URL url) { url = url.setPath(RegistryService.class.getName()) .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName()) .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY); String key = url.toServiceString(); LOCK.lock(); try { // 访问缓存 Registry registry = REGISTRIES.get(key); if (registry != null) { return registry; } // 缓存未命中,建立 Registry 实例 registry = createRegistry(url); if (registry == null) { throw new IllegalStateException("Can not create registry..."); } // 写入缓存 REGISTRIES.put(key, registry); return registry; } finally { LOCK.unlock(); } } protected abstract Registry createRegistry(URL url);
如上,getRegistry 方法先访问缓存,缓存未命中则调用 createRegistry 建立 Registry,而后写入缓存。这里的 createRegistry 是一个模板方法,由具体的子类实现。所以,下面咱们到 ZookeeperRegistryFactory 中探究一番。
public class ZookeeperRegistryFactory extends AbstractRegistryFactory { // zookeeperTransporter 由 SPI 在运行时注入,类型为 ZookeeperTransporter$Adaptive private ZookeeperTransporter zookeeperTransporter; public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) { this.zookeeperTransporter = zookeeperTransporter; } @Override public Registry createRegistry(URL url) { // 建立 ZookeeperRegistry return new ZookeeperRegistry(url, zookeeperTransporter); } }
ZookeeperRegistryFactory 的 createRegistry 方法仅包含一句代码,无需解释,继续跟下去。
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { super(url); if (url.isAnyHost()) { throw new IllegalStateException("registry address == null"); } // 获取组名,默认为 dubbo String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT); if (!group.startsWith(Constants.PATH_SEPARATOR)) { // group = "/" + group group = Constants.PATH_SEPARATOR + group; } this.root = group; // 建立 Zookeeper 客户端,默认为 CuratorZookeeperTransporter zkClient = zookeeperTransporter.connect(url); // 添加状态监听器 zkClient.addStateListener(new StateListener() { @Override public void stateChanged(int state) { if (state == RECONNECTED) { try { recover(); } catch (Exception e) { logger.error(e.getMessage(), e); } } } }); }
在上面的代码代码中,咱们重点关注 ZookeeperTransporter 的 connect 方法调用,这个方法用于建立 Zookeeper 客户端。建立好 Zookeeper 客户端,意味着注册中心的建立过程就结束了。不过,显然咱们不能就此中止,难道你们没有兴趣了解一下 Zookeeper 客户端的建立过程吗?若是有,那么继续向下看。没有的话,直接跳到下一节。那我接着分析了。
前面说过,这里的 zookeeperTransporter 类型为自适应拓展类,所以 connect 方法会在被调用时决定加载什么类型的 ZookeeperTransporter 拓展,默认为 CuratorZookeeperTransporter。下面咱们到 CuratorZookeeperTransporter 中看一看。
public ZookeeperClient connect(URL url) { // 建立 CuratorZookeeperClient return new CuratorZookeeperClient(url); }
上面方法仅用于建立 CuratorZookeeperClient 实例,没什么好说的,继续往下看。
public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatcher> { private final CuratorFramework client; public CuratorZookeeperClient(URL url) { super(url); try { // 建立 CuratorFramework 构造器 CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() .connectString(url.getBackupAddress()) .retryPolicy(new RetryNTimes(1, 1000)) .connectionTimeoutMs(5000); String authority = url.getAuthority(); if (authority != null && authority.length() > 0) { builder = builder.authorization("digest", authority.getBytes()); } // 构建 CuratorFramework 实例 client = builder.build(); // 添加监听器 client.getConnectionStateListenable().addListener(new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState state) { if (state == ConnectionState.LOST) { CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED); } else if (state == ConnectionState.CONNECTED) { CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED); } else if (state == ConnectionState.RECONNECTED) { CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED); } } }); // 启动客户端 client.start(); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } } }
CuratorZookeeperClient 构造方法主要用于建立和启动 CuratorFramework 实例。以上基本上都是 Curator 框架的代码,你们若是对 Curator 框架不是很了解,能够参考 Curator 官方文档,并写点 Demo 跑跑。
本节分析了 ZookeeperRegistry 实例的建立过程,整个过程并非很复杂。你们在看完分析后,能够自行调试,以加深印象。如今注册中心实例建立好了,接下来要作的事情是向注册中心注册服务,咱们继续往下看。
以 Zookeeper 为例,所谓的服务注册,本质上是将服务配置数据写入到 Zookeeper 的某个路径的节点下。为了验证这个说法,下面咱们将 Dobbo 官方提供提供的实例跑起来,而后经过 Zookeeper 可视化客户端 ZooInspector 查看节点数据。以下:
从上图中能够看到 com.alibaba.dubbo.demo.DemoService 这个服务对应的配置信息(存储在 URL 中)最终被注册到了 /dubbo/com.alibaba.dubbo.demo.DemoService/providers/ 节点下。搞懂了服务注册的本质,那么接下来咱们就能够去阅读服务注册的代码了。服务注册的接口为 register(URL),这个方法定义在 FailbackRegistry 抽象类中。方法代码以下:
public void register(URL url) { super.register(url); failedRegistered.remove(url); failedUnregistered.remove(url); try { // 模板方法,由子类实现 doRegister(url); } catch (Exception e) { Throwable t = e; // 获取 check 参数,若 check = true 将会直接抛出异常 boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true) && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol()); boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if (skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to register"); } else { logger.error("Failed to register"); } // 记录注册失败的连接 failedRegistered.add(url); } } protected abstract void doRegister(URL url);
如上,咱们重点关注 doRegister 方法调用便可,其余的代码先忽略。doRegister 方法是一个模板方法,所以咱们到 FailbackRegistry 子类 ZookeeperRegistry 中进行分析。以下:
protected void doRegister(URL url) { try { // 经过 Zookeeper 客户端建立节点,节点路径由 toUrlPath 方法生成,路径格式以下: // /${group}/${serviceInterface}/providers/${url} // 好比 // /dubbo/com.tianxiaobo.DemoService/providers/dubbo%3A%2F%2F127.0.0.1...... zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register..."); } }
如上,ZookeeperRegistry 在 doRegister 中调用了 Zookeeper 客户端建立服务节点。节点路径由 toUrlPath 方法生成,该方法逻辑不难理解,就不分析了。接下来分析 create 方法,以下:
public void create(String path, boolean ephemeral) { if (!ephemeral) { // 若是要建立的节点类型非临时节点,那么这里要检测节点是否存在 if (checkExists(path)) { return; } } int i = path.lastIndexOf('/'); if (i > 0) { create(path.substring(0, i), false); // 递归建立上一级路径 } // 根据 ephemeral 的值建立临时或持久节点 if (ephemeral) { createEphemeral(path); } else { createPersistent(path); } }
上面方法先是经过递归建立当前节点的上一级路径,而后再根据 ephemeral 的值决定建立临时仍是持久节点。createEphemeral 和 createPersistent 这两个方法都比较简单,这里简单分析其中的一个。以下:
public void createEphemeral(String path) { try { // 经过 Curator 框架建立节点 client.create().withMode(CreateMode.EPHEMERAL).forPath(path); } catch (NodeExistsException e) { } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } }
好了,到此关于服务注册的过程就分析完了。整个过程可简单总结为:先建立注册中心实例,以后再经过注册中心实例注册服务。本节先到这,接下来分析数据订阅过程。
订阅 override 数据对应的代码我粗略看了一遍,这部分代码的主要目的是为了在服务配置发生变化时,从新导出服务。具体的使用场景应该当咱们经过 Dubbo 管理后台修改了服务配置后,Dubbo 获得服务配置被修改的通知,而后从新导出服务。这个使用场景只是猜想,我并未进行过验证。若是你们有兴趣能够自行验证。
override 数据订阅相关代码也不是不多,考虑到文章篇幅问题以及重要性,遂决定不对此逻辑进行详细的分析。若是你们有兴趣,可自行分析。
本篇文章详细分析了 Dubbo 服务导出过程,包括配置检测,URL 组装,Invoker 建立过程、导出服务以及注册服务等等。篇幅比较大,须要你们耐心阅读。对于这篇文章,我建议你们当成一个工具书使用。须要的时候跳到指定章节看一下,通读可能会有点累。因为文章篇幅比较大,所以可能会隐藏一些我没意识到的错误。若你们在阅读的过程当中发现了错误,还请指出。若是可以不吝赐教,那就更好了,先在这里说声谢谢。
好了,本篇文章就到这了。谢谢阅读。
本文在知识共享许可协议 4.0 下发布,转载需在明显位置处注明出处
做者:田小波
本文同步发布在个人我的博客:http://www.tianxiaobo.com
本做品采用知识共享署名-非商业性使用-禁止演绎 4.0 国际许可协议进行许可。