Dubbo的插件化实现很是相似于原生的JAVA的SPI:它只是提供一种协议,并无提供相关插件化实施的接口。用过的同窗都知道,它有一种java原生的支持类:ServiceLoader,经过声明接口的实现类,在META-INF/services中注册一个实现类,而后经过ServiceLoader去生成一个接口实例,当更换插件的时候只须要把本身实现的插件替换到META-INF/services中便可。java
在Dubbo中,SPI是一个很是核心的机制,贯穿在几乎全部的流程中。弄明白这一块可以帮咱们明白dubo源码mysql
Dubbo是基于Java原生SPI机制思想的一个改进,因此,先从JAVA SPI机制开始了解什么时SPI之后再去学习Dubbo的SPI就比较简单spring
SPI全称(service provider interface),是JDK内置的一种服务提供发现机制,目前市面上有不少框架都是用它来作服务的扩展发现,你们耳熟能详的如JDBC、日志框架都有用到;sql
简单来讲,它是一种动态替换发现的机制。举个简单的例子,若是咱们定义了一个规范,须要第三方厂商去实现,那么对于咱们应用方来讲,只须要集成对应厂商的插件,既能够完成对应规范的实现机制。 造成一种插拔式的扩展手段。数据库
META-INF/services
在该目录下建立一个properties文件,该文件须要知足如下几个条件json
resouces
的META-INF/services
中建立对应的文件,而且经过properties
规则配置实现类的全路径 以及对应调用方引入api接口,和对应产商的jarapi
而且在对应的resouces中引入接口,若是引入了多个产商的jar,那么会取到多个产商的东西缓存
SPI在不少地方有应用,你们能够看看最经常使用的java.sql.Driver驱动。JDK官方提供了java.sql.Driver这个驱动扩展点,可是大家并无看到JDK中有对应的Driver实现。 那在哪里实现呢?安全
以链接Mysql为例,咱们须要添加mysql-connector-java依赖。而后,大家能够在这个jar包中找到SPI的配置信息。以下图,因此java.sql.Driver由各个数据库厂商自行实现。这就是SPI的实际应用。固然除了这个意外,你们在spring的包中也能够看到相应的痕迹服务器
使用原生spi,若是路径下有多个实现都会加载进来,若是有一个加载失败,会比较麻烦
Dubbo的SPI并不是原生的SPI,Dubbo的规则是在
/META-INF/dubbo
/META-INF/internal
/META-INF/service
而且基于SPI接口去建立一个文件下面以须要实现的接口去建立一个文件,而且在文件中以properties规则同样配置实现类的全面以及分配实现的一个名称。
文件名称和接口名称保持一致,文件内容和SPI有差别,内容是key对应value
咱们看一下dubbo-cluster模块的META-INF.dubbo.internal:
META-INF/dubbo/com.alibaba.dubbo.rpc.Protocol
文件,文件内容为
defineProtocol=com.gupaoedu.dubbo.protocol.DefineProtocol
import com.alibaba.dubbo.common.URL; import com.alibaba.dubbo.rpc.Exporter; import com.alibaba.dubbo.rpc.Invoker; import com.alibaba.dubbo.rpc.Protocol; import com.alibaba.dubbo.rpc.RpcException; public class DefineProtocol implements Protocol { @Override public int getDefaultPort() { return 8888; } @Override public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { return null; } @Override public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { return null; } @Override public void destroy() { } }
public class App { public static void main(String[] args) throws IOException, InterruptedException { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext ("dubbo-client.xml"); Protocol protocol=ExtensionLoader.getExtensionLoader(Protocol.class). getExtension("defineProtocol"); System.out.println(protocol.getDefaultPort()); System.in.read(); } }
切入点
Protocol protocol=ExtensionLoader.getExtensionLoader(Protocol.class). getExtension("defineProtocol");
dubbo的扩展点框架主要位于这个包下:
com.alibaba.dubbo.common.extension
大概结构以下:
com.alibaba.dubbo.common.extension
|--factory|--AdaptiveExtensionFactory 自适应扩展点工厂 |--SpiExtensionFactory SPI扩展点工厂|--support
|--ActivateComparator|--Activate 自动激活加载扩展的注解
|--Adaptive 自适应扩展点的注解 |--ExtensionFactory 扩展点对象生成工厂接口|--ExtensionLoader 扩展点加载器,扩展点的查找,校验,加载等核心逻辑的实现类
|--SPI @SPI告诉当前应用其实一个扩展点例如Protocol 必定能够在对应的meta-inf/dubbo.internal 中看到dubbo-config-spring
|- extension
|--SpringExtensionFactory
其中最核心的类就是ExtensionLoader,几乎全部特性都在这个类中实现。
ExtensionLoader没有提供public的构造方法,可是提供了一个public static的getExtensionLoader,这个方法就是获取ExtensionLoader实例的工厂方法。其public成员方法中有三个比较重要的方法:
getActivateExtension :根据条件获取当前扩展可自动激活的实现
getExtension : 根据名称获取当前扩展的指定实现
getAdaptiveExtension : 获取当前扩展的自适应实现
@SPI("dubbo") public interface Protocol { /** * 获取缺省端口,当用户没有配置端口时使用。 * * @return 缺省端口 */ int getDefaultPort(); /** * 暴露远程服务:<br> * 1. 协议在接收请求时,应记录请求来源方地址信息:RpcContext.getContext().setRemoteAddress();<br> * 2. export()必须是幂等的,也就是暴露同一个URL的Invoker两次,和暴露一次没有区别。<br> * 3. export()传入的Invoker由框架实现并传入,协议不须要关心。<br> * * @param <T> 服务的类型 * @param invoker 服务的执行体 * @return exporter 暴露服务的引用,用于取消暴露 * @throws RpcException 当暴露服务出错时抛出,好比端口已占用 */ @Adaptive <T> Exporter<T> export(Invoker<T> invoker) throws RpcException; /** * 引用远程服务:<br> * 1. 当用户调用refer()所返回的Invoker对象的invoke()方法时,协议需相应执行同URL远端export()传入的Invoker对象的invoke()方法。<br> * 2. refer()返回的Invoker由协议实现,协议一般须要在此Invoker中发送远程请求。<br> * 3. 当url中有设置check=false时,链接失败不能抛出异常,并内部自动恢复。<br> * * @param <T> 服务的类型 * @param type 服务的类型 * @param url 远程服务的URL地址 * @return invoker 服务的本地代理 * @throws RpcException 当链接服务提供方失败时抛出 */ @Adaptive <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException; /** * 释放协议:<br> * 1. 取消该协议全部已经暴露和引用的服务。<br> * 2. 释放协议所占用的全部资源,好比链接和端口。<br> * 3. 协议在释放后,依然能暴露和引用新的服务。<br> */ void destroy(); }
从上述Protocol的源码,能够看到两个注解@SPI("duo")
和 @Adaptive
@SPI :表示当前这个接口是一个扩展点,能够实现本身的扩展实现
@Adaptive 表示一个自适应扩展点,在方法级别上,会动态生成一个适配器类
Protocol protocol=ExtensionLoader.getExtensionLoader(Protocol.class). getExtension("defineProtocol");
@SuppressWarnings("unchecked") public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) { if (type == null) throw new IllegalArgumentException("Extension type == null"); if (!type.isInterface()) { throw new IllegalArgumentException("Extension type(" + type + ") is not interface!"); } if (!withExtensionAnnotation(type)) { throw new IllegalArgumentException("Extension type(" + type + ") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!"); } ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type); if (loader == null) { EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type)); loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type); } return loader; }
根据一个类型class得到一个ExtensionLoader,须要一个class类型的参数,这个参数必须是接口,并且此接口必需要@SPI
注解注释,不然拒绝处理。检查经过以后首先会检查ExtensionLoader
缓存中是否已经存在该扩展对应的ExtensionLoader
,若是有则返回,不然建立一个新的ExtensionLoader负责加载此扩展实现,同时缓存起来。因此每个扩展,dubbo中只会有一个对应的ExtensionLoader
实例
接下来看下ExtensionLoader的私有构造函数:
private ExtensionLoader(Class<?> type) { this.type = type; objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension()); }
这里保存了对应的扩展类型,而且设置了一个额外的objectFactory属性,他是一个ExtensionFactory类型,ExtensionFactory主要用于加载扩展的实现:
ExtensionFactory主要用于加载扩展的实现:
@SPI public interface ExtensionFactory { /** * Get extension. * * @param type object type. * @param name object name. * @return object instance. */ <T> T getExtension(Class<T> type, String name); }
ExtensionFactory有@SPI注解,说明当前这个接口是一个扩展点。从extension包的结构图能够看到。Dubbo内部提供了两个实现类:SpiExtensionFactory
和AdaptiveExtensionFactory
。不一样的实现能够以不一样的方式来完成扩展点实现的加载。
对应上述ExtensionLoader的getAdaptiveExtension()
咱们查看对应的getAdaptivesion()
方法得到一个自适应的扩展点
若是是配置在类级别上,表示自定义适配器,若是是配置在方法上,表示须要动态生成适配器类
表示当前是自定义扩展点
默认的ExtensionFactory
实现中,AdaptiveExtensionFactotry
被@Adaptive
注解注释,也就是它就是ExtensionFactory对应的自适应扩展实现(每一个扩展点最多只能有一个自适应实现,若是全部实现中没有被@Adaptive注释的,那么dubbo会动态生成一个自适应实现类),也就是说,全部对ExtensionFactory调用的地方,实际上调用的都是AdpativeExtensionFactory,那么咱们看下他的实现代码:
@Adaptive public class AdaptiveExtensionFactory implements ExtensionFactory { private final List<ExtensionFactory> factories; public AdaptiveExtensionFactory() { ExtensionLoader<ExtensionFactory> loader = ExtensionLoader.getExtensionLoader(ExtensionFactory.class); List<ExtensionFactory> list = new ArrayList<ExtensionFactory>(); for (String name : loader.getSupportedExtensions()) { list.add(loader.getExtension(name)); } factories = Collections.unmodifiableList(list); } public <T> T getExtension(Class<T> type, String name) { for (ExtensionFactory factory : factories) { T extension = factory.getExtension(type, name); if (extension != null) { return extension; } } return null; }
这段代码,其实就至关于一个代理入口,它会遍历当前系统中全部的ExtensionFactory实现来获取指定的扩展实现,获取到扩展实现,遍历完全部ExtensionFactory实现,调用ExtensionLoader的getSupportedExtensions方法来获取ExtensionFactory的全部实现
从前面ExtensionLoader的私有构造函数中能够看出,在选择ExtensionFactory的时候,并非调用getExtension(name)来获取某个具体的实现类,而是调用getAdaptiveExtension来获取一个自适应的实现。那么首先咱们就来分析一下getAdaptiveExtension这个方法的实现吧:
@SuppressWarnings("unchecked") public T getAdaptiveExtension() { Object instance = cachedAdaptiveInstance.get(); if (instance == null) { if(createAdaptiveInstanceError == null) { synchronized (cachedAdaptiveInstance) { instance = cachedAdaptiveInstance.get(); if (instance == null) { try { instance = createAdaptiveExtension(); cachedAdaptiveInstance.set(instance); } catch (Throwable t) { createAdaptiveInstanceError = t; throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t); } } } } else { throw new IllegalStateException("fail to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError); } } return (T) instance; }
首先检查缓存的adaptiveInstance是否存在,若是存在则直接使用,不然的话调用createAdaptiveExtension方法来建立新的adaptiveInstance而且缓存起来。也就是说对于某个扩展点,每次调用ExtensionLoader.getAdaptiveExtension获取到的都是同一个实例。
private T createAdaptiveExtension() { try { return injectExtension((T) getAdaptiveExtensionClass().newInstance()); } catch (Exception e) { throw new IllegalStateException("Can not create adaptive extenstion " + type + ", cause: " + e.getMessage(), e); } }
在`createAdaptiveExtension`方法中,首先经过`getAdaptiveExtensionClass`方法获取到最终的自适应实现类型,而后实例化一个自适应扩展实现的实例,最后进行扩展点注入操做
private Class<?> getAdaptiveExtensionClass() { getExtensionClasses(); if (cachedAdaptiveClass != null) { return cachedAdaptiveClass; } return cachedAdaptiveClass = createAdaptiveExtensionClass(); }
上述代码中主要作了两件事情
getExtensionClasses()
加载全部路径下的扩展点createAdaptiveExtensionClass()
动态建立一个扩展点cachedAdaptiveClass这里有个判断,用来判断当前Protocol这个扩展点是否存在一个自定义的适配器,若是有,则直接返回自定义适配器,不然,就动态建立,这个值是在getExtensionClasses中赋值的,这块代码咱们稍后再看
private final Holder<Map<String, Class<?>>> cachedClasses = new Holder<Map<String,Class<?>>>(); private Map<String, Class<?>> getExtensionClasses() { //com.alibaba.dubbo.rpc.Protocal=>[xx,xx] Map<String, Class<?>> classes = cachedClasses.get(); if (classes == null) { synchronized (cachedClasses) { classes = cachedClasses.get(); if (classes == null) { classes = loadExtensionClasses(); cachedClasses.set(classes); } } } return classes; }
上述代码主要作了几件事
loadExtensionClasses()
,去加载扩展点的实现// 此方法已经getExtensionClasses方法同步过。 private Map<String, Class<?>> loadExtensionClasses() { //type->Protocol.class final SPI defaultAnnotation = type.getAnnotation(SPI.class); if(defaultAnnotation != null) { String value = defaultAnnotation.value(); if(value != null && (value = value.trim()).length() > 0) { String[] names = NAME_SEPARATOR.split(value); if(names.length > 1) { throw new IllegalStateException("more than 1 default extension name on extension " + type.getName() + ": " + Arrays.toString(names)); } if(names.length == 1) cachedDefaultName = names[0]; } } Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>(); loadFile(extensionClasses, DUBBO_INTERNAL_DIRECTORY); loadFile(extensionClasses, DUBBO_DIRECTORY); loadFile(extensionClasses, SERVICES_DIRECTORY); return extensionClasses; }
从代码里面能够看到,在loadExtensionClasses中首先会检测扩展点在@SPI注解中配置的默认扩展实现的名称,并将其赋值给cachedDefaultName属性进行缓存,后面想要获取该扩展点的默认实现名称就能够直接经过访问cachedDefaultName字段来完成,好比getDefaultExtensionName方法就是这么实现的。从这里的代码中又能够看到,具体的扩展实现类型,是经过调用loadFile方法来加载,分别从一下三个地方加载:
META-INF/dubbo/internal
META-INF/dubbo
META-INF/services/
主要逻辑:
Protocol.class
这个类的注解@SPI
@SPI
中的value值@SPI("dubbo")
,则吧此dubbo的值赋值给cachedDefaultName
,这就是为何咱们可以经过ExtensionLoader.getExtensionLoader(Protocol.class).getDefaultExtension()
可以获取到DubboProtocol
这个扩展点的缘由private void loadFile(Map<String, Class<?>> extensionClasses, String dir) { String fileName = dir + type.getName(); try { Enumeration<java.net.URL> urls; ClassLoader classLoader = findClassLoader(); if (classLoader != null) { urls = classLoader.getResources(fileName); } else { urls = ClassLoader.getSystemResources(fileName); } if (urls != null) { while (urls.hasMoreElements()) { java.net.URL url = urls.nextElement(); try { BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream(), "utf-8")); try { String line = null; while ((line = reader.readLine()) != null) { final int ci = line.indexOf('#'); if (ci >= 0) line = line.substring(0, ci); line = line.trim(); if (line.length() > 0) { try { String name = null; int i = line.indexOf('='); if (i > 0) { name = line.substring(0, i).trim(); line = line.substring(i + 1).trim(); } if (line.length() > 0) { Class<?> clazz = Class.forName(line, true, classLoader); if (! type.isAssignableFrom(clazz)) { throw new IllegalStateException("Error when load extension class(interface: " + type + ", class line: " + clazz.getName() + "), class " + clazz.getName() + "is not subtype of interface."); } //若是在类级别上,表示自定义适配器 //若是是在方法上,表示须要动态生成适配器类 if (clazz.isAnnotationPresent(Adaptive.class)) { if(cachedAdaptiveClass == null) { cachedAdaptiveClass = clazz; } else if (! cachedAdaptiveClass.equals(clazz)) { throw new IllegalStateException("More than 1 adaptive class found: " + cachedAdaptiveClass.getClass().getName() + ", " + clazz.getClass().getName()); } } else { try { clazz.getConstructor(type); Set<Class<?>> wrappers = cachedWrapperClasses; if (wrappers == null) { cachedWrapperClasses = new ConcurrentHashSet<Class<?>>(); wrappers = cachedWrapperClasses; } wrappers.add(clazz); } catch (NoSuchMethodException e) { clazz.getConstructor(); if (name == null || name.length() == 0) { name = findAnnotationName(clazz); if (name == null || name.length() == 0) { if (clazz.getSimpleName().length() > type.getSimpleName().length() && clazz.getSimpleName().endsWith(type.getSimpleName())) { name = clazz.getSimpleName().substring(0, clazz.getSimpleName().length() - type.getSimpleName().length()).toLowerCase(); } else { throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + url); } } } String[] names = NAME_SEPARATOR.split(name); if (names != null && names.length > 0) { Activate activate = clazz.getAnnotation(Activate.class); if (activate != null) { cachedActivates.put(names[0], activate); } for (String n : names) { if (! cachedNames.containsKey(clazz)) { cachedNames.put(clazz, n); } Class<?> c = extensionClasses.get(n); if (c == null) { extensionClasses.put(n, clazz); } else if (c != clazz) { throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + n + " on " + c.getName() + " and " + clazz.getName()); } } } } } } } catch (Throwable t) { IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: " + type + ", class line: " + line + ") in " + url + ", cause: " + t.getMessage(), t); exceptions.put(line, e); } } } // end of while read lines } finally { reader.close(); } } catch (Throwable t) { logger.error("Exception when load extension class(interface: " + type + ", class file: " + url + ") in " + url, t); } } // end of while urls } } catch (Throwable t) { logger.error("Exception when load extension class(interface: " + type + ", description file: " + fileName + ").", t); } }
解析指定路径下的文件,获取对应的扩展点,经过反射的方式进行实例化以后,put到extensionClasses
这个Map集合中
调用loadFile方法,代码比较长,主要作了几个事情,有几个变量会赋值
cachedAdaptiveClass : 当前Extension类型对应的AdaptiveExtension类型(只能一个)
cachedWrapperClasses : 当前Extension类型对应的全部Wrapper实现类型(无顺序)
cachedActivates : 当前Extension实现自动激活实现缓存(map,无序)
cachedNames : 扩展点实现类对应的名称(如配置多个名称则值为第一个)
当loadExtensionClasses方法执行完成以后,还有如下变量被赋值:
cachedDefaultName : 当前扩展点的默认实现名称
当getExtensionClasses方法执行完成以后,除了上述变量被赋值以外,还有如下变量被赋值:
cachedClasses : 扩展点实现名称对应的实现类(一个实现类可能有多个名称)
其实也就是说,在调用了getExtensionClasses方法以后,当前扩展点对应的实现类的一些信息就已经加载进来了而且被缓存了。后面的许多操做均可以直接经过这些缓存数据来进行处理了。
private Class<?> createAdaptiveExtensionClass() { //生成字节码代码 String code = createAdaptiveExtensionClassCode(); //得到类加载器 ClassLoader classLoader = findClassLoader(); com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension(); //动态编译字节码 return compiler.compile(code, classLoader); }
动态生成适配器代码,以及动态编译
compilier.compile
进行编译(默认状况下使用的是javassist)上生成的code和类是
import com.alibaba.dubbo.common.extension.ExtensionLoader; public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol { public void destroy() { throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!"); } public int getDefaultPort() { throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!"); } public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException { if (arg1 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg1; String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol()); if (extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])"); com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName); return extension.refer(arg0, arg1); } public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException { if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null"); if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null"); com.alibaba.dubbo.common.URL url = arg0.getUrl(); String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol()); if (extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])"); com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName); return extension.export(arg0); } }
ExtensionLoader.getExtensionLoader(扩展接口类).getExtension(扩展接口实现类名称)
,而后调用类的方法refer是一个引用,获得一个url的参数,经过参数判断是走哪一个协议发布服务
回到createAdaptiveExtension方法,他调用了getExtesionClasses方法加载扩展点实现信息完成以后,就能够直接经过判断cachedAdaptiveClass缓存字段是否被赋值盘肯定当前扩展点是否有默认的AdaptiveExtension实现。若是没有,那么就调用createAdaptiveExtensionClass方法来动态生成一个。在dubbo的扩展点框架中大量的使用了缓存技术。
建立自适应扩展点实现类型和实例化就已经完成了,下面就来看下扩展点自动注入的实现
@Adaptive public class AdaptiveExtensionFactory implements ExtensionFactory { private final List<ExtensionFactory> factories; public AdaptiveExtensionFactory() { ExtensionLoader<ExtensionFactory> loader = ExtensionLoader.getExtensionLoader(ExtensionFactory.class); List<ExtensionFactory> list = new ArrayList<ExtensionFactory>(); for (String name : loader.getSupportedExtensions()) { list.add(loader.getExtension(name)); } factories = Collections.unmodifiableList(list); } public <T> T getExtension(Class<T> type, String name) { for (ExtensionFactory factory : factories) { T extension = factory.getExtension(type, name); if (extension != null) { return extension; } } return null; } }
这里能够看到,扩展点自动注入的一句就是根据setter方法对应的参数类型和property名称从ExtensionFactory中查询,若是有返回扩展点实例,那么就进行注入操做。到这里getAdaptiveExtension方法就分析完毕了。
须要明白一点dubbo的内部传参基本上都是基于Url来实现的,也就是说Dubbo是基于URL驱动的技术
因此,适配器类的目的是在运行期获取扩展的真正实现来调用,解耦接口和实现,这样的话要不咱们本身实现适配器类,要不dubbo帮咱们生成,而这些都是经过Adpative来实现。
到目前为止,咱们的AdaptiveExtension的主线走完了,能够简单整理一下他们的调用关系以下
在整个过程当中,最重要的两个方法getExtensionClasses和createAdaptiveExtensionClass
getExtensionClasses
这个方法主要是读取META-INF/services 、META-INF/dubbo、META-INF/internal目录下的文件内容
分析每一行,若是发现其中有哪一个类的annotation是@Adaptive,就找到对应的AdaptiveClass。若是没有的话,就动态建立一个
createAdaptiveExtensionClass
该方法是在getExtensionClasses方法找不到AdaptiveClass的状况下被调用,该方法主要是经过字节码的方式在内存中新生成一个类,它具备AdaptiveClass的功能,Protocol就是经过这种方式得到AdaptiveClass类的。
NamespaceHandler: 注册BeanDefinitionParser, 利用它来解析
BeanDefinitionParser: 解析配置文件的元素
spring会默认加载jar包下/META-INF/spring.handlers
找到对应的NamespaceHandler
当spring容器初始化完之后,会调用afterPropertiesSet方法
bean被销毁的时候调用destory方法
容器初始化完成以后会主动注入applicationContext
事件监听
对象初始化完以后会获取bean的自己属性
delay可以控制延迟发布
首先咱们要关注的是服务的发布和服务的消费这两个主要的流程,那么就能够基于这个点去找到源码分析的突破口。那么天然而然咱们就能够想到spring的配置
Dubbo中spring扩展就是使用spring的自定义类型,因此一样也有NamespaceHandler、BeanDefinitionParser。而NamespaceHandler是DubboNamespaceHandler
public class DubboNamespaceHandler extends NamespaceHandlerSupport { static { Version.checkDuplicate(DubboNamespaceHandler.class); } public void init() { registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true)); registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true)); registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true)); registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true)); registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true)); registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true)); registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true)); registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true)); registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false)); registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true)); } }
BeanDefinitionParser所有都使用了DubboBeanDefinitionParser,若是咱们向看<dubbo:service>的配置,就直接看DubboBeanDefinitionParser中
这个里面主要作了一件事,把不一样的配置分别转化成spring容器中的bean对象
application
对应ApplicationConfig
registry
对应RegistryConfig
monitor
对应MonitorConfig
provider
对应ProviderConfig
consumer
对应ConsumerConfig
为了在spring启动的时候,也相应的启动provider发布服务注册服务的过程,而同时为了让客户端在启动的时候自动订阅发现服务,加入了两个bean
ServiceBean
、ReferenceBean
。
分别继承了ServiceConfig
和ReferenceConfig
同时还分别实现了InitializingBean
、DisposableBean
, ApplicationContextAware
, ApplicationListener
, BeanNameAware
InitializingBean
接口为bean提供了初始化方法的方式,它只包括afterPropertiesSet方法,凡是继承该接口的类,在初始化bean的时候会执行该方法。
DisposableBean
bean被销毁的时候,spring容器会自动执行destory方法,好比释放资源
ApplicationContextAware
实现了这个接口的bean,当spring容器初始化的时候,会自动的将ApplicationContext注入进来
ApplicationListener
ApplicationEvent事件监听,spring容器启动后会发一个事件通知
BeanNameAware
得到自身初始化时,自己的bean的id属性
那么基本的实现思路能够整理出来了
serviceBean
是服务发布的切入点,经过afterPropertiesSet
方法,调用export()
方法进行发布。
export
为父类ServiceConfig
中的方法,因此跳转到SeviceConfig
类中的export
方法
public synchronized void export() { if (provider != null) { if (export == null) { export = provider.getExport(); } if (delay == null) { delay = provider.getDelay(); } } if (export != null && ! export.booleanValue()) { return; } if (delay != null && delay > 0) { Thread thread = new Thread(new Runnable() { public void run() { try { Thread.sleep(delay); } catch (Throwable e) { } doExport(); } }); thread.setDaemon(true); thread.setName("DelayExportServiceThread"); thread.start(); } else { doExport(); } }
咱们发现,delay的做用就是延迟暴露,而延迟的方式也很直截了当,Thread.sleep(delay)
doExport()
方法。一样是一堆初始化代码继续看doExport(),最终会调用到doExportUrls()中:
private void doExportUrls() { List<URL> registryURLs = loadRegistries(true);//是否是得到注册中心的配置 for (ProtocolConfig protocolConfig : protocols) { //是否是支持多协议发布 doExportUrlsFor1Protocol(protocolConfig, registryURLs); } }
对应上述protocols
长成这样<dubbo:protocol name="dubbo" port="20880" id ="dubbo"/>
protocols也是根据配置装配出来的,接下来进入对应的duExportUrlsFor1Protocol
方法查看对应s具体实现
最终实现逻辑
//配置为none不暴露 if (! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) { //配置不是remote的状况下作本地暴露 (配置为remote,则表示只暴露远程服务) if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) { exportLocal(url); } //若是配置不是local则暴露为远程服务.(配置为local,则表示只暴露本地服务) if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){ if (logger.isInfoEnabled()) { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); } if (registryURLs != null && registryURLs.size() > 0 && url.getParameter("register", true)) { for (URL registryURL : registryURLs) { url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic")); URL monitorUrl = loadMonitor(registryURL); if (monitorUrl != null) { url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString()); } if (logger.isInfoEnabled()) { logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL); } Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); Exporter<?> exporter = protocol.export(invoker); exporters.add(exporter); } } else { Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url); Exporter<?> exporter = protocol.export(invoker); exporters.add(exporter); } } }
从上述代码doExportUrlsFor1Protocol
方法,先建立两个URL,分别以下
dubbo://192.168.xx.xx:20880/com.zzjson.IHello
registry://192.168.xx
其对应的url就是services
的providers
的信息
在上面这段代码中能够看到Dubbo的比较核心的抽象:Invoker, Invoker是一个代理类,从ProxyFactory中生成。这个地方能够作一个小结
public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol { public void destroy() { throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!"); } public int getDefaultPort() { throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!"); } public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException { if (arg1 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg1; String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol()); if (extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])"); com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName); return extension.refer(arg0, arg1); } public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException { if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null"); if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null"); com.alibaba.dubbo.common.URL url = arg0.getUrl(); String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol()); if (extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])"); com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName); return extension.export(arg0); } }
上述代码作两件事
protocol
的协议地址,若是protocol
为空,表示经过dubbo
协议发布服务,不然根据配置的谢意类型来发布服务ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName)
public T getExtension(String name) { if (name == null || name.length() == 0) throw new IllegalArgumentException("Extension name == null"); if ("true".equals(name)) { return getDefaultExtension(); } Holder<Object> holder = cachedInstances.get(name); if (holder == null) { cachedInstances.putIfAbsent(name, new Holder<Object>()); holder = cachedInstances.get(name); } Object instance = holder.get(); if (instance == null) { synchronized (holder) { instance = holder.get(); if (instance == null) { instance = createExtension(name); holder.set(instance); } } } return (T) instance; }
@SuppressWarnings("unchecked") private T createExtension(String name) { Class<?> clazz = getExtensionClasses().get(name); if (clazz == null) { throw findException(name); } try { T instance = (T) EXTENSION_INSTANCES.get(clazz); if (instance == null) { EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance()); instance = (T) EXTENSION_INSTANCES.get(clazz); } //对获取的实例进行依赖注入 injectExtension(instance); //在loadFile中进行赋值的 Set<Class<?>> wrapperClasses = cachedWrapperClasses; if (wrapperClasses != null && wrapperClasses.size() > 0) { for (Class<?> wrapperClass : wrapperClasses) { //对实例进行包装,分别调用带Protocol参数的构造函数建立实例,而后进行依赖注入 instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance)); } } return instance; } catch (Throwable t) { throw new IllegalStateException("Extension instance(name: " + name + ", class: " + type + ") could not be instantiated: " + t.getMessage(), t); } }
主要作以下四个事情
Protocol
参数的构造函数建立实例,而后进行依赖注入dubbo-rpc-api
的resources
路径下,找到com.alibaba.dubbo.rpc.Protocol
文件中存在filter/listener
cachedWrapperClass
对DubboProtocol
进行包装,会经过ProtocolFilterWrapper
,ProtocolListenerWrapper
包装private Map<String, Class<?>> getExtensionClasses() { Map<String, Class<?>> classes = cachedClasses.get(); if (classes == null) { synchronized (cachedClasses) { classes = cachedClasses.get(); if (classes == null) { classes = loadExtensionClasses(); cachedClasses.set(classes); } } } return classes; }
总结
在ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);
这段代码中,当extName
为registry
的时候,咱们不须要再次去阅读这块代码了,直接能够在扩展点中找到相应的实现扩展类,/dubbo-registry-api/src/main/resources/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol
配置以下
registry=com.alibaba.dubbo.registry.integration.RegistryProtocol
因此咱们定位到RegistryProtocolRegistryProtocol
这个类中的export
方法
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { //export invoker final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); //registry provider final Registry registry = getRegistry(originInvoker); final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); registry.register(registedProviderUrl); // 订阅override数据 // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,由于subscribed以服务名为缓存的key,致使订阅信息覆盖。 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); //保证每次export都返回一个新的exporter实例 return new Exporter<T>() { public Invoker<T> getInvoker() { return exporter.getInvoker(); } public void unexport() { try { exporter.unexport(); } catch (Throwable t) { logger.warn(t.getMessage(), t); } try { registry.unregister(registedProviderUrl); } catch (Throwable t) { logger.warn(t.getMessage(), t); } try { overrideListeners.remove(overrideSubscribeUrl); registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } }; }
上述doLoaclExport源码为:
private Protocol protocol; public void setProtocol(Protocol protocol) { this.protocol = protocol; } @SuppressWarnings("unchecked") private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker){ String key = getCacheKey(originInvoker); ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { synchronized (bounds) { exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker)); exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker); bounds.put(key, exporter); } } } return (ExporterChangeableWrapper<T>) exporter; }
上述代码Protocol代码是怎么复制的呢,是在injectExtension
方法中对已经加载的扩展点的属性进行依赖注入了
所以咱们知道protocol是一个自适应扩展点,Protocol$Adaptive
,而后调用这个自适应扩展点中的export
方法,这个时候传入的协议地址应该是
dubbo://127.0.0.1/xxx
所以在Protocol$Adaptive.export
方法中,ExtensionLoader.getExtension(Protocol.class).getExtension
就是基于DubboProtocol协议发布服务了么?固然不是,此处获取的不是一个单纯的DubboProtocol
扩展点,而是会经过Wrapper
对Protocl
进行装饰,装饰器分别为ProtocolFilterWrapper
或者是ProtoclListenerWrapper
,至于为何MockProtocol
为何不在装饰器里面呢?咱们能够想到,在ExtensionLoader.loadFile
这段代码的时候,装饰器必需要有一个Protocol
的构造方法,以下
public ProtocolFilterWrapper(Protocol protocol){ if (protocol == null) { throw new IllegalArgumentException("protocol == null"); } this.protocol = protocol; }
至此,咱们能够知道Protocol$Adaptive
中的export
方法会调用ProtocolFilterWrapper
以及ProtocolListenerWrapper
类的方法
public class ProtocolFilterWrapper implements Protocol { private final Protocol protocol; public ProtocolFilterWrapper(Protocol protocol) { if (protocol == null) { throw new IllegalArgumentException("protocol == null"); } this.protocol = protocol; } //此方法读取全部的filter类,利用这些类封装invoker private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) { Invoker<T> last = invoker; //自动激活扩展点,根据条件获取当前扩展可自动激活的实现 List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); if (!filters.isEmpty()) { for (int i = filters.size() - 1; i >= 0; i--) { final Filter filter = filters.get(i); final Invoker<T> next = last; last = new Invoker<T>() { @Override public Class<T> getInterface() { return invoker.getInterface(); } @Override public URL getUrl() { return invoker.getUrl(); } @Override public boolean isAvailable() { return invoker.isAvailable(); } @Override public Result invoke(Invocation invocation) throws RpcException { return filter.invoke(next, invocation); } @Override public void destroy() { invoker.destroy(); } @Override public String toString() { return invoker.toString(); } }; } } return last; } @Override public int getDefaultPort() { return protocol.getDefaultPort(); } @Override public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER)); } @Override public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { return protocol.refer(type, url); } return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER); } @Override public void destroy() { protocol.destroy(); } }
ProtocolFilterWrapper
这个类很是重要
Protocol protocol
构造参数Protocol
接口export
和refer
函数进行了封装咱们查看以下文件dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Filter
echo=com.alibaba.dubbo.rpc.filter.EchoFilter generic=com.alibaba.dubbo.rpc.filter.GenericFilter genericimpl=com.alibaba.dubbo.rpc.filter.GenericImplFilter token=com.alibaba.dubbo.rpc.filter.TokenFilter accesslog=com.alibaba.dubbo.rpc.filter.AccessLogFilter activelimit=com.alibaba.dubbo.rpc.filter.ActiveLimitFilter classloader=com.alibaba.dubbo.rpc.filter.ClassLoaderFilter context=com.alibaba.dubbo.rpc.filter.ContextFilter consumercontext=com.alibaba.dubbo.rpc.filter.ConsumerContextFilter exception=com.alibaba.dubbo.rpc.filter.ExceptionFilter executelimit=com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter deprecated=com.alibaba.dubbo.rpc.filter.DeprecatedFilter compatible=com.alibaba.dubbo.rpc.filter.CompatibleFilter timeout=com.alibaba.dubbo.rpc.filter.TimeoutFilter
能够看到,invoker经过以下的Filter组装成一个责任链
其中涉及到不少功能,包括权限验证,异常,超时等等,固然能够预计计算调用时间等等应该也是在这其中的某个类实现的,这里咱们能够看到export
和refer
过程都会被filter
过滤
在这里咱们能够看到export
和refer
分别对应了不一样的Wrapper
;export
对应的ListenerExporterWrapper
这块暂不去分析,由于此地方并无提供实现类
public class ProtocolListenerWrapper implements Protocol { private final Protocol protocol; public ProtocolListenerWrapper(Protocol protocol){ if (protocol == null) { throw new IllegalArgumentException("protocol == null"); } this.protocol = protocol; } public int getDefaultPort() { return protocol.getDefaultPort(); } public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } return new ListenerExporterWrapper<T>(protocol.export(invoker), Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class) .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY))); } public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { return protocol.refer(type, url); } return new ListenerInvokerWrapper<T>(protocol.refer(type, url), Collections.unmodifiableList( ExtensionLoader.getExtensionLoader(InvokerListener.class) .getActivateExtension(url, Constants.INVOKER_LISTENER_KEY))); } public void destroy() { protocol.destroy(); } }
咱们看一下dubboProtocol的export方法:openServer(url)
经过上述代码分析完之后,咱们可以定位到DubboProtocol.export()
方法
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); // export service. String key = serviceKey(url); DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); exporterMap.put(key, exporter); //export an stub service for dispaching event Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice){ String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0 ){ if (logger.isWarnEnabled()){ logger.warn(new IllegalStateException("consumer [" +url.getParameter(Constants.INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } //暴露服务 openServer(url); return exporter; }
接着调用openServer
private void openServer(URL url) { // find server. String key = url.getAddress();//116.62.221.6:20880 //client 也能够暴露一个只有server能够调用的服务。 boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true); if (isServer) { ExchangeServer server = serverMap.get(key); if (server == null) {//没有的话就建立服务 serverMap.put(key, createServer(url)); } else { //server支持reset,配合override功能使用 server.reset(url); } } }
继续看其中的createServer方法:
private ExchangeServer createServer(URL url) { //默认开启server关闭时发送readonly事件 url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); //默认开启heartbeat url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) throw new RpcException("Unsupported server type: " + str + ", url: " + url); url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME); ExchangeServer server; try { server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } str = url.getParameter(Constants.CLIENT_KEY); if (str != null && str.length() > 0) { Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type: " + str); } } return server; }
建立服务,而后开启心跳监测,默认使用netty
。组装url
发现ExchangeServer是经过Exchangers建立的,直接看Exchanger.bind方法
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handler == null) { throw new IllegalArgumentException("handler == null"); } url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); return getExchanger(url).bind(url, handler); }
getExchanger方法实际上调用的是ExtensionLoader的相关方法,这里的ExtensionLoader是dubbo插件化的核心,咱们会在后面的插件化讲解中详细讲解,这里咱们只须要知道Exchanger的默认实现只有一个:HeaderExchanger。上面一段代码最终调用的是:
public static Exchanger getExchanger(URL url) { String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER); return getExchanger(type); } public static Exchanger getExchanger(String type) { return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type); }
public class HeaderExchanger implements Exchanger { public static final String NAME = "header"; public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); } public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); } }
能够看到Server与Client实例均是在这里建立的,HeaderExchangeServer须要一个Server类型的参数,来自Transporters.bind()
:
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handlers == null || handlers.length == 0) { throw new IllegalArgumentException("handlers == null"); } ChannelHandler handler; if (handlers.length == 1) { handler = handlers[0]; } else { handler = new ChannelHandlerDispatcher(handlers); } return getTransporter().bind(url, handler); }
getTransporter()获取的实例来源于配置,默认返回一个NettyTransporter:
经过NettyTranport建立基于Netty
的server
服务
public class NettyTransporter implements Transporter { public static final String NAME = "netty"; public Server bind(URL url, ChannelHandler listener) throws RemotingException { return new NettyServer(url, listener); } public Client connect(URL url, ChannelHandler listener) throws RemotingException { return new NettyClient(url, listener); } }
在调用HeaderExchanger.bind()
方法的时候,是先new一个HeaderExchangeServer
这个server是干吗呢,是对当前这个连接去创建心跳机制
public class HeaderExchangeServer implements ExchangeServer { protected final Logger logger = LoggerFactory.getLogger(getClass()); private final ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1,new NamedThreadFactory("dubbo-remoting-server-heartbeat", true); // 心跳定时器 private ScheduledFuture<?> heatbeatTimer; // 心跳超时,毫秒。缺省0,不会执行心跳。 private int heartbeat; private int heartbeatTimeout; private final Server server; private volatile boolean closed = false; public HeaderExchangeServer(Server server) { if (server == null) { throw new IllegalArgumentException("server == null"); } this.server = server; this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0); this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3); if (heartbeatTimeout < heartbeat * 2) { throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2"); } //心跳 startHeatbeatTimer(); } private void startHeatbeatTimer() { //关闭心跳定时 stopHeartbeatTimer(); if (heartbeat > 0) { //每隔heartbeat时间执行一次 heatbeatTimer = scheduled.scheduleWithFixedDelay( new HeartBeatTask( new HeartBeatTask.ChannelProvider() { //获取channels public Collection<Channel> getChannels() { return Collections.unmodifiableCollection( HeaderExchangeServer.this.getChannels() ); } }, heartbeat, heartbeatTimeout), heartbeat, heartbeat,TimeUnit.MILLISECONDS); } } //关闭心跳定时 private void stopHeartbeatTimer() { try { ScheduledFuture<?> timer = heatbeatTimer; if (timer != null && ! timer.isCancelled()) { timer.cancel(true); } } catch (Throwable t) { logger.warn(t.getMessage(), t); } finally { heatbeatTimer =null; } } }
心跳线程HeartBeatTask
在超时时间以内发送数据,在超时时间以外,是客户端的话,重连;是服务端,那么关闭服务端发布。
前面咱们知道,基于Spring的这个解析入口,到发布服务的过程,接着基于DubboProtocol
去发布,最终调用Netty
的api建立了一个NettyServer
。
那么继续沿着RegistryProtocol.export
这个方法,来看看注册服务的代码
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { //export invoker final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); //registry provider final Registry registry = getRegistry(originInvoker); final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); registry.register(registedProviderUrl); // 订阅override数据 // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,由于subscribed以服务名为缓存的key,致使订阅信息覆盖。 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); //保证每次export都返回一个新的exporter实例 return new Exporter<T>() { public Invoker<T> getInvoker() { return exporter.getInvoker(); } public void unexport() { try { exporter.unexport(); } catch (Throwable t) { logger.warn(t.getMessage(), t); } try { registry.unregister(registedProviderUrl); } catch (Throwable t) { logger.warn(t.getMessage(), t); } try { overrideListeners.remove(overrideSubscribeUrl); registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } }; }
private RegistryFactory registryFactory; public void setRegistryFactory(RegistryFactory registryFactory) { this.registryFactory = registryFactory; } private Registry getRegistry(final Invoker<?> originInvoker){ URL registryUrl = originInvoker.getUrl();//得到registry://192.168.xx.xx:2181的协议地址 if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) { //获得zk的谢意地址 String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY); //registryUrl就会变成了zookeeper://192.168.xx.xx registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY); } //registryFactory是什么 return registryFactory.getRegistry(registryUrl); }
上述代码很明显了,经过分析,其实就是把registry
的协议头改为服务提供者配置的协议地址,就是咱们配置
的
<dubbo:registry address="zookeeper://192.168.xx.xx.2181"/>
而后registryFactory.getRegistry
的目的,就是经过协议地址匹配到对应的注册中心。那registryFactory
是一个什么样的对象呢,从上述咱们能够才,其是一个扩展点,而且咱们可以注意到这里面的一个方法上有一个@Adaptive
的注解,说明了其实一个自适应扩展点,按照咱们以前看过的代码,自适应扩展点加在方法层面上,表示会动态生成一个自适应的适配器,因此这个自适应适配器应该是RegistryFactory$Adaptive
@SPI("dubbo") public interface RegistryFactory { /** * 链接注册中心. * * 链接注册中心需处理契约:<br> * 1. 当设置check=false时表示不检查链接,不然在链接不上时抛出异常。<br> * 2. 支持URL上的username:password权限认证。<br> * 3. 支持backup=10.20.153.10备选注册中心集群地址。<br> * 4. 支持file=registry.cache本地磁盘文件缓存。<br> * 5. 支持timeout=1000请求超时设置。<br> * 6. 支持session=60000会话超时或过时设置。<br> * * @param url 注册中心地址,不容许为空 * @return 注册中心引用,总不返回空 */ @Adaptive({"protocol"}) Registry getRegistry(URL url); }
public class RegistryFactory$Adaptive implements com.alibaba.dubbo.registry.RegistryFactory { public com.alibaba.dubbo.registry.Registry getRegistry(com.alibaba.dubbo.common.URL arg0) { if (arg0 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg0; String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol()); if (extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.registry.RegistryFactory) " +"name from url(" + url.toString() + ") usekeys([protocol])")"; com.alibaba.dubbo.registry.RegistryFactory extension = (com.alibaba.dubbo.registry.RegistryFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.registry.Reg istryFactory.class).getExtension(extName); return extension.getRegistry(arg0); } }
咱们拿到这个动态生成的自适应扩展点,看看这段代码中的实现
zookeeper://
ExtensionLoader.getExtensionLoader(RegistryFactory.class).getExtension("zookeeper")
去得到一个指定的扩展点,而这个扩展点的配置在/dubbo-registry/dubbo-registry-zookeeper/src/main/resources/META-INF/dubbo/internal/com.alibaba.dubbo.registry.RegistryFactory
内容为
zookeeper=com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistryFactory
public class ZookeeperRegistryFactory extends AbstractRegistryFactory { private ZookeeperTransporter zookeeperTransporter; public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) { this.zookeeperTransporter = zookeeperTransporter; } public Registry createRegistry(URL url) { return new ZookeeperRegistry(url, zookeeperTransporter); } }
此方法中并无getRegistry
方法,而是在父类AbstractRegistryFactory
public Registry getRegistry(URL url) { url = url.setPath(RegistryService.class.getName()) .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName()) .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY); String key = url.toServiceString(); // 锁定注册中心获取过程,保证注册中心单一实例 LOCK.lock(); try { Registry registry = REGISTRIES.get(key); if (registry != null) { return registry; } registry = createRegistry(url); if (registry == null) { throw new IllegalStateException("Can not create registry " + url); } REGISTRIES.put(key, registry); return registry; } finally { // 释放锁 LOCK.unlock(); } }
上述方法
REGISTRIES
中,根据key得到对应的Registry
public Registry createRegistry(URL url) { return new ZookeeperRegistry(url, zookeeperTransporter); }
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { super(url); if (url.isAnyHost()) { throw new IllegalStateException("registry address == null"); } String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT); if (! group.startsWith(Constants.PATH_SEPARATOR)) { group = Constants.PATH_SEPARATOR + group; } this.root = group; zkClient = zookeeperTransporter.connect(url); zkClient.addStateListener(new StateListener() { public void stateChanged(int state) { if (state == RECONNECTED) { try { recover(); } catch (Exception e) { logger.error(e.getMessage(), e); } } } }); }
代码分析到这,咱们对于getRegistry
得出结论根据当前注册中心的配置信息,得到一个匹配的注册中心,也就是ZookeeperRegistry registry.register(registedProviderUrl)
继续往下看会调用对应的registry.register
去把dubbo://
的谢意地址注册到zookeeper
上,这个方法会调用FailbackRegistry
类中的register
,由于其父类FailbackRegistry
中存在register
方法,而这个类重写了此方法,因此咱们能够直接定位到FailbackRegistry
这个类中的register
方法中
@Override public void register(URL url) { super.register(url); failedRegistered.remove(url); failedUnregistered.remove(url); try { // 向服务器端发送注册请求 doRegister(url); } catch (Exception e) { Throwable t = e; // 若是开启了启动时检测,则直接抛出异常 boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true) && ! Constants.CONSUMER_PROTOCOL.equals(url.getProtocol()); boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if(skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t); } // 将失败的注册请求记录到失败列表,定时重试 failedRegistered.add(url); } }
从名字上来看,是一个失败重试机制,调用父类的register
方法把当前url添加到缓存集合中,调用子类的doRegister
方法
protected void doRegister(URL url) { try { zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
能够看到,调用了zkclient.create
在zookeeper中建立节点
最后RegistryProtocol.export这个方法以后的代码再也不分析了,就是去服务提供端注册一个zookeeper监听,当监听发生变化的时候,服务端作相应的处理。