Dubbo源码解析之Directory

注:Dubbo版本是2.5.7java

                              

                                                     图1 AbstractDirectory有俩个实现类缓存

1.StaticDirectory

    StaticDirectory中的List<Invoker>是固定的,源码以下所示。构造方法中传入List<Invoker>后,类内部就没有改变了。安全

public StaticDirectory(URL url, List<Invoker<T>> invokers, List<Router> routers) {
    super(url == null && invokers != null && invokers.size() > 0 ? invokers.get(0).getUrl() : url, routers);
    if (invokers == null || invokers.size() == 0)
        throw new IllegalArgumentException("invokers == null");
    this.invokers = invokers;
}

@Override
protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {
    return invokers;
}

2.RegistryDirectory

    RegistryDirectory与StaticDirectory不一样的是,RegistryDirectory中的属性会变化——注册中心的服务变更时,会推送到这里。多线程

    以前的Dubbo集群容错架构中,从Directory中获取List<Invoker>,获取List<Invoker>与RegistryDirectory的methodInvokerMap有关,以下所示。架构

public List<Invoker<T>> doList(Invocation invocation) {
    if (forbidden) {
        // 1. 没有服务提供者 2. 服务提供者被禁用
        throw new RpcException(RpcException.FORBIDDEN_EXCEPTION,
            "No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " +  NetUtils.getLocalHost()
                + " use dubbo version " + Version.getVersion() + ", may be providers disabled or not registered ?");
    }
    List<Invoker<T>> invokers = null;
    Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
    if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
        String methodName = RpcUtils.getMethodName(invocation);
        Object[] args = RpcUtils.getArguments(invocation);
        if (args != null && args.length > 0 && args[0] != null
                && (args[0] instanceof String || args[0].getClass().isEnum())) {
            invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // 可根据第一个参数枚举路由
        }
        if (invokers == null) {
            invokers = localMethodInvokerMap.get(methodName);
        }
        if (invokers == null) {
            invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
        }
        if (invokers == null) {
            Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
            if (iterator.hasNext()) {
                invokers = iterator.next();
            }
        }
    }
    return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
}

    那么methodInvokeMap的值是从哪来的,以下所示,notify(List<URL>)是NotifyListener的方法,方法中传入一个List<URL>,解析这个List<URL>,configurators的值、routers的值都是在这个方法中更新的,methodInvokerMap是在refreshInvoker中更新的。以下所示。框架

public synchronized void notify(List<URL> urls) {
    List<URL> invokerUrls = new ArrayList<URL>();
    List<URL> routerUrls = new ArrayList<URL>();
    List<URL> configuratorUrls = new ArrayList<URL>();
    for (URL url : urls) {
        String protocol = url.getProtocol();
        String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
        if (Constants.ROUTERS_CATEGORY.equals(category)
                || Constants.ROUTE_PROTOCOL.equals(protocol)) {
            routerUrls.add(url);
        } else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
                || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
            configuratorUrls.add(url);
        } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
            invokerUrls.add(url);
        } else {
            logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
        }
    }
    // configurators
    if (configuratorUrls != null && configuratorUrls.size() > 0) {
        this.configurators = toConfigurators(configuratorUrls);
    }
    // routers
    if (routerUrls != null && routerUrls.size() > 0) {
        List<Router> routers = toRouters(routerUrls);
        if (routers != null) { // null - do nothing
            setRouters(routers);
        }
    }
    List<Configurator> localConfigurators = this.configurators; // local reference
    // 合并override参数
    this.overrideDirectoryUrl = directoryUrl;
    if (localConfigurators != null && localConfigurators.size() > 0) {
        for (Configurator configurator : localConfigurators) {
            this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
        }
    }
    // providers
    refreshInvoker(invokerUrls);
}

    下面咱们来看refreshInvoker中的实现,涉及缓存之类的,可是咱们丢开细节,看主要的,能够看到this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap。这个方法中还更新了urlInvokerMap。以下所示。ide

private void refreshInvoker(List<URL> invokerUrls) {
    if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
            && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
        this.forbidden = true; // 禁止访问
        this.methodInvokerMap = null; // 置空列表
        destroyAllInvokers(); // 关闭全部Invoker
    } else {
        this.forbidden = false; // 容许访问
        Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
        if (invokerUrls.size() == 0 && this.cachedInvokerUrls != null) {
            invokerUrls.addAll(this.cachedInvokerUrls);
        } else {
            this.cachedInvokerUrls = new HashSet<URL>();
            this.cachedInvokerUrls.addAll(invokerUrls);//缓存invokerUrls列表,便于交叉对比
        }
        if (invokerUrls.size() == 0) {
            return;
        }
        Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// 将URL列表转成Invoker列表
        Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // 换方法名映射Invoker列表
        // state change
        //若是计算错误,则不进行处理.
        if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
            logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));
            return;
        }
        this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
        this.urlInvokerMap = newUrlInvokerMap;
        try {
            destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // 关闭未使用的Invoker
        } catch (Exception e) {
            logger.warn("destroyUnusedInvokers error. ", e);
        }
    }
}

3.思考

    1.RegistryDirectory中,methodInvokerMap是否有多线程的问题?由于服务注册中心变动的同时,会有服务消费者访问。methodInvokerMapnotify(List<URL> urls)方法上有synchronized;refreshInvoker(List<URL> invokerUrls)中更新methodInvokerMap是对其彻底赋值,而不是修改methodInvokerMap中的元素。基于这俩点来看是没有线程安全问题的。性能

    2.RegistryDirectory中为性能考虑,用了缓存。其实不止这里,Spring的不少地方都用了缓存。若是咱们的项目业务里相似的场景,咱们是否能够添加缓存的功能,哪些框架源码能够为咱们提供借鉴呢?this

相关文章
相关标签/搜索