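For an introduction to using this module, see the following sections of the official Dubbo user manual.
The registry is in fact one implementation of the directory service; this article does not cover the registry in detail.
Relationships between the nodes:
Each interface has several implementation classes. Since space and time are limited, we pick the most typical implementation of each for source analysis.
The source of Cluster is as follows.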
package com.alibaba.dubbo.rpc.cluster;

import com.alibaba.dubbo.common.extension.Adaptive;
import com.alibaba.dubbo.common.extension.SPI;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.cluster.support.FailoverCluster;

/**
 * Cluster. (SPI, Singleton, ThreadSafe)
 *
 * <a href="http://en.wikipedia.org/wiki/Computer_cluster">Cluster</a>
 * <a href="http://en.wikipedia.org/wiki/Fault-tolerant_system">Fault-Tolerant</a>
 *
 * @author william.liangf
 */
@SPI(FailoverCluster.NAME)
public interface Cluster {

    /**
     * Merge the directory invokers to a virtual invoker.
     *
     * @param <T>
     * @param directory
     * @return cluster invoker
     * @throws RpcException
     */
    @Adaptive
    <T> Invoker<T> join(Directory<T> directory) throws RpcException;

}
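This interface has a single method, which merges the multiple invokers held by a directory object into one invoker. The method is called from the createProxy method of ReferenceConfig; the calling code is as follows.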
// For a cluster that has registries, use only AvailableCluster
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));
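Cluster ships with 9 built-in extension implementations, each implementing a different cluster fault-tolerance strategy. We analyze only the default one, FailoverCluster, which fails over automatically.
The source is below; it simply constructs an invoker of type FailoverClusterInvoker.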
public class FailoverCluster implements Cluster {

    public final static String NAME = "failover";

    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new FailoverClusterInvoker<T>(directory);
    }

}
Let's look at the source of FailoverClusterInvoker.
/** * 失败转移,当出现失败,重试其它服务器,一般用于读操做,但重试会带来更长延迟。 * * <a href="http://en.wikipedia.org/wiki/Failover">Failover</a> * * @author william.liangf * @author chao.liuc */ public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> { private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class); public FailoverClusterInvoker(Directory<T> directory) { super(directory); } @SuppressWarnings({ "unchecked", "rawtypes" }) public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { List<Invoker<T>> copyinvokers = invokers; checkInvokers(copyinvokers, invocation); int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1; if (len <= 0) { len = 1; } // retry loop. RpcException le = null; // last exception. List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers. Set<String> providers = new HashSet<String>(len); for (int i = 0; i < len; i++) { //重试时,进行从新选择,避免重试时invoker列表已发生变化. //注意:若是列表发生了变化,那么invoked判断会失效,由于invoker示例已经改变 if (i > 0) { checkWheatherDestoried(); copyinvokers = list(invocation); //从新检查一下 checkInvokers(copyinvokers, invocation); } Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked); invoked.add(invoker); RpcContext.getContext().setInvokers((List)invoked); try { Result result = invoker.invoke(invocation); if (le != null && logger.isWarnEnabled()) { logger.warn("Although retry the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". 
Last error is: " + le.getMessage(), le); } return result; } catch (RpcException e) { if (e.isBiz()) { // biz exception. throw e; } le = e; } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally { providers.add(invoker.getUrl().getAddress()); } } throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le); } }
This class in turn extends the abstract class AbstractClusterInvoker and uses several of its methods, so it should be read together with that class's source.
/* * Copyright 1999-2011 Alibaba Group. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.alibaba.dubbo.rpc.cluster.support; import java.util.ArrayList; import java.util.List; import com.alibaba.dubbo.common.Constants; import com.alibaba.dubbo.common.URL; import com.alibaba.dubbo.common.Version; import com.alibaba.dubbo.common.extension.ExtensionLoader; import com.alibaba.dubbo.common.logger.Logger; import com.alibaba.dubbo.common.logger.LoggerFactory; import com.alibaba.dubbo.common.utils.NetUtils; import com.alibaba.dubbo.rpc.Invocation; import com.alibaba.dubbo.rpc.Invoker; import com.alibaba.dubbo.rpc.Result; import com.alibaba.dubbo.rpc.RpcException; import com.alibaba.dubbo.rpc.cluster.Directory; import com.alibaba.dubbo.rpc.cluster.LoadBalance; import com.alibaba.dubbo.rpc.support.RpcUtils; /** * AbstractClusterInvoker * * @author william.liangf * @author chao.liuc */ public abstract class AbstractClusterInvoker<T> implements Invoker<T> { private static final Logger logger = LoggerFactory .getLogger(AbstractClusterInvoker.class); protected final Directory<T> directory; protected final boolean availablecheck; private volatile boolean destroyed = false; private volatile Invoker<T> stickyInvoker = null; public AbstractClusterInvoker(Directory<T> directory) { this(directory, directory.getUrl()); } public AbstractClusterInvoker(Directory<T> directory, URL url) { if (directory == null) throw new IllegalArgumentException("service directory == null"); this.directory = 
directory ; //sticky 须要检测 avaliablecheck this.availablecheck = url.getParameter(Constants.CLUSTER_AVAILABLE_CHECK_KEY, Constants.DEFAULT_CLUSTER_AVAILABLE_CHECK) ; } public Class<T> getInterface() { return directory.getInterface(); } public URL getUrl() { return directory.getUrl(); } public boolean isAvailable() { Invoker<T> invoker = stickyInvoker; if (invoker != null) { return invoker.isAvailable(); } return directory.isAvailable(); } public void destroy() { directory.destroy(); destroyed = true; } /** * 使用loadbalance选择invoker.</br> * a)先lb选择,若是在selected列表中 或者 不可用且作检验时,进入下一步(重选),不然直接返回</br> * b)重选验证规则:selected > available .保证重选出的结果尽可能不在select中,而且是可用的 * * @param availablecheck 若是设置true,在选择的时候先选invoker.available == true * @param selected 已选过的invoker.注意:输入保证不重复 * */ protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException { if (invokers == null || invokers.size() == 0) return null; String methodName = invocation == null ? 
"" : invocation.getMethodName(); boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName,Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY) ; { //ignore overloaded method if ( stickyInvoker != null && !invokers.contains(stickyInvoker) ){ stickyInvoker = null; } //ignore cucurrent problem if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))){ if (availablecheck && stickyInvoker.isAvailable()){ return stickyInvoker; } } } Invoker<T> invoker = doselect(loadbalance, invocation, invokers, selected); if (sticky){ stickyInvoker = invoker; } return invoker; } private Invoker<T> doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException { if (invokers == null || invokers.size() == 0) return null; if (invokers.size() == 1) return invokers.get(0); // 若是只有两个invoker,退化成轮循 if (invokers.size() == 2 && selected != null && selected.size() > 0) { return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0); } Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation); //若是 selected中包含(优先判断) 或者 不可用&&availablecheck=true 则重试. if( (selected != null && selected.contains(invoker)) ||(!invoker.isAvailable() && getUrl()!=null && availablecheck)){ try{ Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck); if(rinvoker != null){ invoker = rinvoker; }else{ //看下第一次选的位置,若是不是最后,选+1位置. int index = invokers.indexOf(invoker); try{ //最后在避免碰撞 invoker = index <invokers.size()-1?invokers.get(index+1) :invoker; }catch (Exception e) { logger.warn(e.getMessage()+" may because invokers list dynamic change, ignore.",e); } } }catch (Throwable t){ logger.error("clustor relselect fail reason is :"+t.getMessage() +" if can not slove ,you can set cluster.availablecheck=false in url",t); } } return invoker; } /** * 重选,先从非selected的列表中选择,没有在从selected列表中选择. 
* @param loadbalance * @param invocation * @param invokers * @param selected * @return * @throws RpcException */ private Invoker<T> reselect(LoadBalance loadbalance,Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected ,boolean availablecheck) throws RpcException { //预先分配一个,这个列表是必定会用到的. List<Invoker<T>> reselectInvokers = new ArrayList<Invoker<T>>(invokers.size()>1?(invokers.size()-1):invokers.size()); //先从非select中选 if( availablecheck ){ //选isAvailable 的非select for(Invoker<T> invoker : invokers){ if(invoker.isAvailable()){ if(selected ==null || !selected.contains(invoker)){ reselectInvokers.add(invoker); } } } if(reselectInvokers.size()>0){ return loadbalance.select(reselectInvokers, getUrl(), invocation); } }else{ //选所有非select for(Invoker<T> invoker : invokers){ if(selected ==null || !selected.contains(invoker)){ reselectInvokers.add(invoker); } } if(reselectInvokers.size()>0){ return loadbalance.select(reselectInvokers, getUrl(), invocation); } } //最后从select中选可用的. 
{ if(selected != null){ for(Invoker<T> invoker : selected){ if((invoker.isAvailable()) //优先选available && !reselectInvokers.contains(invoker)){ reselectInvokers.add(invoker); } } } if(reselectInvokers.size()>0){ return loadbalance.select(reselectInvokers, getUrl(), invocation); } } return null; } public Result invoke(final Invocation invocation) throws RpcException { checkWheatherDestoried(); LoadBalance loadbalance; List<Invoker<T>> invokers = list(invocation); if (invokers != null && invokers.size() > 0) { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() .getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)); } else { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE); } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); return doInvoke(invocation, invokers, loadbalance); } protected void checkWheatherDestoried() { if(destroyed){ throw new RpcException("Rpc cluster invoker for " + getInterface() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + " is now destroyed! Can not invoke any more."); } } @Override public String toString() { return getInterface() + " -> " + getUrl().toString(); } protected void checkInvokers(List<Invoker<T>> invokers, Invocation invocation) { if (invokers == null || invokers.size() == 0) { throw new RpcException("Failed to invoke the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + ". No provider available for the service " + directory.getUrl().getServiceKey() + " from registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". 
Please check if the providers have been started and registered."); } } protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException; protected List<Invoker<T>> list(Invocation invocation) throws RpcException { List<Invoker<T>> invokers = directory.list(invocation); return invokers; } }
Analysis of the source implementation.
Load balancer
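The retry loop in doInvoke above can be sketched without any Dubbo types. This is a minimal illustration, not the Dubbo implementation: FailoverSketch, BizException and the Function-based "providers" are hypothetical stand-ins, and select here simply takes the first provider not yet tried instead of consulting a LoadBalance. What it keeps from the listing is the shape of the loop: retries + 1 attempts, business exceptions rethrown immediately, other failures remembered and retried on another provider.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Simplified failover loop; every name here is illustrative, not Dubbo API. */
class FailoverSketch {

    /** Signals a business error, which must not be retried. */
    static class BizException extends RuntimeException {
        BizException(String message) { super(message); }
    }

    /** Tries providers until one succeeds; total attempts = retries + 1, at least 1. */
    static <R> R invoke(List<Function<String, R>> providers, String request, int retries) {
        if (providers.isEmpty()) {
            throw new IllegalStateException("no provider available");
        }
        int attempts = Math.max(retries + 1, 1);
        List<Function<String, R>> invoked = new ArrayList<>(); // providers already tried
        RuntimeException last = null;                          // last failure seen
        for (int i = 0; i < attempts; i++) {
            Function<String, R> provider = select(providers, invoked);
            invoked.add(provider);
            try {
                return provider.apply(request);
            } catch (BizException e) {
                throw e;   // business errors are not retried
            } catch (RuntimeException e) {
                last = e;  // remember the failure and try another provider
            }
        }
        throw new IllegalStateException("failed after " + attempts + " attempts", last);
    }

    /** Picks the first provider not tried yet, falling back to the first one. */
    static <R> Function<String, R> select(List<Function<String, R>> providers,
                                          List<Function<String, R>> invoked) {
        for (Function<String, R> p : providers) {
            if (!invoked.contains(p)) {
                return p;
            }
        }
        return providers.get(0);
    }
}
```

Unlike this sketch, the real class re-lists the invokers from the directory before each retry, so a provider that disappeared in the meantime is not selected again.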
@SPI(RandomLoadBalance.NAME)
public interface LoadBalance {

    /**
     * select one invoker in list.
     *
     * @param invokers invokers.
     * @param url refer url
     * @param invocation invocation.
     * @return selected invoker.
     */
    @Adaptive("loadbalance")
    <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;

}
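As the source above shows, LoadBalance defines a single method, which selects one invoker from the candidate invokers. The default extension is random, so we analyze the source of RandomLoadBalance.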
package com.alibaba.dubbo.rpc.cluster.loadbalance;

import java.util.List;
import java.util.Random;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;

/**
 * random load balance.
 *
 * @author qianlei
 * @author william.liangf
 */
public class RandomLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "random";

    private final Random random = new Random();

    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        int length = invokers.size(); // number of invokers
        int totalWeight = 0; // total weight
        boolean sameWeight = true; // whether all weights are equal
        for (int i = 0; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            totalWeight += weight; // accumulate the total weight
            if (sameWeight && i > 0
                    && weight != getWeight(invokers.get(i - 1), invocation)) {
                sameWeight = false; // record that the weights differ
            }
        }
        if (totalWeight > 0 && ! sameWeight) {
            // If the weights differ and the total is positive, pick at random in proportion to weight
            int offset = random.nextInt(totalWeight);
            // and find which segment the random value falls into
            for (int i = 0; i < length; i++) {
                offset -= getWeight(invokers.get(i), invocation);
                if (offset < 0) {
                    return invokers.get(i);
                }
            }
        }
        // If all weights are equal or the total is 0, pick uniformly at random
        return invokers.get(random.nextInt(length));
    }

}
This class extends the abstract class AbstractLoadBalance, so we analyze the two together.
/* * Copyright 1999-2011 Alibaba Group. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.alibaba.dubbo.rpc.cluster.loadbalance; import java.util.List; import com.alibaba.dubbo.common.Constants; import com.alibaba.dubbo.common.URL; import com.alibaba.dubbo.rpc.Invoker; import com.alibaba.dubbo.rpc.Invocation; import com.alibaba.dubbo.rpc.cluster.LoadBalance; /** * AbstractLoadBalance * * @author william.liangf */ public abstract class AbstractLoadBalance implements LoadBalance { public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) { if (invokers == null || invokers.size() == 0) return null; if (invokers.size() == 1) return invokers.get(0); return doSelect(invokers, url, invocation); } protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation); protected int getWeight(Invoker<?> invoker, Invocation invocation) { int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT); if (weight > 0) { long timestamp = invoker.getUrl().getParameter(Constants.TIMESTAMP_KEY, 0L); if (timestamp > 0L) { int uptime = (int) (System.currentTimeMillis() - timestamp); int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP); if (uptime > 0 && uptime < warmup) { weight = calculateWarmupWeight(uptime, warmup, weight); } } } return weight; } static int calculateWarmupWeight(int uptime, int warmup, int weight) { int ww = 
(int) ( (float) uptime / ( (float) warmup / (float) weight ) ); return ww < 1 ? 1 : (ww > weight ? weight : ww); } }
Source analysis:
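The two pieces of arithmetic in these listings, the warm-up weight and the weighted random pick, are easier to follow on plain integers. The sketch below is a simplified rendering under stated assumptions: WeightedRandomSketch and its method names are made up for illustration, and weights are passed in as an int array instead of being read from invoker URLs. warmupWeight uses the same formula as calculateWarmupWeight, and indexForOffset is the "subtract each weight until the offset goes negative" loop from doSelect.

```java
import java.util.Random;

/** Plain-integer sketch of the weight logic; names are illustrative, not Dubbo API. */
class WeightedRandomSketch {

    /** Scales weight by uptime / warmup and clamps the result to [1, weight]. */
    static int warmupWeight(int uptime, int warmup, int weight) {
        int ww = (int) ((float) uptime / ((float) warmup / (float) weight));
        return ww < 1 ? 1 : Math.min(ww, weight);
    }

    /** Returns the index whose weight segment contains offset (0 <= offset < sum of weights). */
    static int indexForOffset(int[] weights, int offset) {
        for (int i = 0; i < weights.length; i++) {
            offset -= weights[i];
            if (offset < 0) {
                return i;
            }
        }
        throw new IllegalArgumentException("offset is not below the total weight");
    }

    /** Weighted pick when weights differ and the total is positive, uniform pick otherwise. */
    static int pick(int[] weights, Random random) {
        int total = 0;
        boolean same = true;
        for (int i = 0; i < weights.length; i++) {
            total += weights[i];
            if (i > 0 && weights[i] != weights[i - 1]) {
                same = false;
            }
        }
        if (total > 0 && !same) {
            return indexForOffset(weights, random.nextInt(total));
        }
        return random.nextInt(weights.length);
    }
}
```

With weights {10, 30, 60}, offsets 0 to 9 map to index 0, offsets 10 to 39 to index 1, and offsets 40 to 99 to index 2, so each entry is chosen in proportion to its weight. A provider halfway through its warm-up period gets half of its configured weight.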
public interface Router extends Comparable<Router> {

    /**
     * get the router url.
     *
     * @return url
     */
    URL getUrl();

    /**
     * route.
     *
     * @param invokers
     * @param url refer url
     * @param invocation
     * @return routed invokers
     * @throws RpcException
     */
    <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;

}
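The router defines just the two methods above. The core one is route, which filters a subset out of the full invoker list according to rules. Here we analyze only the implementation class ConditionRouter.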
/* * Copyright 1999-2012 Alibaba Group. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.alibaba.dubbo.rpc.cluster.router.condition; import java.text.ParseException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; import com.alibaba.dubbo.common.Constants; import com.alibaba.dubbo.common.URL; import com.alibaba.dubbo.common.logger.Logger; import com.alibaba.dubbo.common.logger.LoggerFactory; import com.alibaba.dubbo.common.utils.NetUtils; import com.alibaba.dubbo.common.utils.StringUtils; import com.alibaba.dubbo.common.utils.UrlUtils; import com.alibaba.dubbo.rpc.Invocation; import com.alibaba.dubbo.rpc.Invoker; import com.alibaba.dubbo.rpc.RpcException; import com.alibaba.dubbo.rpc.cluster.Router; /** * ConditionRouter * * @author william.liangf */ public class ConditionRouter implements Router, Comparable<Router> { private static final Logger logger = LoggerFactory.getLogger(ConditionRouter.class); private final URL url; private final int priority; private final boolean force; private final Map<String, MatchPair> whenCondition; private final Map<String, MatchPair> thenCondition; public ConditionRouter(URL url) { this.url = url; this.priority = url.getParameter(Constants.PRIORITY_KEY, 0); this.force = url.getParameter(Constants.FORCE_KEY, false); try { String rule = 
url.getParameterAndDecoded(Constants.RULE_KEY); if (rule == null || rule.trim().length() == 0) { throw new IllegalArgumentException("Illegal route rule!"); } rule = rule.replace("consumer.", "").replace("provider.", ""); int i = rule.indexOf("=>"); String whenRule = i < 0 ? null : rule.substring(0, i).trim(); String thenRule = i < 0 ? rule.trim() : rule.substring(i + 2).trim(); Map<String, MatchPair> when = StringUtils.isBlank(whenRule) || "true".equals(whenRule) ? new HashMap<String, MatchPair>() : parseRule(whenRule); Map<String, MatchPair> then = StringUtils.isBlank(thenRule) || "false".equals(thenRule) ? null : parseRule(thenRule); // NOTE: When条件是容许为空的,外部业务来保证相似的约束条件 this.whenCondition = when; this.thenCondition = then; } catch (ParseException e) { throw new IllegalStateException(e.getMessage(), e); } } public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException { if (invokers == null || invokers.size() == 0) { return invokers; } try { if (! matchWhen(url)) { return invokers; } List<Invoker<T>> result = new ArrayList<Invoker<T>>(); if (thenCondition == null) { logger.warn("The current consumer in the service blacklist. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey()); return result; } for (Invoker<T> invoker : invokers) { if (matchThen(invoker.getUrl(), url)) { result.add(invoker); } } if (result.size() > 0) { return result; } else if (force) { logger.warn("The route result is empty and force execute. 
consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey() + ", router: " + url.getParameterAndDecoded(Constants.RULE_KEY)); return result; } } catch (Throwable t) { logger.error("Failed to execute condition router rule: " + getUrl() + ", invokers: " + invokers + ", cause: " + t.getMessage(), t); } return invokers; } public URL getUrl() { return url; } public int compareTo(Router o) { if (o == null || o.getClass() != ConditionRouter.class) { return 1; } ConditionRouter c = (ConditionRouter) o; return this.priority == c.priority ? url.toFullString().compareTo(c.url.toFullString()) : (this.priority > c.priority ? 1 : -1); } public boolean matchWhen(URL url) { return matchCondition(whenCondition, url, null); } public boolean matchThen(URL url, URL param) { return thenCondition != null && matchCondition(thenCondition, url, param); } private boolean matchCondition(Map<String, MatchPair> condition, URL url, URL param) { Map<String, String> sample = url.toMap(); for (Map.Entry<String, String> entry : sample.entrySet()) { String key = entry.getKey(); MatchPair pair = condition.get(key); if (pair != null && ! 
pair.isMatch(entry.getValue(), param)) { return false; } } return true; } private static Pattern ROUTE_PATTERN = Pattern.compile("([&!=,]*)\\s*([^&!=,\\s]+)"); private static Map<String, MatchPair> parseRule(String rule) throws ParseException { Map<String, MatchPair> condition = new HashMap<String, MatchPair>(); if(StringUtils.isBlank(rule)) { return condition; } // 匹配或不匹配Key-Value对 MatchPair pair = null; // 多个Value值 Set<String> values = null; final Matcher matcher = ROUTE_PATTERN.matcher(rule); while (matcher.find()) { // 逐个匹配 String separator = matcher.group(1); String content = matcher.group(2); // 表达式开始 if (separator == null || separator.length() == 0) { pair = new MatchPair(); condition.put(content, pair); } // KV开始 else if ("&".equals(separator)) { if (condition.get(content) == null) { pair = new MatchPair(); condition.put(content, pair); } else { condition.put(content, pair); } } // KV的Value部分开始 else if ("=".equals(separator)) { if (pair == null) throw new ParseException("Illegal route rule \"" + rule + "\", The error char '" + separator + "' at index " + matcher.start() + " before \"" + content + "\".", matcher.start()); values = pair.matches; values.add(content); } // KV的Value部分开始 else if ("!=".equals(separator)) { if (pair == null) throw new ParseException("Illegal route rule \"" + rule + "\", The error char '" + separator + "' at index " + matcher.start() + " before \"" + content + "\".", matcher.start()); values = pair.mismatches; values.add(content); } // KV的Value部分的多个条目 else if (",".equals(separator)) { // 若是为逗号表示 if (values == null || values.size() == 0) throw new ParseException("Illegal route rule \"" + rule + "\", The error char '" + separator + "' at index " + matcher.start() + " before \"" + content + "\".", matcher.start()); values.add(content); } else { throw new ParseException("Illegal route rule \"" + rule + "\", The error char '" + separator + "' at index " + matcher.start() + " before \"" + content + "\".", matcher.start()); } } return 
condition; } private static final class MatchPair { final Set<String> matches = new HashSet<String>(); final Set<String> mismatches = new HashSet<String>(); public boolean isMatch(String value, URL param) { for (String match : matches) { if (! UrlUtils.isMatchGlobPattern(match, value, param)) { return false; } } for (String mismatch : mismatches) { if (UrlUtils.isMatchGlobPattern(mismatch, value, param)) { return false; } } return true; } } }
This source implements the following condition-router functionality.
Routing rules based on conditional expressions, for example:
Rules:
Expressions:
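How the ConditionRouter constructor and parseRule break a rule apart can be shown in isolation. The sketch below is a simplified, hedged rendering: ConditionRuleSketch, Pair and split are illustrative names, the sample rule strings in the usage note are made-up values, and the "&" branch uses computeIfAbsent instead of reproducing the listing's exact put logic. It reuses the same regular expression as ROUTE_PATTERN and the same split at "=>" into a when part and a then part.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Simplified condition-rule parser; names are illustrative, not Dubbo API. */
class ConditionRuleSketch {

    // Same expression as ROUTE_PATTERN in the listing: a separator, then a token.
    static final Pattern ROUTE_PATTERN = Pattern.compile("([&!=,]*)\\s*([^&!=,\\s]+)");

    /** Values a key must match, and values it must not match. */
    static class Pair {
        final Set<String> matches = new HashSet<>();
        final Set<String> mismatches = new HashSet<>();
    }

    /** Splits "when => then"; without "=>" the whole rule is the then part. */
    static String[] split(String rule) {
        int i = rule.indexOf("=>");
        String when = i < 0 ? "" : rule.substring(0, i).trim();
        String then = i < 0 ? rule.trim() : rule.substring(i + 2).trim();
        return new String[] { when, then };
    }

    /** Parses one side of a rule, e.g. "method = find*,list* & host != 10.0.0.1". */
    static Map<String, Pair> parse(String rule) {
        Map<String, Pair> condition = new LinkedHashMap<>();
        Pair pair = null;          // pair for the key currently being filled
        Set<String> values = null; // value set that "," keeps appending to
        Matcher m = ROUTE_PATTERN.matcher(rule);
        while (m.find()) {
            String separator = m.group(1);
            String content = m.group(2);
            if (separator.isEmpty() || "&".equals(separator)) {
                pair = condition.computeIfAbsent(content, k -> new Pair()); // a new key starts
            } else if ("=".equals(separator) && pair != null) {
                values = pair.matches;
                values.add(content);
            } else if ("!=".equals(separator) && pair != null) {
                values = pair.mismatches;
                values.add(content);
            } else if (",".equals(separator) && values != null && !values.isEmpty()) {
                values.add(content); // another value for the same key
            } else {
                throw new IllegalArgumentException(
                        "Illegal route rule \"" + rule + "\" at index " + m.start());
            }
        }
        return condition;
    }
}
```

For a rule such as "host = 10.20.153.10 => host = 10.20.153.11", split yields the consumer-side condition and the provider-side condition, and parse turns each into a map from key to its allowed and disallowed values. In route, the when map is matched against the consumer URL and the then map against each provider URL.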
public interface Directory<T> extends Node {

    /**
     * get service type.
     *
     * @return service type.
     */
    Class<T> getInterface();

    /**
     * list invokers.
     *
     * @return invokers
     */
    List<Invoker<T>> list(Invocation invocation) throws RpcException;

}
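The directory service defines one core method, list, which returns all the service invokers registered in the directory for an interface.
An abstract directory implementation is provided; its source is as follows.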
/** * 增长router的Directory * * @author chao.liuc */ public abstract class AbstractDirectory<T> implements Directory<T> { // 日志输出 private static final Logger logger = LoggerFactory.getLogger(AbstractDirectory.class); private final URL url ; private volatile boolean destroyed = false; private volatile URL consumerUrl ; private volatile List<Router> routers; public AbstractDirectory(URL url) { this(url, null); } public AbstractDirectory(URL url, List<Router> routers) { this(url, url, routers); } public AbstractDirectory(URL url, URL consumerUrl, List<Router> routers) { if (url == null) throw new IllegalArgumentException("url == null"); this.url = url; this.consumerUrl = consumerUrl; setRouters(routers); } public List<Invoker<T>> list(Invocation invocation) throws RpcException { if (destroyed){ throw new RpcException("Directory already destroyed .url: "+ getUrl()); } List<Invoker<T>> invokers = doList(invocation); List<Router> localRouters = this.routers; // local reference if (localRouters != null && localRouters.size() > 0) { for (Router router: localRouters){ try { if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, true)) { invokers = router.route(invokers, getConsumerUrl(), invocation); } } catch (Throwable t) { logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t); } } } return invokers; } public URL getUrl() { return url; } public List<Router> getRouters(){ return routers; } public URL getConsumerUrl() { return consumerUrl; } public void setConsumerUrl(URL consumerUrl) { this.consumerUrl = consumerUrl; } protected void setRouters(List<Router> routers){ // copy list routers = routers == null ? 
new ArrayList<Router>() : new ArrayList<Router>(routers); // append url router String routerkey = url.getParameter(Constants.ROUTER_KEY); if (routerkey != null && routerkey.length() > 0) { RouterFactory routerFactory = ExtensionLoader.getExtensionLoader(RouterFactory.class).getExtension(routerkey); routers.add(routerFactory.getRouter(url)); } // append mock invoker selector routers.add(new MockInvokersSelector()); Collections.sort(routers); this.routers = routers; } public boolean isDestroyed() { return destroyed; } public void destroy(){ destroyed = true; } protected abstract List<Invoker<T>> doList(Invocation invocation) throws RpcException ; }
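The list method works as follows. It first checks whether the directory has been destroyed and throws an exception if so. It then calls the abstract method doList, which subclasses must implement, to actually fetch the invoker list from the directory service. Next it loops over the routers held by the object: for each router whose URL is null or whose runtime parameter is true (the default when the parameter is absent), it calls that router's route method and replaces the invoker list with the routed result. A router that throws is logged and skipped. Finally it returns the resulting invoker list.
The setRouters method sets the router list. Besides the routers passed in, it appends up to two more: the router obtained from the RouterFactory named by the URL's router parameter, added only when that parameter is present, and a MockInvokersSelector, which is always added. The combined list is then sorted.
The module also provides a default directory implementation, StaticDirectory, which is a static in-memory directory service. Its source is as follows: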
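The flow of list can be sketched as a chain of filters over a plain list. This is a minimal illustration under stated assumptions: RouterChainSketch and keep are hypothetical names, invokers are represented by address strings, each router is a UnaryOperator, and the runtime-parameter check is left out. It keeps the destroyed check, the sequential routing, and the rule that a failing router is skipped while the previous result is retained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

/** Simplified router chain; names are illustrative, not Dubbo API. */
class RouterChainSketch {

    /** Applies each router in order; a router that throws is skipped. */
    static List<String> list(List<String> all,
                             List<UnaryOperator<List<String>>> routers,
                             boolean destroyed) {
        if (destroyed) {
            throw new IllegalStateException("directory already destroyed");
        }
        List<String> invokers = all; // stands in for the result of doList
        for (UnaryOperator<List<String>> router : routers) {
            try {
                invokers = router.apply(invokers);
            } catch (RuntimeException e) {
                // Keep the previous result, as the listing does after logging the error.
                System.err.println("Failed to execute router: " + e.getMessage());
            }
        }
        return invokers;
    }

    /** Builds a router that keeps only the entries accepted by the predicate. */
    static UnaryOperator<List<String>> keep(Predicate<String> accepted) {
        return in -> {
            List<String> out = new ArrayList<>();
            for (String s : in) {
                if (accepted.test(s)) {
                    out.add(s);
                }
            }
            return out;
        };
    }
}
```

Because each router receives the output of the previous one, the order of the router list matters, which is why the routers are kept sorted.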
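The copy, append and sort steps of setRouters can be sketched as follows. The names are hypothetical: RouterOrderSketch, its nested Router class and the priority values are made up for illustration, and the ordering rule is modeled on ConditionRouter.compareTo (priority first, then a string comparison). How MockInvokersSelector actually compares is not shown in the listings, so its position here is an assumption.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Simplified router-list assembly; names and priorities are illustrative. */
class RouterOrderSketch {

    /** A router reduced to a name and a priority. */
    static class Router implements Comparable<Router> {
        final String name;
        final int priority;

        Router(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }

        /** Orders by priority, then by name, in the spirit of ConditionRouter.compareTo. */
        @Override
        public int compareTo(Router o) {
            return priority == o.priority
                    ? name.compareTo(o.name)
                    : Integer.compare(priority, o.priority);
        }
    }

    /** Copies the given list, appends the optional URL router and the mock selector, then sorts. */
    static List<Router> withDefaults(List<Router> given, Router fromUrl, Router mockSelector) {
        List<Router> routers = given == null ? new ArrayList<>() : new ArrayList<>(given);
        if (fromUrl != null) {
            routers.add(fromUrl); // only present when the URL names a router
        }
        routers.add(mockSelector); // always appended
        Collections.sort(routers);
        return routers;
    }
}
```

Working on a copy means the caller's list is never reordered, and the sorted copy is what gets published to the volatile field read by list.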
public class StaticDirectory<T> extends AbstractDirectory<T> {

    private final List<Invoker<T>> invokers;

    public StaticDirectory(List<Invoker<T>> invokers){
        this(null, invokers, null);
    }

    public StaticDirectory(List<Invoker<T>> invokers, List<Router> routers){
        this(null, invokers, routers);
    }

    public StaticDirectory(URL url, List<Invoker<T>> invokers) {
        this(url, invokers, null);
    }

    public StaticDirectory(URL url, List<Invoker<T>> invokers, List<Router> routers) {
        super(url == null && invokers != null && invokers.size() > 0 ? invokers.get(0).getUrl() : url, routers);
        if (invokers == null || invokers.size() == 0)
            throw new IllegalArgumentException("invokers == null");
        this.invokers = invokers;
    }

    public Class<T> getInterface() {
        return invokers.get(0).getInterface();
    }

    public boolean isAvailable() {
        if (isDestroyed()) {
            return false;
        }
        for (Invoker<T> invoker : invokers) {
            if (invoker.isAvailable()) {
                return true;
            }
        }
        return false;
    }

    public void destroy() {
        if(isDestroyed()) {
            return;
        }
        super.destroy();
        for (Invoker<T> invoker : invokers) {
            invoker.destroy();
        }
        invokers.clear();
    }

    @Override
    protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {
        return invokers;
    }

}
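Its doList method simply returns the value of the invokers field.
There is also a RegistryDirectory implementation, which integrates the registry with the directory service.
Because this module is closely tied to dubbo-registry, we will next study the source of the dubbo-registry-api and dubbo-registry-default modules.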