ReferenceBean
跟ServiceBean
同样,都实现了InitializingBean
的接口,自然也会调用afterPropertiesSet
方法,这个方法和《dubbo的启动过程(二)--服务方属性配置》中的流程雷同,这里就不再赘述了。除了InitializingBean
这个接口,它还实现了FactoryBean
接口,实例化的时候,就会调用getObject
方法,例子见《spring学习之FactoryBean》,源码见doGetObjectFromFactoryBean。
public Object getObject() { return get(); } public synchronized T get() { // 各类配置的更新 checkAndUpdateSubConfigs(); if (destroyed) { throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!"); } if (ref == null) { init(); } return ref; }
// 以上代码跟服务方雷同,省略 ref = createProxy(map); String serviceKey = URL.buildKey(interfaceName, group, version); ApplicationModel.initConsumerModel(serviceKey, buildConsumerModel(serviceKey, attributes)); initialized = true;
在createProxy
方法中,会调用RegistryProtocol
的refer
方法,这里先略过ProtocolFilterWrapper
、ProtocolListenerWrapper
、QosProtocolWrapper
。
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { // 获取注册中心 url = URLBuilder.from(url) .setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY)) .removeParameter(REGISTRY_KEY) .build(); Registry registry = registryFactory.getRegistry(url); if (RegistryService.class.equals(type)) { return proxyFactory.getInvoker((T) registry, type, url); } // group="a,b" or group="*" Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY)); String group = qs.get(GROUP_KEY); if (group != null && group.length() > 0) { if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) { return doRefer(getMergeableCluster(), registry, type, url); } } // 关联引用 return doRefer(cluster, registry, type, url); } private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); directory.setRegistry(registry); directory.setProtocol(protocol); // all attributes of REFER_KEY Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters()); // 把消费方信息注册到注册中心 URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters); if (!ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true)) { directory.setRegisteredConsumerUrl(getRegisteredConsumerUrl(subscribeUrl, url)); registry.register(directory.getRegisteredConsumerUrl()); } directory.buildRouterChain(subscribeUrl); directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY, PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY)); // 获取invoker并缓存 Invoker invoker = cluster.join(directory); ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory); return invoker; }
cluster.join(directory)这行代码,会执行包装类MockClusterWrapper的join方法:
/**
 * Joins the directory through the wrapped cluster and wraps the outcome in a
 * {@code MockClusterInvoker}.
 *
 * @param directory the directory to join
 * @return a {@code MockClusterInvoker} around the wrapped cluster's invoker
 * @throws RpcException declared by the signature
 */
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
    Invoker<T> delegate = this.cluster.join(directory);
    return new MockClusterInvoker<T>(directory, delegate);
}
因此invoker是MockClusterInvoker。
在createProxy
方法中,invoker创建完了,就开始创建代理。代理类中,通过newInstance的构造参数,把InvokerInvocationHandler传入到成员变量中,后面调用的方法,就是通过这个类的invoke方法进行反射的。
// 调用下面的JavassistProxyFactory的getProxy方法 return (T) PROXY_FACTORY.getProxy(invoker); // 注意传入的是InvokerInvocationHandler public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); }
当执行方法的时候,就会执行InvokerInvocationHandler的invoke方法:
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); // Object直接调用返回 if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } // toString、hashCode、equals返回响应的invoke方法 if ("toString".equals(methodName) && parameterTypes.length == 0) { return invoker.toString(); } if ("hashCode".equals(methodName) && parameterTypes.length == 0) { return invoker.hashCode(); } if ("equals".equals(methodName) && parameterTypes.length == 1) { return invoker.equals(args[0]); } // 这个invoke就是MockClusterInvoker,RpcInvocation传入方法参数信息 return invoker.invoke(new RpcInvocation(method, args)).recreate(); }
MockClusterInvoker#invoke
/**
 * Abridged excerpt of MockClusterInvoker#invoke. The branching between the two
 * assignments to {@code result} has been left out, so this is an outline of the
 * two paths rather than runnable logic.
 */
public Result invoke(Invocation invocation) throws RpcException {
    Result result = null;
    // Read the mock setting for the invoked method; defaults to "false".
    String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
    // ... part of the code omitted. When mock is forced, the local mock is called:
    result = doMockInvoke(invocation, null);
    // Without mock, the wrapped invoker is called (FailoverClusterInvoker in this walkthrough):
    result = this.invoker.invoke(invocation);
    // ... omitted
    return result;
}
这里会调用FailoverClusterInvoker的父类AbstractClusterInvoker#invoke
public Result invoke(final Invocation invocation) throws RpcException { // 检查是否被Destroyed checkWhetherDestroyed(); // binding attachments into invocation. Map<String, String> contextAttachments = RpcContext.getContext().getAttachments(); if (contextAttachments != null && contextAttachments.size() != 0) { ((RpcInvocation) invocation).addAttachments(contextAttachments); } // 获取Invoker列表 List<Invoker<T>> invokers = list(invocation); // 负载均衡 LoadBalance loadbalance = initLoadBalance(invokers, invocation); RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); // FailoverClusterInvoker#doInvoke return doInvoke(invocation, invokers, loadbalance); }
FailoverClusterInvoker#doInvoke
失败重试、负载均衡都在这里处理
/**
 * Abridged excerpt of FailoverClusterInvoker#doInvoke: selects an invoker through the
 * load balancer and retries on failure.
 *
 * The number of attempts is the configured retries value plus one, and never less
 * than one. A business RpcException is rethrown immediately; any other failure is
 * remembered in {@code le} and the loop moves on to the next attempt. The code that
 * follows the loop is elided in this excerpt.
 */
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyInvokers = invokers;
    // Check that the invoker list is not empty.
    checkInvokers(copyInvokers, invocation);
    // Name of the invoked method.
    String methodName = RpcUtils.getMethodName(invocation);
    // Total attempts = configured retries + 1.
    int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
    if (len <= 0) {
        len = 1;
    }
    // retry loop.
    // The most recent exception seen so far.
    RpcException le = null; // last exception.
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) {
        //Reselect before retry to avoid a change of candidate `invokers`.
        //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
        if (i > 0) {
            // Check whether this invoker has been destroyed.
            checkWhetherDestroyed();
            // Fetch the invoker list again.
            copyInvokers = list(invocation);
            // check again: the invoker list must not be empty.
            checkInvokers(copyInvokers, invocation);
        }
        // `invoked` is passed to select along with the candidates.
        Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
        invoked.add(invoker);
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            // In this walkthrough this is InvokerWrapper#invoke.
            Result result = invoker.invoke(invocation);
            if (le != null && logger.isWarnEnabled()) {
                // ... (elided)
            }
            return result;
        } catch (RpcException e) {
            if (e.isBiz()) { // biz exception.
                throw e;
            }
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            // Record the attempted provider address whether the call succeeded or failed.
            providers.add(invoker.getUrl().getAddress());
        }
    }
    // ... (elided)
}
InvokerWrapper#invoke:
这个invoker是ProtocolFilterWrapper,在这个invoke中,会调用ProtocolFilterWrapper$CallbackRegistrationInvoker#invoke-->ConsumerContextFilter#invoke-->FutureFilter#invoke-->MonitorFilter#invoke-->ListenerInvokerWrapper#invoke-->AsyncToSyncInvoker#invoke-->DubboInvoker#invoke
/**
 * Pure delegation: forwards the invocation to the wrapped invoker and returns its result.
 *
 * @param invocation the call to forward
 * @return the wrapped invoker's result
 * @throws RpcException declared by the signature
 */
public Result invoke(Invocation invocation) throws RpcException {
    Result delegated = invoker.invoke(invocation);
    return delegated;
}
ConsumerContextFilter#invoke:
在上下文设置invoker、invocation、LocalAddress、RemoteAddress、RemoteApplicationName、Attachment,再把invoker设置到invocation中,执行完下一个invoke后,再清空上下文信息。
/**
 * Populates the consumer-side {@code RpcContext} before the call and clears it afterwards.
 *
 * <p>The context receives the invoker, the invocation, the local address (local host,
 * port 0), the remote address and remote application name taken from the invoker URL,
 * and an attachment keyed by {@code REMOTE_APPLICATION_KEY} whose value is the URL's
 * {@code APPLICATION_KEY} parameter. When the invocation is an {@code RpcInvocation},
 * the invoker is also stored on it.
 *
 * @param invoker    the next invoker in the chain
 * @param invocation the call being made
 * @return the result of the next invoker
 * @throws RpcException declared by the signature
 */
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
    RpcContext.getContext()
            .setInvoker(invoker)
            .setInvocation(invocation)
            .setLocalAddress(NetUtils.getLocalHost(), 0)
            .setRemoteAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort())
            .setRemoteApplicationName(invoker.getUrl().getParameter(REMOTE_APPLICATION_KEY))
            .setAttachment(REMOTE_APPLICATION_KEY, invoker.getUrl().getParameter(APPLICATION_KEY));

    if (invocation instanceof RpcInvocation) {
        RpcInvocation rpcInvocation = (RpcInvocation) invocation;
        rpcInvocation.setInvoker(invoker);
    }

    try {
        // The server context is removed before proceeding down the chain.
        RpcContext.removeServerContext();
        Result outcome = invoker.invoke(invocation);
        return outcome;
    } finally {
        // The context is always removed once the call has returned or thrown.
        RpcContext.removeContext();
    }
}
FutureFilter#invoke:
主要是设置回调的
public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException { fireInvokeCallback(invoker, invocation); // need to configure if there's return value before the invocation in order to help invoker to judge if it's // necessary to return future. return invoker.invoke(invocation); }
MonitorFilter#invoke:
接口对应的方法调用的次数。
key为:接口全限定名.方法名
value是AtomicInteger类型,记录调用次数
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { if (invoker.getUrl().hasParameter(MONITOR_KEY)) { invocation.setAttachment(MONITOR_FILTER_START_TIME, String.valueOf(System.currentTimeMillis())); getConcurrent(invoker, invocation).incrementAndGet(); // count up } return invoker.invoke(invocation); // proceed invocation chain } // concurrent counter private AtomicInteger getConcurrent(Invoker<?> invoker, Invocation invocation) { String key = invoker.getInterface().getName() + "." + invocation.getMethodName(); AtomicInteger concurrent = concurrents.get(key); if (concurrent == null) { concurrents.putIfAbsent(key, new AtomicInteger()); concurrent = concurrents.get(key); } return concurrent; }
ListenerInvokerWrapper#invoke:
直接调用AsyncToSyncInvoker#invoke
/**
 * Passes the invocation straight through to the wrapped invoker
 * (AsyncToSyncInvoker in this walkthrough).
 *
 * @param invocation the call to forward
 * @return the wrapped invoker's result
 * @throws RpcException declared by the signature
 */
public Result invoke(Invocation invocation) throws RpcException {
    Result forwarded = invoker.invoke(invocation);
    return forwarded;
}
AsyncToSyncInvoker#invoke:
如果是同步,就阻塞等待返回;如果是异步,就直接返回
public Result invoke(Invocation invocation) throws RpcException { Result asyncResult = invoker.invoke(invocation); try { // 同步就阻塞等待返回 if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) { asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); } } catch (InterruptedException e) { throw new RpcException("Interrupted unexpectedly while waiting for remoting result to return! method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (ExecutionException e) { Throwable t = e.getCause(); if (t instanceof TimeoutException) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } else if (t instanceof RemotingException) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } } catch (Throwable e) { throw new RpcException(e.getMessage(), e); } return asyncResult; }
AbstractInvoker#invoke
public Result invoke(Invocation inv) throws RpcException { // if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed if (destroyed.get()) { logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, " + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer"); } RpcInvocation invocation = (RpcInvocation) inv; // 传入当前的dubboInvoker invocation.setInvoker(this); if (CollectionUtils.isNotEmptyMap(attachment)) { invocation.addAttachmentsIfAbsent(attachment); } Map<String, String> contextAttachments = RpcContext.getContext().getAttachments(); if (CollectionUtils.isNotEmptyMap(contextAttachments)) { invocation.addAttachments(contextAttachments); } // 包括FUTURE、ASYNC、SYNC invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation)); // Async时设置ID RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); try { return doInvoke(invocation); } catch (InvocationTargetException e) { // biz exception Throwable te = e.getTargetException(); if (te == null) { return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation); } else { if (te instanceof RpcException) { ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION); } return AsyncRpcResult.newDefaultAsyncResult(null, te, invocation); } } catch (RpcException e) { if (e.isBiz()) { return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation); } else { throw e; } } catch (Throwable e) { return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation); } }
DubboInvoker#doInvoke
远程调用接口,获取数据
protected Result doInvoke(final Invocation invocation) throws Throwable { // 给inv设置路径和版本信息 RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(PATH_KEY, getUrl().getPath()); inv.setAttachment(VERSION_KEY, version); // 获取一个ExchangeClient ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { // 是否为Oneway boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT); if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); return AsyncRpcResult.newDefaultAsyncResult(invocation); } else { AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv); CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout); asyncRpcResult.subscribeTo(responseFuture); // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter FutureContext.getContext().setCompatibleFuture(responseFuture); return asyncRpcResult; } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } }