Dubbo服务治理

准确来讲这里所说的集群主要是指Dubbo对于集群内容的支持,主要涉及:集群容错,负载均衡,路由逻辑三块。下面对于这三块的内容分别进行介绍。

集群容错

容错主要是指服务出现了非业务异常以后采起的一些弥补措施,注意我这里讲的是非业务异常,由于业务异常出现的绝大多数状况都是代码异常,因此及时采起了重试等逻辑仍是会出现同样的业务异常(代码出问题了,锅固然要本身背喽)。
Dubbo中对于容错的处理主要集中在Cluster中,Cluster包装了底层调用的Invoker而且在Cluster本身本层作了一些出现异常以后的处理。
对于Dubbo的容错主要是有两层。第一层是mock,第二层是用户配置的容错策略。对于集群容错的包装逻辑入口就在于RegistryProtocol的doRefer()方法最后的cluster.join(directory),该方法返回了集群包装事后的invoker,这里的cluser其实就是MockClusterWrapper(至于为何能肯定是MockClusterInvoker,就须要你们去理解一下Dubbo的SPI机制了),下面一块儿来看一下MockClusterInvoke的具体内容:java

//真正起做用的是MockClusterInvoker
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        //由于cluster下面以后一层包装,因此这里的this.cluser就是默认的FialoverCluster
        return new MockClusterInvoker<T>(directory,
                this.cluster.join(directory));
    }
    //MockClusterInvoker.invoke()
    public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;
        //获取URL中配置的mock参数
        String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim(); 
        if (value.length() == 0 || value.equalsIgnoreCase("false")){
            //若是没有配置mock的话就直接进行调用
            result = this.invoker.invoke(invocation);
            //若是配置了强制的mock就直接调用mock,不走正常调用逻辑
        } else if (value.startsWith("force")) {
            if (logger.isWarnEnabled()) {
                logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " +  directory.getUrl());
            }
            result = doMockInvoke(invocation, null);
        } else {
            //调用失败以后再进行mock操做
            try {
                result = this.invoker.invoke(invocation);
            }catch (RpcException e) {
                //mock并不会处理义务异常
                if (e.isBiz()) {
                    throw e;
                } else {
                    if (logger.isWarnEnabled()) {
                        logger.info("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " +  directory.getUrl(), e);
                    }
                    result = doMockInvoke(invocation, e);
                }
            }
        }
        return result;
    }
    //主要是挑选出可用的MockInvoker类而后调用其invoke方法返回结果
    private Result doMockInvoke(Invocation invocation,RpcException e){
        Result result = null;
        Invoker<T> minvoker ;
        //选取可用的MockInvoker
        List<Invoker<T>> mockInvokers = selectMockInvoker(invocation);
        if (mockInvokers == null || mockInvokers.size() == 0){
            minvoker = (Invoker<T>) new MockInvoker(directory.getUrl());
        } else {
            minvoker = mockInvokers.get(0);
        }
        try {
            result = minvoker.invoke(invocation);
        } catch (RpcException me) {
            //若是是业务异常就封装结果(注意这里和上面的区别),由于biz异常是用户本身本身在mock信息中配置的异常,不是预想以外的异常
            if (me.isBiz()) {
                result = new RpcResult(me.getCause());
            } else {
                throw new RpcException(me.getCode(), getMockExceptionMessage(e, me), me.getCause());
            }
        } catch (Throwable me) {
            throw new RpcException(getMockExceptionMessage(e, me), me.getCause());
        }
        return result;
    }
    
    //核心最后仍是调用了MockInvoker的invoker方法
    public Result invoke(Invocation invocation) throws RpcException {
        String mock = getUrl().getParameter(invocation.getMethodName()+"."+Constants.MOCK_KEY);
        if (invocation instanceof RpcInvocation) {
            ((RpcInvocation) invocation).setInvoker(this);
        }
        if (StringUtils.isBlank(mock)){
            mock = getUrl().getParameter(Constants.MOCK_KEY);
        }
        
        if (StringUtils.isBlank(mock)){
            //这个错误比较常见,缘由就在于客户端调用的时候返回的异常信息是非业务异常,可是客户端又没有配置mock信息,所以就会抛出这个异常
            throw new RpcException(new IllegalAccessException("mock can not be null. url :" + url));
        }
        //解析出mock的类型,若是是mock=fail:AA,就返回AA,若是是mock=xx.Service就返回xx.Service,若是是mock=force:XX,就返回XX
        mock = normallizeMock(URL.decode(mock));
        //若是配置的是:mock=fail:return,就直接返回空结果
        if (Constants.RETURN_PREFIX.trim().equalsIgnoreCase(mock.trim())){
            RpcResult result = new RpcResult();
            result.setValue(null);
            return result;
            //若是配置的是mock=fail:return **,就解析**为对应的可返回内容而后返回
        } else if (mock.startsWith(Constants.RETURN_PREFIX)) {
            mock = mock.substring(Constants.RETURN_PREFIX.length()).trim();
            mock = mock.replace('`', '"');
            try {
                Type[] returnTypes = RpcUtils.getReturnTypes(invocation);
                Object value = parseMockValue(mock, returnTypes);
                return new RpcResult(value);
            } catch (Exception ew) {
                throw new RpcException("mock return invoke error. method :" + invocation.getMethodName() + ", mock:" + mock + ", url: "+ url , ew);
            }
            //若是配置的是mock=fail:throw **(用户自定义的异常信息),就解析**为对应的可返回内容而后返回
        } else if (mock.startsWith(Constants.THROW_PREFIX)) {
            mock = mock.substring(Constants.THROW_PREFIX.length()).trim();
            mock = mock.replace('`', '"');
            if (StringUtils.isBlank(mock)){
                throw new RpcException(" mocked exception for Service degradation. ");
            } else { //用户自定义类
                Throwable t = getThrowable(mock);
                throw new RpcException(RpcException.BIZ_EXCEPTION, t);
            }
        } else { //若是mock信息为ServiceMock的话就直接找到对应的Mock类进行mock调用,而后返回结果
             try {
                 Invoker<T> invoker = getInvoker(mock);
                 return invoker.invoke(invocation);
             } catch (Throwable t) {
                 throw new RpcException("Failed to create mock implemention class " + mock , t);
             }
        }
    }

从上面的逻辑上来看mock主要是根据用户的一些配置,作一些很是具体的容错逻辑,精确到方法界别的,因此算是容错的最小粒度了。咱们常常在使用中对一些不可靠服务进行mock处理,防止在其出现异常时候影响咱们的核心调用流程。
方法级别的容错整体来讲是针对业务异常的一种容错,而针对非业务异常的容错逻辑就是另一个概念了,好比说因为网络抖动致使某次调用没有成功,针对相似的异常Dubbo也有本身的容错措施,具体以下面几种:web

  • Failover Cluster 失败自动切换,当出现失败,重试其它服务器。(缺省) 一般用于读操做,但重试会带来更长延迟。
  • Failfast Cluster 快速失败,只发起一次调用,失败当即报错。一般用于非幂等性的写操做,好比新增记录。
  • Failsafe Cluster 失败安全,出现异常时,直接忽略。一般用于写入审计日志等操做。
  • Failback Cluster 失败自动恢复,后台记录失败请求,定时重发。 一般用于消息通知操做。
  • Forking Cluster 并行调用多个服务器,只要一个成功即返回。一般用于实时性要求较高的读操做,但须要浪费更多服务资源。可经过forks="2"来设置最大并行数。
  • Broadcast Cluster 广播调用全部提供者,逐个调用,任意一台报错则报错。(2.1.0开始支持) 一般用于通知全部提供者更新缓存或日志等本地资源信息。
    由于种类比较多,所有讲一遍太费时间了,因此选择比较经常使用的几个进行介绍。

Failover

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyinvokers = invokers;
        //检查copyinvokers是否为null
        checkInvokers(copyinvokers, invocation);
        //重试次数,默认为3次,不包含第一次调用
        int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        // last exception.
        RpcException le = null;
        //已经调用的Invoker列表
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size());
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {
            //重试时,进行从新选择,避免重试时invoker列表已发生变化.
            //注意:若是列表发生了变化,那么invoked判断会失效,由于invoker示例已经改变
            if (i > 0) {
                checkWheatherDestoried();
                copyinvokers = list(invocation);
                //从新检查一下有没有对应的提供者
                checkInvokers(copyinvokers, invocation);
            }
            //经过loadbalance去选出目标Invoker
            //这里默认的LoadBalance是RandomLoadBalance,选择时候是根据权重来选择目标的Invoker,固然也能够配置其余的LoadBalance
            Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
            invoked.add(invoker);
            //添加到上下文环境中去,可是这里为何会把失败的invoker也加进来,感受失败的Invoker信息并无什么意义
            RpcContext.getContext().setInvokers((List)invoked);
            try {
                //这里才是最后的调用,使用通过loadbalance选出的invoker去调用
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + invocation.getMethodName()
                            + " in the service " + getInterface().getName()
                            + " was successful by the provider " + invoker.getUrl().getAddress()
                            + ", but there have been failed providers " + providers 
                            + " (" + providers.size() + "/" + copyinvokers.size()
                            + ") from the registry " + directory.getUrl().getAddress()
                            + " on the consumer " + NetUtils.getLocalHost()
                            + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                            + le.getMessage(), le);
                }
                return result;
            } catch (RpcException e) {
                //业务异常不会重试,直接抛出
                if (e.isBiz()) {
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }
        throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
                + invocation.getMethodName() + " in the service " + getInterface().getName() 
                + ". Tried " + len + " times of the providers " + providers 
                + " (" + providers.size() + "/" + copyinvokers.size() 
                + ") from the registry " + directory.getUrl().getAddress()
                + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                + Version.getVersion() + ". Last error is: "
                + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
    }

Failfast

//快速失败的逻辑最简单了,就是什么都不作,有调用异常的话就直接往上抛出
    public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        checkInvokers(invokers, invocation);
        Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
        try {
            return invoker.invoke(invocation);
            //简单区分异常类型
        } catch (Throwable e) {
            if (e instanceof RpcException && ((RpcException)e).isBiz()) { // biz exception.
                throw (RpcException) e;
            }
            throw new RpcException(e instanceof RpcException ? ((RpcException)e).getCode() : 0, "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);
        }
    }

failover与failfast的代码基本同样,主要区别就是出现异常以后直接忽略,而后返回空的RpcResult。其他的几种集群策略在平时用的比较少,我就很少介绍了,其实现也都比较简单。
Dubbo的容错不只体如今provider的cluster,对于注册中心也有提供cluster内容(AvailableCluster),只不过该内容比较简单,只是随机选取了一个可用的注册中心。算法

负载均衡

负载均衡的概念:从多个目标服务器中选择出其中一个服务器供客户端调用,这个选择的具体过程就叫作负载均衡(纯粹是本身给小白用户的解释)。
通常的服务性框架都会有负载均衡的内容,Dubbo也是基于本身的URL机制作了一层负载均衡,咱们看到上面的集群内容时候就看到集群内部其实就依赖了负载均衡策略来从多个Invoker中选取其中的一个,只不过一次负载所选择到的Invoker并不必定能知足条件,好比在Failover策略下,失败以后Loadbalance从新选择的Invoker仍是失败过的那个,那就要从新计算了。
Dubbo的负载均衡策略主要有如下几种:缓存

  • Random LoadBalance 随机选取服务提供者 最简单的无状态的负载均衡算法
  • RoundRobin LoadBalance 以轮训的方式调用服务提供者 缺点是有状态,必须在并发之下记住上一次到谁了
  • LeastActive LoadBalance 调用最少活跃调调用的服务提供者,这里有一点须要注意,这里的服务调用统计维度是在方法级别的,也就是说是方法级别的LoadBalance。
  • ConsistentHash LoadBalance 一致性Hash(用的很少,很少讲解)

首先看下LoadBalance的接口就直到它是作什么的:安全

//从invokers列表中根据url和invocation信息选出一个合适的Invoker供consumer端调用
    <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;

在全部的LoadBalance中都提到了一个概念:weight。正常状况下咱们不管配置在provider仍是service中,对应的全部服务端的providerUrl的weight都是同样的,这种状况其实weight配置不配置意义不大。可是一旦动态针对某个服务调整过weight值,这个影响就出来了。例如:override://10.20.153.10/com.foo.BarService?category=configurators&dynamic=false&weight=200 配置以后,就将10.20.153.10服务器上的com.foo.BarServic的权值改成了200,那么它在以后的LoadBalance中被选取的可能性就更大了(默认的权值为100)。服务器

Random LoadBalance

private final Random random = new Random();

    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        int length = invokers.size(); // 总个数
        int totalWeight = 0; // 总权重
        boolean sameWeight = true; // 权重是否都同样
        for (int i = 0; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            totalWeight += weight; // 累计总权重
            if (sameWeight && i > 0
                    && weight != getWeight(invokers.get(i - 1), invocation)) {
                sameWeight = false; // 计算全部权重是否同样
            }
        }
        if (totalWeight > 0 && ! sameWeight) {
            // 若是权重不相同且权重大于0则按总权重数随机
            int offset = random.nextInt(totalWeight);
            // 随机数落到哪一个片断上,就取哪一个片断对应的Invoker对象(击鼓传花式日后切割)
            for (int i = 0; i < length; i++) {
                offset -= getWeight(invokers.get(i), invocation);
                if (offset < 0) {
                    return invokers.get(i);
                }
            }
        }
        // 若是权重相同或权重为0则均等随机
        return invokers.get(random.nextInt(length));
    }

RoundRobin LoadBalance

//轮训是针对方法级别的,并非全部服务调用
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        int length = invokers.size(); // 总个数
        int maxWeight = 0; // 最大权重
        int minWeight = Integer.MAX_VALUE; // 最小权重
        // invoker->weight,IntegerWrapper就是一个简单的Integer包装类
        final LinkedHashMap<Invoker<T>, IntegerWrapper> invokerToWeightMap = new LinkedHashMap<Invoker<T>, IntegerWrapper>();
        int weightSum = 0;
        for (int i = 0; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            maxWeight = Math.max(maxWeight, weight); // 累计最大权重
            minWeight = Math.min(minWeight, weight); // 累计最小权重
            if (weight > 0) {
                invokerToWeightMap.put(invokers.get(i), new IntegerWrapper(weight));
                weightSum += weight;
            }
        }
        AtomicPositiveInteger sequence = sequences.get(key);
        if (sequence == null) {
            sequences.putIfAbsent(key, new AtomicPositiveInteger());
            sequence = sequences.get(key);
        }
        //currentSequence表明某个方法是第多少次被调用的,例如第1W次
        int currentSequence = sequence.getAndIncrement();
        if (maxWeight > 0 && minWeight < maxWeight) { // 权重不同
            // 能够把weightSum理解成一个权重范围内的总集,mod就带表在这个总集中具体执行到的位置
            int mod = currentSequence % weightSum;
            //weightSum < maxWeight*length
            for (int i = 0; i < maxWeight; i++) {
                for (Map.Entry<Invoker<T>, IntegerWrapper> each : invokerToWeightMap.entrySet()) {
                    final Invoker<T> k = each.getKey();
                    final IntegerWrapper v = each.getValue();
                    //这里的逻辑比较抽象,本质上就是谁的权重越大,轮询到谁的次数就越多
                    if (mod == 0 && v.getValue() > 0) {
                        return k;
                    }
                    if (v.getValue() > 0) {
                        v.decrement();
                        mod--;
                    }
                }
            }
        }
        // 取模轮循
        return invokers.get(currentSequence % length);
    }

简单题下RoundRobin的弊端:若是某个服务有3台服务器,权重依次是10,1000,100,可能配置的人的本意是在轮训的时候走到三台机器的比例是:110,可是实际上确实是1000个请求压倒了第二台机器上。。。网络

LeastActive LoadBalance

private final Random random = new Random();

    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        int length = invokers.size(); // 总个数
        int leastActive = -1; // 最小的活跃数
        int leastCount = 0; // 相同最小活跃数的个数
        int[] leastIndexs = new int[length]; // 相同最小活跃数的下标
        int totalWeight = 0; // 总权重
        int firstWeight = 0; // 第一个权重,用于于计算是否相同
        boolean sameWeight = true; // 是否全部权重相同
        for (int i = 0; i < length; i++) {
            Invoker<T> invoker = invokers.get(i);
            int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // 活跃数
            int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); // 权重
            if (leastActive == -1 || active < leastActive) { // 若是是初始状况下或者某台机器的active数量小于如今保存的leastActive数量,就会从新开始
                leastActive = active; // 记录最小活跃数
                leastCount = 1; // 从新统计相同最小活跃数的个数
                leastIndexs[0] = i; // 从新记录最小活跃数下标
                totalWeight = weight; // 从新累计总权重
                firstWeight = weight; // 记录第一个权重
                sameWeight = true; // 还原权重相同标识
            } else if (active == leastActive) { // 累计相同最小的活跃数
                leastIndexs[leastCount ++] = i; // 累计相同最小活跃数下标
                totalWeight += weight; // 累计总权重
                // 判断全部权重是否同样
                if (sameWeight && i > 0 
                        && weight != firstWeight) {
                    sameWeight = false;
                }
            }
        }
        // 若是只有一个最小值的话就直接调用
        if (leastCount == 1) {
            // 若是只有一个最小则直接返回
            return invokers.get(leastIndexs[0]);
        }
        if (! sameWeight && totalWeight > 0) {
            // 若是权重不相同且权重大于0则按总权重数随机
            int offsetWeight = random.nextInt(totalWeight);
            // 并肯定随机值落在哪一个片段上
            for (int i = 0; i < leastCount; i++) {
                int leastIndex = leastIndexs[i];
                offsetWeight -= getWeight(invokers.get(leastIndex), invocation);
                if (offsetWeight <= 0)
                    return invokers.get(leastIndex);
            }
        }
        // 若是权重相同或权重为0则均等随机
        return invokers.get(leastIndexs[random.nextInt(leastCount)]);
    }

一致性哈希对应的LoadBalance本次不讲解。
我我的对于Dubbo 提供的LoadBalance的见解是:基本上知足平常使用,可是应该更加丰富。由于咱们平常使用的是leastactive,可是由于该负载均衡策略是基于方法级别的,因此没法控制其余的方法对于应用的影响,这里若是将统计的维度上升到机器纬度,其实能够作到相似于整个集群的leastactive,这样的话就不容易出现部分几台机器负载特别高,而其他的大部分机器都有不少资源结余。固然也能够提供接口纬度的负载均衡,这个彻底能够根据具体的业务实现定值,由于SPI机制的缘故,自定义负载均衡策略实现起来仍是比较方便的。并发

路由

说到路由仍是贴一张官方的图会比较好理解一些 app

服务治理细节负载均衡

从上图中能看出来router的做用实际上是在LB以前的,也就是说LB的入参其实就是router的结果。
由于router可能理解起来并不直观,所以仍是大体介绍一下router的含义。
在平常的服务这里过程当中,好比我想给某个接口开个特权,专门设置一些提供者只供其调用;读写分离,读操做配置专门的机器,写操做配置专门的机器;某个消费者有问题,想及时下掉等等。都是能够经过router来实现的,明白了router的具体含义以后咱们来一块儿看一下router的实现:
首先为了明确路由的基本规则,把官方的案例拿过来一块儿看一下:

服务调用信息,如:method, argument 等 (暂不支持参数路由) URL自己的字段,如:protocol, host, port 等 以及URL上的全部参数,如:application, organization 等
条件支持:
等号"="表示"匹配",如:host = 10.20.153.10 不等号"!="表示"不匹配",如:host != 10.20.153.10
值支持:
以逗号","分隔多个值,如:host != 10.20.153.10,10.20.153.11 以星号"_"结尾,表示通配,如:host != 10.20._ 以美圆符"$"开头,表示引用消费者参数,如:host = $host
示例:

  1. 排除预发布机:

RegistryFactoryregistryFactory=ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension();
Registryregistry=registryFactory.getRegistry(URL.valueOf("zookeeper://10.20.153.10:2181")); registry.register(URL.valueOf("condition://0.0.0.0/com.foo.BarService?category=routers&dynamic=false&rule="+ URL.encode("host=10.20.153.10=>host=10.20.153.11")+"));

host=10.20.153.10=>host=10.20.153.11
"=>"以前的为消费者匹配条件,全部参数和消费者的URL进行对比,当消费者知足匹配条件时,对该消费者执行后面的过滤规则。 "=>"以后为提供者地址列表的过滤条件,全部参数和提供者的URL进行对比,消费者最终只拿到过滤后的地址列表。 若是匹配条件为空,表示对全部消费方应用,如:=> host != 10.20.153.11
若是过滤条件为空,表示禁止访问,如:host = 10.20.153.10 =>host!=172.22.3.91

  1. 白名单:(注意:一个服务只能有一条白名单规则,不然两条规则交叉,就都被筛选掉了)
    host!=10.20.153.10,10.20.153.11=>
  2. 黑名单:
    host=10.20.153.10,10.20.153.11=>
  3. 服务寄宿在应用上,只暴露一部分的机器,防止整个集群挂掉:
    =>host=172.22.3.1_,172.22.3.2_
  4. 为重要应用提供额外的机器:
    application!=kylin=>host!=172.22.3.95,172.22.3.96
  5. 读写分离:
    method=find_,list_,get_,is_=>host=172.22.3.94,172.22.3.95,172.22.3.96
    method!=find_,list_,get_,is_=>host=172.22.3.97,172.22.3.98
  6. 先后台分离:
    application=bops=>host=172.22.3.91,172.22.3.92,172.22.3.93
    application!=bops=>host=172.22.3.94,172.22.3.95,172.22.3.96
  7. 隔离不一样机房网段:
    host!=172.22.3._=>host!=172.22.3._
  8. 提供者与消费者部署在同集群内,本机只访问本机的服务:
    =>host=$host

下面以ConditionRouter为例来看一下路由规则的具体实现

//构造函数初始化的时候难点在于when和then的初始化:
    public ConditionRouter(URL url) {
        this.url = url;
        //路由规则的优先级,用于排序,优先级越大越靠前执行,可不填,缺省为0。
        this.priority = url.getParameter(Constants.PRIORITY_KEY, 0);
        //当路由结果为空时,是否强制执行,若是不强制执行,路由结果为空的路由规则将自动失效,可不填,缺省为flase
        this.force = url.getParameter(Constants.FORCE_KEY, false);
        try {
            String rule = url.getParameterAndDecoded(Constants.RULE_KEY);
            if (rule == null || rule.trim().length() == 0) {
                throw new IllegalArgumentException("Illegal route rule!");
            }
            rule = rule.replace("consumer.", "").replace("provider.", "");
            int i = rule.indexOf("=>");
            // =>前的部分
            String whenRule = i < 0 ? null : rule.substring(0, i).trim();
            // =>后的部分
            String thenRule = i < 0 ? rule.trim() : rule.substring(i + 2).trim();
            Map<String, MatchPair> when = StringUtils.isBlank(whenRule) || "true".equals(whenRule) ? new HashMap<String, MatchPair>() : parseRule(whenRule);
            Map<String, MatchPair> then = StringUtils.isBlank(thenRule) || "false".equals(thenRule) ? null : parseRule(thenRule);
            // NOTE: When条件是容许为空的,外部业务来保证相似的约束条件
            this.whenCondition = when;
            this.thenCondition = then;
        } catch (ParseException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }
    
   
    /**
     * 将对应的rule信息解析为对应的MatchPair
     * host=10.20.153.10解析出来就是一个host:MatchPair,Matcher的matches内容为10.20.153.10
     * host!=10.20.153.10解析出来就是一个host:MatchPair,Matcher的mismatches内容为10.20.153.10
     * 能够理解为MatcherPair就是区分matches和mismatches的具体聚合类,拿到这个Matcher就拿到表达式初步解析后的数据
     * @param rule
     * @return
     * @throws ParseException
     */
    private static Map<String, MatchPair> parseRule(String rule)
            throws ParseException {
        Map<String, MatchPair> condition = new HashMap<String, MatchPair>();
        if(StringUtils.isBlank(rule)) {
            return condition;
        }        
        // 匹配或不匹配Key-Value对
        MatchPair pair = null;
        // 多个Value值
        Set<String> values = null;
        final Matcher matcher = ROUTE_PATTERN.matcher(rule);
        //例如:host=10.20.153.10 第一次匹配的group1='',group2='host',第二次匹配的group1='=',group2='10.20.153.10'
        while (matcher.find()) { // 逐个匹配
            String separator = matcher.group(1);
            String content = matcher.group(2);
            // 表达式开始
            if (separator == null || separator.length() == 0) {
                pair = new MatchPair();
                //'host':new MatchPair()
                condition.put(content, pair);
            }
            // &符号尚未遇到过
            else if ("&".equals(separator)) {
                if (condition.get(content) == null) {
                    pair = new MatchPair();
                    condition.put(content, pair);
                } else {
                    condition.put(content, pair);
                }
            }
            // 匹配=号部分
            else if ("=".equals(separator)) {
                if (pair == null)
                    throw new RuntimeException();
                values = pair.matches;
                values.add(content);
            }
            // 匹配!=号部分
            else if ("!=".equals(separator)) {
                if (pair == null)
                    throw new RuntimeException();
                values = pair.mismatches;
                values.add(content);
            }
            // ,号直接跟着前面的=或者!=走
            else if (",".equals(separator)) {
                if (values == null || values.size() == 0)
                    throw new RuntimeException();
                values.add(content);
            } else {
                throw new RuntimeException();
            }
        }
        return condition;
    }
    
    public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation)
            throws RpcException {
        if (invokers == null || invokers.size() == 0) {
            return invokers;
        }
        try {
            //若是没有匹配到的前置条件后直接返回,意思就是当前做用的consumer信息不须要通过路由操做
            // 若是路由的配置值有$开头的话就将其替换为URL中对应的key的value
            if (! matchWhen(url)) {
                return invokers;
            }
            List<Invoker<T>> result = new ArrayList<Invoker<T>>();
            //黑名单
            if (thenCondition == null) {
                logger.warn("The current consumer in the service blacklist. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey());
                return result;
            }
            for (Invoker<T> invoker : invokers) {
                //逐个匹配后置条件
                if (matchThen(invoker.getUrl(), url)) {
                    result.add(invoker);
                }
            }
            if (result.size() > 0) {
                return result;
                //感受强制执行的话返回一个空的List并无卵用呀
            } else if (force) {
                logger.warn("The route result is empty and force execute. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey() + ", router: " + url.getParameterAndDecoded(Constants.RULE_KEY));
                return result;
            }
        } catch (Throwable t) {
            logger.error("Failed to execute condition router rule: " + getUrl() + ", invokers: " + invokers + ", cause: " + t.getMessage(), t);
        }
        return invokers;
    }

经过对于上面三方面的介绍咱们已经基本了解Dubbo对于服务治理的支持点,这三个内容是做为一个完善的RPC框架的强心剂。让咱们干了这碗鸡汤~~~

相关文章
相关标签/搜索