今天,咱们来看看dubbo消费的执行过程
首先,咱们都知道dubbo是一个基于netty实现的RPC框架,底层通讯是使用netty来实现的。在学习dubbo的时候,或许咱们都会有下面的这些疑惑:
一、服务消费者只持有服务接口,咱们的消费端在执行接口请求的时候获取到的接口实现是什么?
二、消费者是如何经过netty创建同服务端的通讯的?
三、服务是怎么注册到注册中心的?
四、消费端怎么拉取服务?
五、服务的负载均衡是如何体现的?
等等这些问题都会困扰着咱们,今天咱们先来聊聊dubbo消费端的实现原理
如今,你可能已经本身经过官网的教程搭建了本身的dubbo demo服务,你在执行demo的时候会发现,服务消费者只持有服务接口,你是经过@Reference注解去获取的实现,你已经知道spring bean工厂会自动为用户建立代理实例,那么dubbo为咱们的消费者建立的代理实现是什么呢?只要开启idea的调试模式,你就能够看到咱们获得的实现实际上是:com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler。该Handler实现了InvocationHandler,InvocationHandler是JDK动态代理实现的核心接口,若是你不了解动态代理,那建议你本身去了解一下。
回到正题,咱们经过接口调用的方法都会被该Handler代理,该Handler源码以下:spring
public class InvokerInvocationHandler implements InvocationHandler { private final Invoker<?> invoker; public InvokerInvocationHandler(Invoker<?> handler) { this.invoker = handler; } public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } if ("toString".equals(methodName) && parameterTypes.length == 0) { return invoker.toString(); } if ("hashCode".equals(methodName) && parameterTypes.length == 0) { return invoker.hashCode(); } if ("equals".equals(methodName) && parameterTypes.length == 1) { return invoker.equals(args[0]); } return invoker.invoke(new RpcInvocation(method, args)).recreate(); } }
源码很简单,只有一个invoke方法,它是代理类和接口之间的桥梁。若是你再细心一点,会发现InvokerInvocationHandler中的Invoker实现类是com.alibaba.dubbo.rpc.cluster.support.wrapper.MockClusterInvoker。它是dubbo invoker的默认实现,里面封装了服务降级等功能。到这里,你基本已经知道消费者到底是怎么去调用服务的了,后面你只要继续跟着源码调试,服务是如何和netty创建联系的app
消费者请求调用链: proxy0#sayHello(String) —> InvokerInvocationHandler#invoke(Object, Method, Object[]) —> MockClusterInvoker#invoke(Invocation) —> AbstractClusterInvoker#invoke(Invocation) —> FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, LoadBalance) —> Filter#invoke(Invoker, Invocation) // 包含多个 Filter 调用 —> ListenerInvokerWrapper#invoke(Invocation) —> AbstractInvoker#invoke(Invocation) —> DubboInvoker#doInvoke(Invocation) —> ReferenceCountExchangeClient#request(Object, int) —> HeaderExchangeClient#request(Object, int) —> HeaderExchangeChannel#request(Object, int) —> AbstractPeer#send(Object) —> AbstractClient#send(Object, boolean) —> NettyChannel#send(Object, boolean) —> NioClientSocketChannel#write(Object)
在MockClusterInvoker是一个抽象类,它的默认实现是FailoverClusterInvoker,在MockClusterInvoker中,经过服务目录Directory列举服务列表,核心方法invoke以下:负载均衡
public Result invoke(final Invocation invocation) throws RpcException { checkWhetherDestroyed(); LoadBalance loadbalance = null; //列举服务列表 List<Invoker<T>> invokers = list(invocation); if (invokers != null && !invokers.isEmpty()) { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() .getMethodParameter(invocation.getMethodName(), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)); } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); return doInvoke(invocation, invokers, loadbalance); }
服务目录Directory已知的实现有RegistryDirectory和StaticDirectiry,默认使用的是RegistryDirectory,在进行服务调用的时候会从这里面去获取可用的服务列表,若是想要了解更多,推荐阅读dubbo官网服务列表一章,里面有很是详细的介绍。
能够看到,获取服务列表以后会从系统扩展中加载默认的负载均衡实现,而后继续往下执行到子类FailoverClusterInvoker的模板方法doInvoke,该方法会从新执行服务列举并检查服务的可用性,以后经过负载均衡策略选择具体服务,dubbo默认负载均衡策略是随机RandomLoadBalance。关键代码以下:框架
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { List<Invoker<T>> copyinvokers = invokers; // 服务检查 checkInvokers(copyinvokers, invocation); int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1; if (len <= 0) { len = 1; } // retry loop. RpcException le = null; // last exception. List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers. Set<String> providers = new HashSet<String>(len); for (int i = 0; i < len; i++) { //Reselect before retry to avoid a change of candidate `invokers`. //NOTE: if `invokers` changed, then `invoked` also lose accuracy. if (i > 0) { // i>0,说明第一次服务调用失败,须要从新检查服务列表 checkWhetherDestroyed(); //若是服务已经销毁,抛出异常 copyinvokers = list(invocation);//从新列举服务 // check again checkInvokers(copyinvokers, invocation);//检查服务是否为空 } //负载均衡获取服务 Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked); invoked.add(invoker);//将获取的服务添加到已执行列表 RpcContext.getContext().setInvokers((List) invoked); try { //服务请求 Result result = invoker.invoke(invocation); if (le != null && logger.isWarnEnabled()) { logger.warn("Although retry the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le); } return result; } catch (RpcException e) { } // 异常处理部分省略。。。 } }
经过负载均衡获取到具体服务后,执行服务调用,到AbstractInvoker的invoke方法,主要设置一些attachment的信息。重点来看看实现类DubboInvoke的doInvoke方法,以下:dom
@Override protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); //设置路径和版本 inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version); //ExchangeClient ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { //从配置中获取是否同步执行 boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); //是否单向执行 boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); //请求超时时间 int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); if (isOneway) { //若是单向执行,发起调用后当即返回一个空RpcResult boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); } else if (isAsync) { //若是异步,调用后当即返回一个空的RpcResult ResponseFuture future = currentClient.request(inv, timeout); RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); return new RpcResult(); } else { //不然,同步等待 RpcContext.getContext().setFuture(null); return (Result) currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } }
ExchangeClient接口的实现是HeaderExchangeClient,request方法很简单,以下异步
public ResponseFuture request(Object request, int timeout) throws RemotingException { return channel.request(request, timeout); }
就是作了一次请求转发,channel是ExchangeChannel,默认实现是HeaderExchangeChannel,HeaderExchangeChannel的request方法以下:ide
public ResponseFuture request(Object request, int timeout) throws RemotingException { if (closed) { throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!"); } // create request. Request req = new Request(); req.setVersion("2.0.0");//版本号 req.setTwoWay(true);//双向通讯 req.setData(request);//请求数据 DefaultFuture future = new DefaultFuture(channel, req, timeout);//Future try { channel.send(req);//Dubbo封装的通讯Channel } catch (RemotingException e) { future.cancel(); throw e; } return future; }
这里就是构造Request请求体,而后传递给NettyChannel,NettyChannel封装了netty的channel,经过该channel将数据请求写入到TCP请求中传递给服务端。NettyChannel的send方法以下oop
public void send(Object message, boolean sent) throws RemotingException { super.send(message, sent); boolean success = true; int timeout = 0; try { //数据请求 ChannelFuture future = channel.write(message);//此处的channel才是org.jboss.netty.channel.Channel if (sent) { //等待请求结果 timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); success = future.await(timeout); } Throwable cause = future.getCause(); if (cause != null) { throw cause; } } catch (Throwable e) { throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e); } if (!success) { throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + "in timeout(" + timeout + "ms) limit"); } }
到这里,关于dubbo消费请求的基本流程已经走完,继续往下就是netty层面的东西了,有兴趣的童鞋能够自行寻找netty相关教程学习