dubbo源码分析02:服务引用

1、什么时候建立服务引用 


  引用官方文档的原话,若是将Dubbo托管在Spring-IOC容器下,Dubbo服务引用的时机有两个,第一个是在Spring容器调用ReferenceBean的afterPropertiesSet方法时引用服务,第二个是在ReferenceBean对应的服务被注入到其余类中时引用。这两个引用服务的时机区别在于,第一个是饿汉式的,第二个是懒汉式的。默认状况下,Dubbo使用懒汉式引用服务。若是须要使用饿汉式,可经过配置 <dubbo:reference> 的init属性开启。下面咱们按照Dubbo默认配置进行分析,整个分析过程从ReferenceBean的getObject方法开始。当咱们的服务被注入到其余类中时,Spring会第一时间调用getObject方法,并由该方法执行服务引用逻辑。按照惯例,在进行具体工做以前,需先进行配置检查与收集工做。接着根据收集到的信息决定服务用的方式,有三种,第一种是引用本地(JVM)服务,第二是经过直连方式引用远程服务,第三是经过注册中心引用远程服务。不论是哪一种引用方式,最后都会获得一个Invoker实例。若是有多个注册中心,多个服务提供者,这个时候会获得一组Invoker实例,此时须要经过集群管理类Cluster将多个Invoker合并成一个实例。合并后的Invoker实例已经具有调用本地或远程服务的能力了,但并不能将此实例暴露给用户使用,这会对用户业务代码形成侵入。此时框架还须要经过代理工厂类(ProxyFactory)为服务接口生成代理类,并让代理类去调用 Invoker逻辑。避免了Dubbo框架代码对业务代码的侵入,同时也让框架更容易使用。
  直接从ReferenceConfig开始分析,API的形式使用dubbo时,设置好ReferenceConfig实例的各类参数后,调用get方法获取服务引用实例。另外,须要了解一点dubbo的配置,能够参考官网的“schema配置参考手册”。
 

2、服务引用的建立流程


  该类的get方法为获取服务引用方法。该方法比较简单,首先检查销毁标识“destroyed”,若是为true,表示该引用已经被销毁,不该再进行使用。接着检查服务引用是否为空,若是为空,则调用init方法进行初始化,不然返回该服务引用。
public synchronized T get() {
    if (destroyed) {
        throw new IllegalStateException("Already destroyed!");
    }
    if (ref == null) {
        init();
    }
    return ref;
}

  下面来重点看看init方法的源码,追踪一下服务引用是如何建立的,方法比较长,有较长的篇幅都在进行参数的校验、补全,须要点耐心看完。java

private void init() {
    //检查初始化标识,防止重复初始化
    if (initialized) {
        return;
    }
    initialized = true;
    
    //检查接口名是否合法
    if (interfaceName == null || interfaceName.length() == 0) {
        throw new IllegalStateException("<dubbo:reference interface=\"\"... not allow null!");
    }
    
    //检查ConsumerConfig变量是否为空(ConsumerConfig为ReferenceConfig提供了某些属性的默认值):
    //(1)若是ConsumerConfig为null,则new一个;
    //(2)调用appendProperties(AbstractConfig config)方法完善ConsumerConfig的配置;
    checkDefault();
    
    //调用appendProperties(AbstractConfig config)方法完善ReferenceConfig的配置,该方法逻辑以下:
    //(1)检查AbstractConfig中每个setXXX(原始类型)或isXXX(原始类型)的方法,对XXX属性进行配置;
    //(2)按优先级从高到低的顺序,依次从System.getProperty、配置中心、AbstractConfig对应getXXX返回值、
    //dubbo本地配置文件中进行查找XXX的属性值并进行设置;
    appendProperties(this);
    
    //设置成员变量“泛化引用标识”,若是为空则从成员变量ConsumerConfig中获取该标识的值
    if (getGeneric() == null && getConsumer() != null) {
        setGeneric(getConsumer().getGeneric());
    }
    
    //判断泛化标识的值是否为真,作这个判断的缘由是由于泛化标识为字符串类型
    if (ProtocolUtils.isGeneric(getGeneric())) {
        //若是为真,则将interfaceClass设置为GenericService
        interfaceClass = GenericService.class;
    } else {
        //若是为假,则经过当前的类加载器加载interfaceName,获取interfaceClass
        try {
            interfaceClass = Class.forName(interfaceName, true, Thread.currentThread().getContextClassLoader());
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
        //(1)校验interfaceClass是否为null、是否为接口类型;
        //(2)若是配置了List<MethodConfig>,须要校验interfaceClass是否有相应的方法
        checkInterfaceAndMethods(interfaceClass, methods);
    }
    
    /****************************** begin ******************************/  
    //下面代码块的做用是尝试从系统属性或配置文件中获取interfaceName的配置,
    //该配置值赋给成员变量String url,用于服务消费方点对点调用服务提供方。    
    String resolve = System.getProperty(interfaceName);
    String resolveFile = null;
    if (resolve == null || resolve.length() == 0) {
        resolveFile = System.getProperty("dubbo.resolve.file");
        if (resolveFile == null || resolveFile.length() == 0) {
            File userResolveFile = new File(new File(System.getProperty("user.home")), "dubbo-resolve.properties");
            if (userResolveFile.exists()) {
                resolveFile = userResolveFile.getAbsolutePath();
            }
        }
        if (resolveFile != null && resolveFile.length() > 0) {
            Properties properties = new Properties();
            FileInputStream fis = null;
            try {
                fis = new FileInputStream(new File(resolveFile));
                properties.load(fis);
            } catch (IOException e) {
                throw new IllegalStateException("Unload " + resolveFile + ", cause: " + e.getMessage(), e);
            } finally {
                try {
                    if (null != fis) fis.close();
                } catch (IOException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
            resolve = properties.getProperty(interfaceName);
        }
    }
    if (resolve != null && resolve.length() > 0) {
        url = resolve;
          //省略了日志打印的代码
    }    
    /****************************** end ******************************/  
    
    /****************************** begin ******************************/  
    //下面代码块的做用是检测ApplicationConfig、ModuleConfig、RegistryConfig、MonitorConfig
    //这几个核心配置是否为空。若是为空,则尝试从其余配置中获取。
    if (consumer != null) {
        if (application == null) {
            application = consumer.getApplication();
        }
        if (module == null) {
            module = consumer.getModule();
        }
        if (registries == null) {
            registries = consumer.getRegistries();
        }
        if (monitor == null) {
            monitor = consumer.getMonitor();
        }
    }
    if (module != null) {
        if (registries == null) {
            registries = module.getRegistries();
        }
        if (monitor == null) {
            monitor = module.getMonitor();
        }
    }
    if (application != null) {
        if (registries == null) {
            registries = application.getRegistries();
        }
        if (monitor == null) {
            monitor = application.getMonitor();
        }
    }
    //相似于checkDefault方法检查ConsumerConfig,该方法检查ApplicationConfig是否为空并完善其各字段值
    checkApplication(); 
    //检查ReferenceConfig的local、stub、mock配置项是否正确
    checkStubAndMock(interfaceClass);
    /****************************** end ******************************/  
    
    /****************************** start ******************************/  
    //下面代码块的做用是收集配置,并将配置存储在一个map中
    Map<String, String> map = new HashMap<String, String>();
    Map<Object, Object> attributes = new HashMap<Object, Object>();
    map.put(Constants.SIDE_KEY, Constants.CONSUMER_SIDE); //side=consumer
    map.put(Constants.DUBBO_VERSION_KEY, Version.getVersion()); //dubbo=2.6.2
    map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis())); //timestamp=时间戳
    if (ConfigUtils.getPid() > 0) {
        map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid())); //pid=进程pid
    }
    //判断是不是泛化引用
    if (!isGeneric()) {
        //非泛化引用,设置revision=interfaceClass的jar版本号
        String revision = Version.getVersion(interfaceClass, version);
        if (revision != null && revision.length() > 0) {
            map.put("revision", revision);
        }
        //设置接口方法,methods=xxx1,xxx2,...
        String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
        if (methods.length == 0) {
            logger.warn("NO method found in service interface " + interfaceClass.getName());
            map.put("methods", Constants.ANY_VALUE);
        } else {
            map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
        }
    }
    map.put(Constants.INTERFACE_KEY, interfaceName); //interface=interfaceName
    //将ApplicationConfig、ModuleConfig、ConsumerConfig、ReferenceConfig的值设置到map中,
    //(1)获取方法名为getXXX或者isXXX、public、无方法参数、返回值为原始值的非getClass方法;
    //(2)获取(1)方法上的@Paramter注解,根据该注解的excluded、escaped、append属性判断该属性值
    //是否须要被忽略、是否须要URLEncoded、是否须要以追加的形式设置入map(","做为追加值分隔符);
    //(3)获取方法名为getParameters、public、无方法参数、返回值为Map的方法;
    //(4)将(3)中的方法返回值Map的key-value键值对作key处理以后(添加前缀、"-"变"."),设置入map;
    appendParameters(map, application);
    appendParameters(map, module);
    appendParameters(map, consumer, Constants.DEFAULT_KEY);
    appendParameters(map, this);
    /****************************** end ******************************/  

    /****************************** start ******************************/  
    //下面代码块的做用是处理MethodConfig 实例。该实例包含了事件通知配置如onreturn、onthrow、oninvoke等。
    //因为通常不会使用到MethodConfig配置,咱们先暂时忽略这个配置的代码
    String prefix = StringUtils.getServiceKey(map);
    if (methods != null && !methods.isEmpty()) {
        for (MethodConfig method : methods) {
            appendParameters(map, method, method.getName());
            String retryKey = method.getName() + ".retry";
            if (map.containsKey(retryKey)) {
                String retryValue = map.remove(retryKey);
                if ("false".equals(retryValue)) {
                    map.put(method.getName() + ".retries", "0");
                }
            }
            appendAttributes(attributes, method, prefix + "." + method.getName());
            checkAndConvertImplicitConfig(method, map, attributes);
        }
    }
    /****************************** end ******************************/  

    /****************************** start ******************************/  
    //下面代码块的做用是设置服务消费者的IP并存储到map中
    String hostToRegistry = ConfigUtils.getSystemProperty(Constants.DUBBO_IP_TO_REGISTRY);
    if (hostToRegistry == null || hostToRegistry.length() == 0) {
        hostToRegistry = NetUtils.getLocalHost();
    } else if (isInvalidLocalHost(hostToRegistry)) {
        throw new IllegalArgumentException("Specified invalid registry ip from property ... value: ...");
    }
    map.put(Constants.REGISTER_IP_KEY, hostToRegistry); //register.ip=实际的IP地址
    /****************************** end ******************************/  

    //将attributes存入静态上下文
    StaticContext.getSystemContext().putAll(attributes);
    
    //根据map建立服务应用代理,下一小节将对该方法进行详细说明
    ref = createProxy(map);
    
    //将服务接口名、ReferenceConfig、服务引用实例、服务接口方法包装成ConsumerModel并存入ApplicationModel
    ConsumerModel consumerModel = new ConsumerModel(getUniqueServiceName(), this, ref, interfaceClass.getMethods());
    ApplicationModel.initConsumerModel(getUniqueServiceName(), consumerModel);
}

 

3、建立代理对象


  本小节对createProxy方法重点讲解,它是初始化Reference中最重要的方法:
private T createProxy(Map<String, String> map) {
    //构建临时的URL,构造方法参数依次为protocol、host、port、parameter
    URL tmpUrl = new URL("temp", "localhost", 0, map);
    
     //判断是不是JVM内的引用
    final boolean isJvmRefer; 
    if (isInjvm() == null) {       
        if (url != null && url.length() > 0) { 
            //点对点直连参数"url"有值,则不为JVM内引用
            isJvmRefer = false;
        } else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
            //调用InjvmProtocol的isInjvmRefer方法,判断是不是JVM内引用
            isJvmRefer = true;
        } else {
            //默认不是JVM内引用
            isJvmRefer = false;
        }
    } else {
        //获取injvm配置值
        isJvmRefer = isInjvm().booleanValue();
    }
    
    /******************************* injvm调用 *******************************/
    if (isJvmRefer) {
        //若是是JVM内的引用,则建立JVM调用的上下文URL,protocol=injvm,host=127.0.0.1,
        //port=0,path=interfaceClass.getName(),而且将参数map设置入URL的parameters中
        URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);        
        //refprotocol为SPI接口Protocol的自适应扩展类,且refer方法有@Adaptive注解,
        //运用SPI机制源码分析中的知识,Protocol接口的自适应扩展类的refer代码,
        //会经过调用URL类型参数的getProtocol方法获得实际应该获取到的扩展类name,即injvm。
        //在源码的dubbo-rpc-injvm模块下,找到protocol的配置文件,
        //其中配置了injvm的扩展类为org.apache.dubbo.rpc.protocol.injvm.InjvmProtocol。
        //那么这里获取到的invoker即为InjvmProtocol.refer的返回结果,即InjvmInvoker
        invoker = refprotocol.refer(interfaceClass, url);        
    } else {
        /******************************* 点对点直连调用 *******************************/   
        //若是成员变量url不为空,表示要作直连调用。url是一个String,维护服务提供者地址
        if (url != null && url.length() > 0) { 
            //使用";"切分url字符串,表示若是想传入多个地址,使用";"分割便可
            String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
            //进行遍历
            if (us != null && us.length > 0) {
                for (String u : us) {
                    //解析每一个地址字符串,将其转换为URL对象。地址字符串支持协议头、验证用户密码、IP、PORT、
                    //等其余调用参数(如protocol)
                    URL url = URL.valueOf(u);
                    //若是地址字符串中未包含服务路径,则进行补全,即接口的全限定名
                    if (url.getPath() == null || url.getPath().length() == 0) {
                        url = url.setPath(interfaceName);
                    }
                    //检测url协议是否为registry,如果,代表用户想使用指定的注册中心
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        //将map转换为查询字符串,并做为refer参数的值添加到url的map字段中,
                        //注意:这里经过字符串生成的url还没维护该方法的传入参数map。
                        urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, 
                                                            StringUtils.toQueryString(map)));
                    } else {
                        //合并用户经过直连地址字符串配置的调用参数与其余途径配置的调用参数:
                        //(1)移除服务提供者的一些配置(这些配置来源于用户配置的url属性),如线程池相关配置;
                        //(2)保留服务提供者的部分配置,好比版本,group,时间戳等;
                        //(3)最后将合并后的配置设置为url查询字符串中。
                        urls.add(ClusterUtils.mergeUrl(url, map));
                    }
                }
            }        
        } else {  
            /******************************* 经过注册中心调用 *******************************/
            //加载注册中心配置List<Registry>,转换为List<URL>
            List<URL> us = loadRegistries(false);
            //遍历注册中心URL
            if (us != null && !us.isEmpty()) {
                for (URL u : us) {
                    //获取监控URL
                    URL monitorUrl = loadMonitor(u);
                    if (monitorUrl != null) {
                        map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                    }
                    //(1)将本方法的请求参数map转换为查询字符串形式"key1=value1&key2=value2...";
                    //(2)以refer做为key,(1)的结果encoded做为value,存入注册中心URL的paramters参数中;
                    //(3)将注册中心URL存入ReferenceConfig的全局变量List<URL> urls中;
                    urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                }
            }
            if (urls == null || urls.isEmpty()) {
                throw new IllegalStateException("No such any registry to reference ...");
            }
        }
              
        //若是成员变量List<URL> urls大小为1,则直接经过Protocol自适应拓展类构建Invoker实例接口。
        //该自适应扩展类由字节码技术生成,其refer方法具备@Adaptive注解,根据SPI机制的源码知识,
        //refer方法按照参数URL中getProtocol的值查找实际的扩展类实例,若是getProtocol没有值,
        //则取Protocol接口的@SPI注解value值"dubbo"做为name查找扩展类实例。通常来讲,若是经过注册
        //中心进行调用,则getProtocol获取到的值为registry,对应RegistryProtocol这个扩展类;而若是
        //直连调用,getProtocol为空或者是指定的协议(通常为dubbo协议),对应扩展类DubboProtocol。
        if (urls.size() == 1) {
            invoker = refprotocol.refer(interfaceClass, urls.get(0));
        } else {
            List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
            URL registryURL = null;
            for (URL url : urls) {
                invokers.add(refprotocol.refer(interfaceClass, url));
                if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                    registryURL = url; // use last registry url
                }
            }
            if (registryURL != null) { // registry url is available
                // use AvailableCluster only when register's cluster is available
                URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                invoker = cluster.join(new StaticDirectory(u, invokers));
            } else { // not a registry url
                invoker = cluster.join(new StaticDirectory(invokers));
            }
        }
    }

    //校验标识
    Boolean c = check;
    if (c == null && consumer != null) {
        c = consumer.isCheck();
    }
    if (c == null) {
        c = true; // default true
    }
    //检查invoker是否可用,最终调用的是Curator客户端的getZookeeperClient().isConnected()方法
    if (c && !invoker.isAvailable()) {
        throw new IllegalStateException("Failed to check the status of the service ...");
    }
    //...省略日志打印代码
    
    //建立代理对象,proxyFactory是SPI接口ProxyFactory的自适应扩展类,经过ProxyFactory的定义可知,
    //在未设置URL的proxy属性时,获取到默认的扩展类JavassistProxyFactory,可是ProxyFactory接口拥有
    //一个包装扩展类StubProxyFactoryWrapper,所以实际获取到的是StubProxyFactoryWrapper实例,并调用
    //它的getProxy方法
    return (T) proxyFactory.getProxy(invoker);
}

 

3.2.注册中心协议:RegistryProtocol


  该Protocol的扩展实现类,根据承载了注册中心信息的URL以及服务接口类型建立服务引用Invoker,建立方法为refer。
/**
 * 获取服务引用的I
 */
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    //从url的parameters字段中获取key=registry即注册中心的协议头,如zookeeper,
    //在将其设置为protocol,以后移除paramters字段中registry这个key
    url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
    
    //在获取到RegistryProtocol这个扩展类实例时,dubbo SPI机制会自动装配它的RegistryFactory字段,
    //RegistryFactory是一个SPI接口,则实际装配的是RegistryFactory的自适应扩展类。
    //另外,RegistryFactory的getRegistry方法被@Adaptive注解,且注解的value值为"protocol",
    //所以RegistryFactory自定义扩展类会调用方法参数URL的getProtocol,以其返回值做为实际扩展类的name。
    //通常咱们使用的注册中心为zookeeper,那么最终会调用到ZookeeperRegistryFactory的getRegistry方法。
    Registry registry = registryFactory.getRegistry(url);
    
    //若是要获取的服务引用为RegistryService,直接调用proxyFactory的getInvoker方法获取Invoker
    if (RegistryService.class.equals(type)) {
        return proxyFactory.getInvoker((T) registry, type, url);
    }

    //将url中paremeters的key=refer的value查询字符串从新转为Map
    Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
    //获取group配置
    String group = qs.get(Constants.GROUP_KEY);
    //若group不为空,而且有多个分组
    if (group != null && group.length() > 0) {
        if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
            //经过SPI加载MergeableCluster实例,并调用doRefer继续执行获取服务引用逻辑
            return doRefer(getMergeableCluster(), registry, type, url);
        }
    }
    //单group或无group,使用RegistryProtocol默认装配的Cluster自适应扩展类调用doRefer方法
    return doRefer(cluster, registry, type, url);
}

/**
 * 建立Invoker对象
 */
 private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
     //建立RegistryDirectory服务目录,服务目录相关内容参考下一节,注意每次走到doRefer都会new。
     //维度是某个服务的(type)的在某个注册中心(URL)的服务目录
     RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
     directory.setRegistry(registry); //添加注册中心属性
     directory.setProtocol(protocol); //添加协议自适应扩展类
     
     //生成服务消费者URL,protocol=consumer,ip=register.ip的值,port=0,path=type的全限定名,格式:
     //consumer://192.168.54.1/com.alibaba.dubbo.rpc.service.GenericService?
     //application=monitor-app&check=false&dubbo=2.6.2&generic=true&
     //interface=com.bestpay.monitor.app.AlarmTestService&pid=6332&
     //protocol=dubbo&retries=-1&side=consumer&timeout=10000&timestamp=1561427241838
     Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());      
     URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
     
     //使用服务消费者URL进行注册
     if (!Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) {
         //调用Registry的register方法,通常为ZookeeperRegistry,它调用FailRegistry的register方法(见下)
         registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, 
                 Constants.CONSUMERS_CATEGORY,Constants.CHECK_KEY, String.valueOf(false)));
     }
     
     //向subscribeUrl的parameter参数添加键值对"category" -> "providers,configurators,routers"
     //订阅服务提供方的zkNode下的providers、configurators、routers等节点数据
     //(1)将RegistryDirectory中的consumerUrl设置为subscribeUrl;
     //(2)调用Registry的subscribe方法,该方法:
     //(2.1)调用父类AbstractRegistry方法,向成员变量subscribed设置值;
     //(2.2)移除failedSubscribed、failedUnsubscribed、failedNotified该subscribeUrl相关数据
     //(3)若是订阅失败,则尝试从ZookeeperRegistry初始化时从缓存文件读取到的数据中获取到URLs,
     //且若是URLs不为空,则向它们发送订阅失败的通知;若是为空,且check=true,则直接抛出异常;
     //不然将url加入failedSubscribed
     directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY
                + "," + Constants.ROUTERS_CATEGORY));

     //调用SPI接口Cluster的自适应扩展类的join,根据Cluster定义能够知道自适应扩展类
     //应该获取一个FailoverCluster实例,可是MockClusterWrapper是Cluster扩展类中的包装类,
     //所以FailoverCluster会被包装起来返回,最终自适应扩展类获取到MockClusterWrapper实例。
     //调用扩展类MockClusterWrapper的join方法,该方法建立了一个MockClusterInvoker实例,
     //并维护了directory以及一个FailoverClusterInvoker。
     Invoker invoker = cluster.join(directory);
     
     //将服务引用invoker、registryUrl、consumerUrl、服务目录directory包装成ConsumerInvokerWrapper,
     //而后以serviceUniqueName = consumerUrl.getServiceKey()作为key,存入
     //在ProviderConsumerRegTable中的static变量ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>>
     ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
     return invoker;
 }

 

3.3.zookeeper注册中心工厂:ZookeeperRegistryFactory


  注册中心协议RegistryProtocol在断定注册中心的协议为zookeeper后,会调用ZookeeperRegistryFactory的getRegistry方法来建立一个Zookeeper注册中心对象ZookeeperRegistry。该Factory缓存了全部zookeeper注册中心的实例。
//dubbo的SPI机制决定了Dubbo运行过程当中同一个扩展类实例只有一个,ZookeeperRegistryFactory中具备一个注册中心的缓
//存,key为"zookeeper://172.17.45.14:2181/com.alibaba.dubbo.registry.RegistryService",即以下格式
//"protocol://username:password@ip:port/com.alibaba.dubbo.registry.RegistryService"。更多格式参考
//URL类的toServiceString方法
private static final Map<String, Registry> REGISTRIES = new ConcurrentHashMap<String, Registry>();

//实例化扩展类ZookeeperRegistryFactory时,会经过SPI的注入ZookeeperTransporter的自适应扩展类
private ZookeeperTransporter zookeeperTransporter;

/**
 * 获取一个注册中心封装对象
 */
public Registry getRegistry(URL url) {
    //克隆url,并将Path设置为RegistryService的全限定名;
    //在URL的parameters参数中添加interface=RegistryService的全限定名键值对;
    //在URL的parameters参数中移除key=export、key=refer;
    url = url.setPath(RegistryService.class.getName())
        .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
        .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
    
    //获取注册中心Registry的缓存key
    String key = url.toServiceString();
    LOCK.lock();
    try {
        //从缓存中尝试获取
        Registry registry = REGISTRIES.get(key);
        if (registry != null) {
            return registry;
        }
        //若是没有获取,则调用建立方法建立缓存
        registry = createRegistry(url);
        if (registry == null) {
            throw new IllegalStateException("Can not create registry " + url);
        }
        //存入缓存
        REGISTRIES.put(key, registry);
        return registry;
    } finally {
        LOCK.unlock();
    }
}

/**
 * 建立缓存,直接new一个ZookeeperRegistry对象
 */
public Registry createRegistry(URL url) {
    return new ZookeeperRegistry(url, zookeeperTransporter);
}

 

3.4.zookeeper注册中心:ZookeeperRegistry


3.4.1.构造方法  


        ZookeeperRegistry为dubbo封装的基于zookeeper的注册中心,它并不为一个SPI接口,对于每一个注册中心都是经过new出来的实例,下面研究构造方法。
/**
 * ZookeeperRegistry的构造方法
 */
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    //调用父类FailbackRegistry的构造方法,方法见下
    super(url);
    //判断url中的host字段是否为"0.0.0.0"或者为表示anyhost的true
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    }
    //获取url中key为group的参数值,若是未获取到,则给默认值"dubbo"
    String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
    //判断group是否以"/"开头,若是不是,则为其添加"/"前缀
    if (!group.startsWith(Constants.PATH_SEPARATOR)) {
        group = Constants.PATH_SEPARATOR + group;
    }  
    this.root = group;
    
    //调用SPI接口ZookeeperTransporter的自适应扩展类,该扩展类为dubbo经过字节码技术生成。
    //connect方法被@Adaptive注解修饰,而且要求从key为client以及transporter中取值,若是
    //这两个参数没有值,则扩展类name取@SPI注解的value值"curator",通常来讲都是这个值。
    //那么自适应扩展类最终会调用CuratorZookeeperTransporter类的connect方法获取Zk客户端。
    zkClient = zookeeperTransporter.connect(url);
    
    //向获取到的zkClient设置监听器
    zkClient.addStateListener(new StateListener() {
        @Override
        public void stateChanged(int state) {
            if (state == RECONNECTED) {
                try {
                    //若是是RECONNECTED状态,则进行恢复
                    recover();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    });
}

   FailbackRegistry为ZookeeperRegistry的父类,定义了注册中心的重试逻辑:apache

//定时任务线程池,用于周期性的执行重试任务
private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true));

//定时任务线程执行结果
private final ScheduledFuture<?> retryFuture;

//须要重试的注册失败的URL
private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>();

//须要重试的注销失败的URL
private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>();

//须要重试的订阅失败的URL
private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();

//须要重试的解除订阅失败的URL
private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();

//须要重试的通知失败的URL
private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>();

/**
 * FailbackRegistry构造函数
 */
public FailbackRegistry(URL url) {
    //调用父类AbstractRegistry的构造方法,方法见下
    super(url);
    //获取重试周期,默认5000毫秒
    int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
    //建立一个定时线程池,以retryPeriod为周期定时调用retry方法
    this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            try {
                //尝试注册失败、解注册失败、订阅失败、解订阅失败、通知失败的列表
                retry();
            } catch (Throwable t) { // Defensive fault tolerance
                logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
            }
        }
    }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
}

   AbstractRegistry为FailbackRegistry的父类,其定义了从缓存文件加载配置、缓存配置到文件、注册、解注册、订阅、解除订阅等Registry的主要功能。api

//注册URL列表
private final Set<URL> registered = new ConcurrentHashSet<URL>();

//订阅URL列表
private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();

//通知URL列表
private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<URL, Map<String, List<URL>>>();

/**
 * AbstractRegistry构造函数
 */
public AbstractRegistry(URL url) {
    //设置成员变量registryUrl=url
    setUrl(url);
       //获取文件的同步保存标识
    syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
     //获取缓存文件名,格式为"C:\Users\chenjunyi/.dubbo/dubbo-registry-monitor-app-172.17.45.14:2181.cache"
    String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getParameter(Constants.APPLICATION_KEY) + "-" + url.getAddress() + ".cache");
    File file = null;
    if (ConfigUtils.isNotEmpty(filename)) {
        file = new File(filename);
        if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
            if (!file.getParentFile().mkdirs()) {
                throw new IllegalArgumentException("Invalid registry store file ...");
            }
        }
    }
    this.file = file;
    //读取缓存的配置文件,将读取结果存到Properties properties成员属性中
    loadProperties();
    //拆分注册中心URL中的address,将其backup的地址与主要地址拆成List<URL>,而后通知其监听器
    notify(url.getBackupUrls());
}

 

3.4.2.register方法


  ZookeeperRegistry的register方法继承自父类FailbackRegistry,它将服务消费/提供者的URL注册到注册中心上,并未重写该方法:
/**
 * 继承自父类FailbackRegistry的register方法,用于注册服务引用(消费者)的URL
 */
public void register(URL url) {
    //调用父类AbstractRegistry的register方法
    super.register(url);
    failedRegistered.remove(url); //从注册失败列表中移除该url    
    failedUnregistered.remove(url); //从注销失败列表中移除该url  
    try {
        //勾起实际的注册方法
        doRegister(url);
    } catch (Exception e) {
        //若是抛出异常
        Throwable t = e;
        //判断check标识,从ZookeeperRegistry的registerUrl,即注册中心URL的paramters获取key=check的值,
        //从消费者URL,即url的parameters获取key=check的值,以及获取url的protocol。
        //计算flag check的布尔值
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
            && url.getParameter(Constants.CHECK_KEY, true)
            && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
        //判断异常类型是否为SkipFailbackWrapperException
        boolean skipFailback = t instanceof SkipFailbackWrapperException;
        //若是都不该该跳过异常,则抛出异常,不然仅仅是打印异常
        if (check || skipFailback) {
            if (skipFailback) {
                t = t.getCause();
            }
            throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
        } else {
            logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }

        //对于注册失败的消费者URL,添加到注册失败列表中Set<URL> failedRegistered
        failedRegistered.add(url);
    }
}

/**
 * 由ZookeeperRegistry实现的doRegister方法
 */
protected void doRegister(URL url) {
    try {
        //调用CuratorZookeeperClient建立节点,以泛化调用为例,传入的参数URL格式为
        //consumer://192.168.54.1/com.alibaba.dubbo.rpc.service.GenericService?
        //application=monitor-app&category=consumers&check=false&dubbo=2.6.2&
        //generic=true&interface=com.bestpay.monitor.app.AlarmTestService&pid=19616&
        //protocol=dubbo&retries=-1&side=consumer&timeout=10000&timestamp=1561429026207
        //能够看出,它的path路径GenericService不必定等于interface
        //(1)toUrlPath(url)获取须要建立节点路径,以消费者为例,其格式为
        //"/dubbo/com.bestpay.monitor.app.AlarmTestService/consumers/consumer%3A%2F%2F192.168.54.1%2F
        //com.alibaba.dubbo.rpc.service.GenericService%3Fapplication%3Dmonitor-app%26..."
        //能够看出,真正的建立节点路径是interface接口做为path的路径
        //(2)url.getParameter(Constants.DYNAMIC_KEY, true)决定是否建立临时节点,true-临时节点。
        //而CuratorZookeeperClient内部的create逻辑为:
        //(1)截取示例中的"/dubbo/com.bestpay.monitor.app.AlarmTestService/consumers"做为
        //zkNode的父路径并一级级建立父节点consumers,父节点都为永久节点;
        //(2)根据第二个参数的值决定建立的每一个消费者节点(即截取父路径后遗留的字符串)是否为临时节点,
        //true-临时节点;false-永久节点;        
        zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

 

3.4.3.subscribe方法


  ZookeeperRegistry的subscribe方法继承自父类FailbackRegistry,它订阅服务提供者ZK节点下的routers节点、configurators节点以及providers节点,并未重写该方法。该方法在实际的调用过程当中,被RegistryDirectory的subscribe勾起:
/**
 * RegistryDirectory的subscribe方法
 */
public void subscribe(URL url) {
    setConsumerUrl(url); //简单的设置RegistryDirectory成员变量consumerUrl
    //勾起成员变量Registry的subscribe方法,并将自身做为NotifyListener传入
    registry.subscribe(url, this); 
}

   调起继承自FailbackRegistry的subscribe方法:缓存

/**
 * 继承自FailbackRegistry的subscribe方法
 */ 
public void subscribe(URL url, NotifyListener listener) {
    //调用父类的订阅方法,
    super.subscribe(url, listener);
    //从failedSubscribed、failedUnsubscribed、failedNotified列表中移除该url对应的listener
    removeFailedSubscribed(url, listener);
    try {
        //调用由ZookeeperRegistry实现的doSubscribe方法
        doSubscribe(url, listener);
    } catch (Exception e) {
        //若是doSubscribe执行抛出异常
        Throwable t = e;
        //从加载的文件缓存Properties中获取url.getServiceKey对应的缓存
        List<URL> urls = getCacheUrls(url);
        if (urls != null && !urls.isEmpty()) {
            //若是读取到的缓存不为空,则对该url进行通知,通知的内容是文件缓存的内容
            notify(url, listener, urls);
            logger.error("Failed to subscribe ... Using cached list: ...");
        } else {
            //获取check标识,getUrl方法获取的是ZookeeperRegistry维护的registryUrl,
            //而url指的是服务消费者的URL,从它们的parameters字段获取check这个key的value;
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true);
            //判断异常类型
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            //决定订阅失败的处理是继续抛出异常仍是打印错误日志
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to subscribe ... cause: ");
            } else {
                logger.error("Failed to subscribe url ... waiting for retry ...");
            }
        }
        //向成员变量 ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed添加订阅失败的数据
        addFailedSubscribed(url, listener);
    }
}

/**
 * 由ZookeeperRegistry实现的doSubscribe方法
 */
protected void doSubscribe(final URL url, final NotifyListener listener) {
    try {
        if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
            //若是方法参数URL的paramters中key=interface为*,暂时先不讨论
            String root = toRootPath();
            ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
            if (listeners == null) {
                zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                listeners = zkListeners.get(url);
            }
            ChildListener zkListener = listeners.get(listener);
            if (zkListener == null) {
                listeners.putIfAbsent(listener, new ChildListener() {
                    @Override
                    public void childChanged(String parentPath, List<String> currentChilds) {
                        for (String child : currentChilds) {
                            child = URL.decode(child);
                            if (!anyServices.contains(child)) {
                                anyServices.add(child);
                                subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
                                                                           Constants.CHECK_KEY, String.valueOf(false)), listener);
                            }
                        }
                    }
                });
                zkListener = listeners.get(listener);
            }
            zkClient.create(root, false);
            List<String> services = zkClient.addChildListener(root, zkListener);
            if (services != null && !services.isEmpty()) {
                for (String service : services) {
                    service = URL.decode(service);
                    anyServices.add(service);
                    subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
                                                                 Constants.CHECK_KEY, String.valueOf(false)), listener);
                }
            }
        } else {
            //方法参数URL的paramters中key=interface不为*
            List<URL> urls = new ArrayList<URL>();
            //经过toCategoriesPath方法生成要订阅的ZK节点路径,以interface=AlarmTestService为例:
            //(1)/dubbo/com.bestpay.monitor.app.AlarmTestService/providers;
            //(2)/dubbo/com.bestpay.monitor.app.AlarmTestService/configurators;
            //(3)/dubbo/com.bestpay.monitor.app.AlarmTestService/routers;
            //而后遍历这3个路径
            for (String path : toCategoriesPath(url)) {
                //获取这个ZookeeperRegistry中的服务消费者URL对应的监听器Map
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                if (listeners == null) {
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                    listeners = zkListeners.get(url);
                }
                //获取NotifyListener,即RegistryDirectory对应的ChildListener。
                //通常来讲RegistryDirectory与注册中心Registry和服务引用接口(如GenericService)绑定。
                ChildListener zkListener = listeners.get(listener);
                if (zkListener == null) {
                    //若是没有ChildListener,则建立一个并设置到listeners这个map中
                    listeners.putIfAbsent(listener, new ChildListener() {
                        @Override
                        public void childChanged(String parentPath, List<String> currentChilds) {
                            ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                        }
                    });
                    zkListener = listeners.get(listener);
                }
                //建立"XXXX/XXX/providers、configurators、providers"永久节点
                zkClient.create(path, false);
                //向Dubbo封装的ZkClient添加ChildListener
                List<String> children = zkClient.addChildListener(path, zkListener);
                if (children != null) {
                    urls.addAll(toUrlsWithEmpty(url, path, children));
                }
            }
            notify(url, listener, urls);
        }
    } catch (Throwable e) {
        throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

   相似于register方法,FallbackRegistry首先调用父类AbstractRegistry的subscribe,在通过一系列的校验以后,向成员变量ConcurrentMap<URL, Set<NotifyListener>> subscribed添加服务引用(即消费者)的URL和监听器。并发

/**
 * AbstractRegistry的subscribe方法
 */
public void subscribe(URL url, NotifyListener listener) {
    if (url == null) {
        throw new IllegalArgumentException("subscribe url == null");
    }
    if (listener == null) {
        throw new IllegalArgumentException("subscribe listener == null");
    }
    if (logger.isInfoEnabled()) {
        logger.info("Subscribe: " + url);
    }
    Set<NotifyListener> listeners = subscribed.get(url);
    if (listeners == null) {
        subscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>());
        listeners = subscribed.get(url);
    }
    listeners.add(listener);
}

 

3.4.4.恢复与重试


  为了保障链接可以重试与恢复,在ZookeeperRegistry中为CuratorZookeeperClient设置了状态监听器,对于Curator通知的链接RECONNECTED事件,会勾起一个recover方法;在FailbackRegistry的构造方法中,同时设置了一个定时线程,用于调用retry方法,该方法捞取注册失败、解注册失败、订阅失败、解订阅失败、通知失败的URL列表并重试。下面来看看这两个方法是如何工做的:
/**
 * 用于根据Curator客户端推送的链接状态,RECONNECTED进行恢复
 */
protected void recover() throws Exception {
    //获取恢复的注册中心的地址URL列表Set<URL>,getRegistered()获取成员变量Set<URL>
    Set<URL> recoverRegistered = new HashSet<URL>(getRegistered());
    if (!recoverRegistered.isEmpty()) {
        if (logger.isInfoEnabled()) {
            logger.info("Recover register url " + recoverRegistered);
        }
        //将getRegistered()添加到成员变量Set<URL> failedRegistered中
        for (URL url : recoverRegistered) {
            failedRegistered.add(url);
        }
    }
    
    //获取恢复的订阅列表Map<URL, Set<NotifyListener>>,getSubscribed()获取成员变量subscribed
    Map<URL, Set<NotifyListener>> recoverSubscribed = new HashMap<URL, Set<NotifyListener>>
                                                                            (getSubscribed());
    if (!recoverSubscribed.isEmpty()) {
        if (logger.isInfoEnabled()) {
            logger.info("Recover subscribe url " + recoverSubscribed.keySet());
        }
        for (Map.Entry<URL, Set<NotifyListener>> entry : recoverSubscribed.entrySet()) {
            URL url = entry.getKey();
            //添加到成员变量failedSubscribed中
            for (NotifyListener listener : entry.getValue()) {
                addFailedSubscribed(url, listener);
            }
        }
    }
}

/**
 * 对于成员变量failedRegistered、failedUnregistered、failedSubscribed、
 * failedUnsubscribed、failedNotified进行重试,能够看到,若是重试成功则将
 * 其移出相应的重试列表,若是重试失败,则忽略异常等待下次重试
 */
protected void retry() {
    if (!failedRegistered.isEmpty()) {
        Set<URL> failed = new HashSet<URL>(failedRegistered);
        if (failed.size() > 0) {
            if (logger.isInfoEnabled()) {
                logger.info("Retry register " + failed);
            }
            try {
                for (URL url : failed) {
                    try {
                        doRegister(url);
                        failedRegistered.remove(url);
                    } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                        logger.warn("Failed to retry register ... waiting for again, cause: ...");
                    }
                }
            } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                logger.warn("Failed to retry register ... waiting for again, cause: ...");
            }
        }
    }
    if (!failedUnregistered.isEmpty()) {
        Set<URL> failed = new HashSet<URL>(failedUnregistered);
        if (!failed.isEmpty()) {
            if (logger.isInfoEnabled()) {
                logger.info("Retry unregister " + failed);
            }
            try {
                for (URL url : failed) {
                    try {
                        doUnregister(url);
                        failedUnregistered.remove(url);
                    } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                        logger.warn("Failed to retry unregister ... waiting for again, cause: ...");
                    }
                }
            } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                logger.warn("Failed to retry unregister ... waiting for again, cause: ");
            }
        }
    }
    if (!failedSubscribed.isEmpty()) {
        Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>(failedSubscribed);
        for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>
                                                         (failed).entrySet()) {
            if (entry.getValue() == null || entry.getValue().size() == 0) {
                failed.remove(entry.getKey());
            }
        }
        if (failed.size() > 0) {
            if (logger.isInfoEnabled()) {
                logger.info("Retry subscribe " + failed);
            }
            try {
                for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {
                    URL url = entry.getKey();
                    Set<NotifyListener> listeners = entry.getValue();
                    for (NotifyListener listener : listeners) {
                        try {
                            doSubscribe(url, listener);
                            listeners.remove(listener);
                        } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                            logger.warn("Failed to retry subscribe ... waiting for again, cause: ...");
                        }
                    }
                }
            } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                logger.warn("Failed to retry subscribe ... waiting for again, cause: ");
            }
        }
    }
    if (!failedUnsubscribed.isEmpty()) {
        Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>
                                                                (failedUnsubscribed);
        for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>
                                                         (failed).entrySet()) {
            if (entry.getValue() == null || entry.getValue().isEmpty()) {
                failed.remove(entry.getKey());
            }
        }
        if (failed.size() > 0) {
            if (logger.isInfoEnabled()) {
                logger.info("Retry unsubscribe " + failed);
            }
            try {
                for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {
                    URL url = entry.getKey();
                    Set<NotifyListener> listeners = entry.getValue();
                    for (NotifyListener listener : listeners) {
                        try {
                            doUnsubscribe(url, listener);
                            listeners.remove(listener);
                        } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
                            logger.warn("Failed to retry unsubscribe ... waiting for again, cause: ..");
                        }
                    }
                }
            } catch (Throwable t) { 
                logger.warn("Failed to retry unsubscribe ... waiting for again, cause: ...");
            }
        }
    }
    if (!failedNotified.isEmpty()) {
        Map<URL, Map<NotifyListener, List<URL>>> failed = new HashMap<URL, Map<NotifyListener, 
                                                    List<URL>>>(failedNotified);
        for (Map.Entry<URL, Map<NotifyListener, List<URL>>> entry : new HashMap<URL, Map<NotifyListener, 
                                                     List<URL>>>(failed).entrySet()) {
            if (entry.getValue() == null || entry.getValue().size() == 0) {
                failed.remove(entry.getKey());
            }
        }
        if (failed.size() > 0) {
            if (logger.isInfoEnabled()) {
                logger.info("Retry notify " + failed);
            }
            try {
                for (Map<NotifyListener, List<URL>> values : failed.values()) {
                    for (Map.Entry<NotifyListener, List<URL>> entry : values.entrySet()) {
                        try {
                            NotifyListener listener = entry.getKey();
                            List<URL> urls = entry.getValue();
                            listener.notify(urls);
                            values.remove(listener);
                        } catch (Throwable t) { 
                            logger.warn("Failed to retry notify ... waiting for again, cause: ...");
                        }
                    }
                }
            } catch (Throwable t) { 
                logger.warn("Failed to retry notify ... waiting for again, cause: ...");
            }
        }
    }
}

 

3.5.zookeeper封装客户端:CuratorZookeeperClient


  CuratorZookeeperTransporter的connect方法直接new一个Dubbo封装的CuratorZookeeperClient。
public ZookeeperClient connect(URL url) {
    return new CuratorZookeeperClient(url);
}

   而CuratorZookeeperClient是dubbo经过Curator封装出来的zookeeper客户端。它的构造函数经过Curator框架建立一个client,而且向该client添加一个链接状态的监听器。当有链接状态改变时,会向CuratorZookeeperClient维护的StateListener调用stateChanged方法,传入获取到的状态。app

 public CuratorZookeeperClient(URL url) {
     super(url);
     try {
         CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
             .connectString(url.getBackupAddress())
             .retryPolicy(new RetryNTimes(1, 1000))
             .connectionTimeoutMs(5000);
         String authority = url.getAuthority();
         if (authority != null && authority.length() > 0) {
             builder = builder.authorization("digest", authority.getBytes());
         }
         client = builder.build();
         client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
             @Override
             public void stateChanged(CuratorFramework client, ConnectionState state) {
                 //回调自身维护的监听器List<StateListener>
                 if (state == ConnectionState.LOST) {
                     CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
                 } else if (state == ConnectionState.CONNECTED) {
                     CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
                 } else if (state == ConnectionState.RECONNECTED) {
                     CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
                 }
             }
         });
         client.start();
     } catch (Exception e) {
         throw new IllegalStateException(e.getMessage(), e);
     }
 }

 

3.6.StubProxyFactoryWrapper


  看看getProxy方法:
/** 
 * StubProxyFactoryWrapper的getProxy方法
 */
public <T> T getProxy(Invoker<T> invoker) throws RpcException {
    //它首先调用包装的JavassistProxyFactory的getProxy方法
    T proxy = proxyFactory.getProxy(invoker);
    if (GenericService.class != invoker.getInterface()) {
        //若是设置了本地代理类,则将获取到的Proxy包装为代理类对象
        String stub = invoker.getUrl().getParameter(
            Constants.STUB_KEY, invoker.getUrl().getParameter(Constants.LOCAL_KEY));
        if (ConfigUtils.isNotEmpty(stub)) {
            Class<?> serviceType = invoker.getInterface();
            if (ConfigUtils.isDefault(stub)) {
                if (invoker.getUrl().hasParameter(Constants.STUB_KEY)) {
                    stub = serviceType.getName() + "Stub";
                } else {
                    stub = serviceType.getName() + "Local";
                }
            }
            try {
                Class<?> stubClass = ReflectUtils.forName(stub);
                if (!serviceType.isAssignableFrom(stubClass)) {
                    throw new IllegalStateException("The stub implementation class ...");
                }
                try {
                    Constructor<?> constructor = ReflectUtils.findConstructor(stubClass, serviceType);
                    proxy = (T) constructor.newInstance(new Object[]{proxy});
                    //export stub service
                    URL url = invoker.getUrl();
                    if (url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT)) {
                        url = url.addParameter(Constants.STUB_EVENT_METHODS_KEY, StringUtils.join(
                            Wrapper.getWrapper(proxy.getClass()).getDeclaredMethodNames(), ","));
                        url = url.addParameter(Constants.IS_SERVER_KEY, Boolean.FALSE.toString());
                        try {
                            export(proxy, (Class) invoker.getInterface(), url);
                        } catch (Exception e) {
                            LOGGER.error("export a stub service error.", e);
                        }
                    }
                } catch (NoSuchMethodException e) {
                    throw new IllegalStateException("No such constructor \"public ...");
                }
            } catch (Throwable t) {
                LOGGER.error("Failed to create stub implementation class ...");
            }
        }
    }
    return proxy;
}

/**
 * JavassistProxyFactory的getProxy方法,继承自AbstractProxyFactory
 */
public <T> T getProxy(Invoker<T> invoker) throws RpcException {
    Class<?>[] interfaces = null;
    //获取接口列表
    String config = invoker.getUrl().getParameter("interfaces");
    if (config != null && config.length() > 0) {
        //切分接口列表
        String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
        if (types != null && types.length > 0) {
             //设置服务接口类和EchoService.class到interfaces中
            interfaces = new Class<?>[types.length + 2];
            interfaces[0] = invoker.getInterface();
            interfaces[1] = EchoService.class;
            for (int i = 0; i < types.length; i++) {
                interfaces[i + 1] = ReflectUtils.forName(types[i]);
            }
        }
    }
    //若是接口列表为空,则设置它为服务接口以及回声测试接口
    if (interfaces == null) {
        interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};
    }
    
    return getProxy(invoker, interfaces);
}

/**
 * JavassistProxyFactory实现的getProxy方法,非继承,获取服务接口代理类对象
 */
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
    //经过字节码技术生成类Proxy的子类(Proxy是抽象类)
    //并调用Proxy子类的newInstance方法建立服务接口代理类实例,该实例维护一个InvokerInvocationHandler
    //代理类实例的每一个方法实现都会调用InvokerInvocationHandler的invoke方法,将服务接口的方法参数以及
    //调用的方法反射对象Method传入。InvokerInvocationHandler的invoke方法在一系列检查后最终执行以下方法:
    //return invoker.invoke(new RpcInvocation(method, args)).recreate();
    return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}

 

3.7.Proxy


  Proxy是一个抽象类,该类提供一个Proxy方法,该方法经过字节码技术生成Proxy的之类。该子类有一个方法,用于生成Invoker代理类。
//Proxy子类实例缓存
private static final Map<ClassLoader, Map<String, Object>> ProxyCacheMap = 
    new WeakHashMap<ClassLoader, Map<String, Object>>();

/**
 * 生成Proxy子类,该子类封装了生成服务接口代理类的逻辑
 */
public static Proxy getProxy(Class<?>... ics) {
    return getProxy(ClassHelper.getClassLoader(Proxy.class), ics);
}

/**
 * 生成Proxy子类,该子类封装了生成服务接口代理类的逻辑
 */
public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
    if (ics.length > 65535)
        throw new IllegalArgumentException("interface limit exceeded");

    StringBuilder sb = new StringBuilder();
    //遍历接口列表
    for (int i = 0; i < ics.length; i++) {
        String itf = ics[i].getName();
        //检测是否为接口类型
        if (!ics[i].isInterface())
            throw new RuntimeException(itf + " is not a interface.");
        //使用提供的ClassLoader,即Proxy自身的ClassLoader加载当前遍历的接口
        Class<?> tmp = null;
        try {
            tmp = Class.forName(itf, false, cl);
        } catch (ClassNotFoundException e) {
        }
        //检测接口是否相同,这里至关于判断Proxy的类加载器与接口的类加载器是否为一个
        if (tmp != ics[i])
            throw new IllegalArgumentException(ics[i] + " is not visible from class loader");
        //拼接接口名称,格式"接口名1;接口名2..."
        sb.append(itf).append(';');
    }

    //获取类加载器对应的Proxy缓存,一个Map对象
    Map<String, Object> cache;
    synchronized (ProxyCacheMap) {
        //获取当前类加载器的Proxy实例缓存,key为ClassLoader
        cache = ProxyCacheMap.get(cl);
        if (cache == null) {
            cache = new HashMap<String, Object>();
            //缓存为null,设置该ClassLoader的缓存
            ProxyCacheMap.put(cl, cache);
        }
    }
    
    //以接口名做为key,从cache中获取这个key的Proxy实例缓存
    String key = sb.toString();
    Proxy proxy = null;
    //获取value时的并发控制,使用监视器锁
    synchronized (cache) {
        do {
            //若是value就是应用类型包装的,直接从引用中获取实例
            Object value = cache.get(key);
            if (value instanceof Reference<?>) {
                proxy = (Proxy) ((Reference<?>) value).get();
                if (proxy != null)
                    return proxy;
            }
            //若是value不是引用类型,则进行判断其是否等于PendingGenerationMarker,即一个Object对象。
            //这是使用了cache.wait(),让其余线程在cache这个对象上进行等待,缘由以下:
            //(1)首先一个在cache中未命中的key其value确定为null;那么咱们确定要建立这个value;
            //(2)既然value==null,则进入到else逻辑,设置一个key的标识,跳出循环,也跳出监视器锁同步块;
            //(3)当前线程代码继续执行去建立Proxy的实例,其余线程进入到这个监视器锁块,就会进行循环获取Proxy;
            //(4)不断地循环获取知足条件的Reference也没错,可是这样不断疯狂的循环,对程序有影响;
            //(5)所以,设置PendingGenerationMarker的目的也在于此,做为一个标识,若是发现key的value仍是它,
            //   就表示Proxy实例还没有建立完成,在此进行等待;直到实例建立完成并进行notify。
            //(6)固然,若使用监视器锁将建立Proxy的代码锁住也能够,可是这样锁住的代码块太大了。
            if (value == PendingGenerationMarker) {
                try {
                    cache.wait();
                } catch (InterruptedException e) {
                }
            } else {
                cache.put(key, PendingGenerationMarker);
                break;
            }
        }
        while (true);
    }

    //原子计数器+1,做为id,用于拼接生成的Proxy子类的名字
    long id = PROXY_CLASS_COUNTER.getAndIncrement();
    String pkg = null;
    //ccm用于为Proxy生成子类,ccp为服务接口生成代理类
    ClassGenerator ccp = null, ccm = null;
    try {
        /************************* 开始建立服务接口代理类 *************************/
        //建立生成服务接口代理类的ClassGenerator
        ccp = ClassGenerator.newInstance(cl);

        Set<String> worked = new HashSet<String>();
        List<Method> methods = new ArrayList<Method>();
        //遍历要代理的接口
        for (int i = 0; i < ics.length; i++) {
            //检测接口访问级别是否为public
            if (!Modifier.isPublic(ics[i].getModifiers())) {
                //不为public级别的接口,须要确保它们必须在同一个包下
                String npkg = ics[i].getPackage().getName();
                if (pkg == null) {
                    pkg = npkg;
                } else {
                    if (!pkg.equals(npkg))
                        throw new IllegalArgumentException("non-public interfaces from diff pack...");
                }
            }
            //添加接口到cpp中
            ccp.addInterface(ics[i]);
            //遍历服务接口的全部方法
            for (Method method : ics[i].getMethods()) {
                //获取方法描述,JVM格式"realTimePushList(Lcom/bestpay/messagecenter/product/
                //service/api/dto/push/RealTimePushListDTO;)Lcom/bestpay/dubbo/result/Result;"
                String desc = ReflectUtils.getDesc(method);
                //若是方法描述字符串已在worked中,则忽略。考虑A接口和B接口中包含一个彻底相同的方法的状况
                if (worked.contains(desc))
                    continue;
                worked.add(desc);

                //服务接口代理类方法大小 TODO 
                int ix = methods.size();
                Class<?> rt = method.getReturnType();
                Class<?>[] pts = method.getParameterTypes();

                //拼接代码字符串"Object[] args = new Object[N];",N是当前遍历的method参数个数
                StringBuilder code = new StringBuilder("Object[] args = new Object[").
                                    append(pts.length).append("];");
                //遍历method的参数列表
                for (int j = 0; j < pts.length; j++)
                    //拼接代码字符串"args[j]=($w)$k;",其中k=j+1。这个是args的赋值语句
                    code.append(" args[").append(j).append("] = ($w)$").append(j + 1).append(";");
                //拼接handler的调用语句,"Object ret = handler.invoke(this, methods[ix], args);"
                //handler是java动态代理InvocationHandler的一个实现类,ix为methods.size()。             
                code.append(" Object ret = handler.invoke(this, methods[" + ix + "], args);");
                //若方法返回类型不为void,则拼接返回类型并进行强制类型转换"return (类型)ret;"
                if (!Void.TYPE.equals(rt))
                    code.append(" return ").append(asArgument(rt, "ret")).append(";");
                //向List<Method>中添加该method
                methods.add(method);
                //添加方法名、访问控制符、返回类型、参数列表、抛出异常类型、方法体代码到ClassGenerator中 
                ccp.addMethod(method.getName(), method.getModifiers(), rt, pts,
                              method.getExceptionTypes(), code.toString());
            }
        }
        //设置服务接口代理类包名就为Proxy类所在的包名
        if (pkg == null)
            pkg = PACKAGE_NAME;
        //设置服务接口代理类类名为"pkg + ".proxy" + id",好比org.apache.dubbo.proxy0,注意是小写
        String pcn = pkg + ".proxy" + id;
        ccp.setClassName(pcn);
        //添加成员属性Method[] methods
        ccp.addField("public static java.lang.reflect.Method[] methods;");
        //添加成员属性InvocationHandler
        ccp.addField("private " + InvocationHandler.class.getName() + " handler;");
        //添加带有InvocationHandler参数的构造方法,好比:
        //public proxy0(java.lang.reflect.InvocationHandler $1) { handler=$1; }
        ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, 
                           new Class<?>[0], "handler=$1;");
        //添加默认无参构造器
        ccp.addDefaultConstructor();
        //生成服务接口代理Class
        Class<?> clazz = ccp.toClass();
        //将服务接口代理类的static属性methods设置为上面收集到的methods列表
        clazz.getField("methods").set(null, methods.toArray(new Method[0]));

        /************************* 开始建立Proxy类的子类 *************************/          
        String fcn = Proxy.class.getName() + id; //类名=Proxy全限定名+id,如Proxy一、Proxy2等      
        ccm = ClassGenerator.newInstance(cl); //建立生成Proxy子类的ClassGenerator       
        ccm.setClassName(fcn); //设置类名       
        ccm.addDefaultConstructor(); //添加默认构造器     
        ccm.setSuperClass(Proxy.class); //设置父类
        //添加方法newInstance,该方法调用构造方法,格式以下:
        //public Object newInstance(java.lang.reflect.InvocationHandler $1) { 
        //    return new org.apache.dubbo.proxy0($1);
        //}
        ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + 
                      " h){ return new " + pcn + "($1); }");
        Class<?> pc = ccm.toClass(); //生成Proxy子类Class
        proxy = (Proxy) pc.newInstance(); //生成Proxy子类的实例
    } catch (RuntimeException e) {
        throw e;
    } catch (Exception e) {
        throw new RuntimeException(e.getMessage(), e);
    } finally {
        //释放资源
        if (ccp != null)
            ccp.release();
        if (ccm != null)
            ccm.release();
        //同步设置该Proxy子类的实例缓存,使用弱引用
        synchronized (cache) {
            if (proxy == null)
                cache.remove(key);
            else
                cache.put(key, new WeakReference<Proxy>(proxy));
            //通知全部在cache上等待的线程
            cache.notifyAll();
        }
    }
    return proxy;
}