上图是服务消费的主过程:html
首先经过ReferenceConfig
类的private void init()
方法会先检查初始化全部的配置信息后,调用private T createProxy(Map<String, String> map)
建立代理,消费者最终获得的是服务的代理, 在createProxy
接着调用Protocol
接口实现的<T> Invoker<T> refer(Class<T> type, URL url)
方法生成Invoker
实例(如上图中的红色部分),这是服务消费的关键。接下来把Invoker
经过ProxyFactory
代理工厂转换为客户端须要的接口(如:HelloWorld
),建立服务代理并返回。java
一、把服务引用的信息封装成URL并注册到zk注册中心;
二、监听注册中心的服务的上下线;
三、链接服务提供端,建立NettyClient对象;
四、将这些信息包装成DubboInvoker消费端的调用链,建立消费端Invoker实例的服务代理并返回;apache
一、通过负载均衡策略,调用提供者;
二、选择其中一个服务的URL与提供者netty创建链接,使用ProxyFactory 建立远程通讯,或者本地通讯的,Invoker发到netty服务端;
三、服务器端接收到该Invoker信息后,找到对应的本地Invoker,处理Invocation请求;
四、获取异步,或同步处理结果;服务器
先看一个简单的客户端引用服务的例子,HelloService
,dubbo
配置以下:app
<dubbo:application name="consumer-of-helloService" /> <dubbo:registry protocol="zookeeper" address="127.0.0.1:2181" /> <dubbo:protocol name="dubbo" port="20880" /> <dubbo:reference id="helloService" interface="com.demo.dubbo.service.HelloService" />
Zookeeper
做为注册中心HelloService
接口服务根据以前的介绍,在Spring启动的时候,根据<dubbo:reference>
配置会建立一个ReferenceBean,该bean又实现了Spring的FactoryBean接口,因此咱们以下方式使用时:负载均衡
@Autowired private HelloService helloService;
使用的不是ReferenceBean对象,而是ReferenceBean的getObject()方法返回的对象,该对象经过代理实现了HelloService接口,因此要看服务引用的整个过程就须要从ReferenceBean.getObject()方法开始入手。异步
将ReferenceConfig.init()中的内容拆成具体的步骤,以下:async
methods=hello, timestamp=1443695417847, dubbo=2.5.3 application=consumer-of-helloService side=consumer pid=7748 interface=com.demo.dubbo.service.HelloService
若是是单个注册中心,代码以下:ide
Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); invoker = refprotocol.refer(interfaceClass, url);
上述url的内容以下:ui
registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService? application=consumer-of-helloService& dubbo=2.5.6& pid=8292& registry=zookeeper& timestamp=1443707173909& refer= application=consumer-of-helloService& dubbo=2.5.6& interface=com.demo.dubbo.service.HelloService& methods=hello& pid=8292& side=consumer& timestamp=1443707173884&
ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension(); proxyFactory.getProxy(invoker);
下面就详细说明下上述提到的几个概念:Protocol、Invoker、ProxyFactory
Invoker是一个可执行对象,有三种类型的Invoker:
Invoker的实现状况以下:
先来看服务引用的第2个步骤,返回Invoker对象
对于客户端来讲,Invoker应该是二、3这两种类型。先来讲第2种类型,即远程通讯的Invoker,看DubboInvoker的源码,调用过程AbstractInvoker.invoke()->doInvoke():
protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version); ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT); if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); } else if (isAsync) { ResponseFuture future = currentClient.request(inv, timeout) ; RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); return new RpcResult(); } else { RpcContext.getContext().setFuture(null); return (Result) currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } }
大体内容就是:
将经过远程通讯将Invocation信息传递给服务器端,服务器端接收到该Invocation信息后,找到对应的本地Invoker,而后经过反射执行相应的方法,将方法的返回值再经过远程通讯将结果传递给客户端。
这里分3种状况:
ExchangeClient.send()
方法。ExchangeClient.request()
方法返回一个ResponseFuture
对象,经过RpcContext
中的ThreadLocal
使ResponseFuture
和当前线程绑定,未等服务端响应结果就直接返回,而后服务端经过ProtocolFilterWrapper.buildInvokerChain()
方法会调用Filter.invoke()
方法,即FutureFilter.invoker()->asyncCallback()
,会获取RpcContext
的ResponseFuture
对象,异步返回结果。ExchangeClient.request()
方法,返回一个ResponseFuture
,一直阻塞到服务端返回响应结果服务引用的第二步就是:
Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); invoker = refprotocol.refer(interfaceClass, url);
使用协议Protocol根据上述的url和服务接口来引用服务,建立出一个Invoker对象
默认实现的DubboProtocol也会通过ProtocolFilterWrapper、ProtocolListenerWrapper、RegistryProtocol的包装
首先看下RegistryProtocol.refer()方法,它干了哪些事呢?
Directory和Cluster都是服务治理的重点,接下去会单独拿一章出来说
服务引用的第三步就是:
proxyFactory.getProxy(invoker);
对于Server端,ProxyFactory主要负责将服务如HelloServiceImpl统一进行包装成一个Invoker,这些Invoker经过反射来执行具体的HelloServiceImpl对象的方法
对于client端,则是将上述建立的集群版Invoker(Cluster)建立出代理对象
代码以下:
public class JavassistProxyFactory extends AbstractProxyFactory { @SuppressWarnings("unchecked") public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); }
能够看到是利用jdk自带的Proxy来动态代理目标对象Invoker,因此咱们调用建立出来的代理对象如HelloService的方法时,会执行InvokerInvocationHandler中的逻辑:
public class InvokerInvocationHandler implements InvocationHandler { private final Invoker<?> invoker; public InvokerInvocationHandler(Invoker<?> handler){ this.invoker = handler; } public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } if ("toString".equals(methodName) && parameterTypes.length == 0) { return invoker.toString(); } if ("hashCode".equals(methodName) && parameterTypes.length == 0) { return invoker.hashCode(); } if ("equals".equals(methodName) && parameterTypes.length == 1) { return invoker.equals(args[0]); } //AbstractClusterInvoker.invoke() return invoker.invoke(new RpcInvocation(method, args)).recreate(); } }
参考:http://dubbo.apache.org/books/dubbo-dev-book/implementation.html
参考:https://my.oschina.net/xiaominmin/blog/1599378