注:Dubbo版本是2.6.2。java
图1 Dubbo的ForkingClusterInvoker类继承图ide
并行调用多个服务,只要一个成功即返回,可是这要消耗更多的资源。spa
核心代码在ForkingClusterInvoker的doInvoke(Invocation,List<Invoker<T>>,LoadBalance)中,源码以下。代码看起来比较多,可是咱们分析主要逻辑的话,不复杂。线程
@Override @SuppressWarnings({"unchecked", "rawtypes"}) public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { checkInvokers(invokers, invocation); final List<Invoker<T>> selected; final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS); final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); if (forks <= 0 || forks >= invokers.size()) { selected = invokers; } else { selected = new ArrayList<Invoker<T>>(); for (int i = 0; i < forks; i++) { Invoker<T> invoker = select(loadbalance, invocation, invokers, selected); if (!selected.contains(invoker)) {//Avoid add the same invoker several times. selected.add(invoker); } } } RpcContext.getContext().setInvokers((List) selected); final AtomicInteger count = new AtomicInteger(); final BlockingQueue<Object> ref = new LinkedBlockingQueue<Object>(); for (final Invoker<T> invoker : selected) { executor.execute(new Runnable() { @Override public void run() { try { Result result = invoker.invoke(invocation); ref.offer(result); } catch (Throwable e) { int value = count.incrementAndGet(); if (value >= selected.size()) { ref.offer(e); } } } }); } try { Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS); if (ret instanceof Throwable) { Throwable e = (Throwable) ret; throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e); } return (Result) ret; } catch (InterruptedException e) { throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e); } }
} catch (Throwable e) { int value = count.incrementAndGet(); if (value >= selected.size()) { ref.offer(e); } }
能够看到,咱们的请求是异常的。在最后,从ref队列中取出第一个,假设为r,并且是带有超时的那种等待。若是从队列中poll时,抛出InterruptedException异常,则将其封装后抛出;若是r是一个Throwable类型,则说明全部的请求都失败了,抛出异常;若是r不是Throwable则说明存在请求成功的状况,返回r。code
try { Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS); if (ret instanceof Throwable) { Throwable e = (Throwable) ret; throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e); } return (Result) ret; } catch (InterruptedException e) { throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e); }
重点是:并行调用多个服务,只要有一个成功,则返回结果,不等其它的线程执行完成。假设并行请求A一、A二、A3,A1请求使用了3s,A2请求用了2s,A3请求用了5s,且A2请求的结果最早放入到队列中,那么主线程就返回A2请求的结果,主线程不等A1和A3线程执行完成。orm