该图是通过简化后的rpc-api模块的类图,去除了一些非关键的属性和方法定义,也去除了一些非核心的类和接口,只是一个简化了的的示意图,这样你们可以去除干扰看清楚该模块的核心接口极其关系,请点击看大图更清晰一些。java
服务协议。这是rpc模块中最核心的一个类,它定义了rpc的最主要的两个行为即:一、provider暴露远程服务,即将调用信息发布到服务器上的某个URL上去,能够供消费者链接调用,通常是将某个service类的所有方法总体发布到服务器上。二、consumer引用远程服务,即根据service的服务类和provider发布服务的URL转化为一个Invoker对象,消费者能够经过该对象调用provider发布的远程服务。这其实归纳了rpc的最为核心的职责,提供了多级抽象的实现、包装器实现等。express
Protocol的顶层抽象实现类,它定义了这些属性:一、exporterMap表示发布过的serviceKey和Exporter(远程服务发布的引用)的映射表;二、invokers是一个Invoker对象的集合,表示层级暴露过远程服务的服务执行体对象集合。还提供了一个通用的服务发布销毁方法destroy,该方法是一个通用方法,它清空了两个集合属性,调用了全部invoker的destroy方法,也调用全部exporter对象的unexport方法。apache
继承自AbstractProtoco的一个抽象代理协议类。它聚合了代理工厂ProxyFactory对象来实现服务的暴露和引用。它的源码以下。设计模式
/* * Copyright 1999-2012 Alibaba Group. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.alibaba.dubbo.rpc.protocol; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import com.alibaba.dubbo.common.URL; import com.alibaba.dubbo.rpc.Exporter; import com.alibaba.dubbo.rpc.Invocation; import com.alibaba.dubbo.rpc.Invoker; import com.alibaba.dubbo.rpc.ProxyFactory; import com.alibaba.dubbo.rpc.Result; import com.alibaba.dubbo.rpc.RpcException; /** * AbstractProxyProtocol * * @author william.liangf */ public abstract class AbstractProxyProtocol extends AbstractProtocol { private final List<Class<?>> rpcExceptions = new CopyOnWriteArrayList<Class<?>>();; private ProxyFactory proxyFactory; public AbstractProxyProtocol() { } public AbstractProxyProtocol(Class<?>... exceptions) { for (Class<?> exception : exceptions) { addRpcException(exception); } } public void addRpcException(Class<?> exception) { this.rpcExceptions.add(exception); } public void setProxyFactory(ProxyFactory proxyFactory) { this.proxyFactory = proxyFactory; } public ProxyFactory getProxyFactory() { return proxyFactory; } @SuppressWarnings("unchecked") public <T> Exporter<T> export(final Invoker<T> invoker) throws RpcException { final String uri = serviceKey(invoker.getUrl());//得到Url对应的serviceKey值。 Exporter<T> exporter = (Exporter<T>) exporterMap.get(uri);//根据url获取对应的exporter。 if (exporter != null) {//若是已经存在,则直接返回,实现接口支持幂等调用。该处难道无须考虑线程安全问题吗? return exporter; } //执行抽放方法暴露服务。runnable方法的行为有什么约束没有?该处不明确。 final Runnable runnable = doExport(proxyFactory.getProxy(invoker), invoker.getInterface(), invoker.getUrl()); //调用proxyFactory.getProxy(invoker)来得到invoker的代理对象。 exporter = new AbstractExporter<T>(invoker) { public void unexport() { super.unexport(); exporterMap.remove(uri); if (runnable != null) { try { runnable.run(); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } } }; exporterMap.put(uri, exporter); return exporter; } public <T> Invoker<T> refer(final Class<T> type, final URL url) throws RpcException { //先调用doRefer得到服务服务对象,再调用proxyFactory.getInvoker得到invoker对象。 final Invoker<T> tagert = proxyFactory.getInvoker(doRefer(type, url), type, url); Invoker<T> invoker = new AbstractInvoker<T>(type, url) { @Override protected Result doInvoke(Invocation invocation) throws Throwable { try { Result result = tagert.invoke(invocation); Throwable e = result.getException(); if (e != null) { for (Class<?> rpcException : rpcExceptions) { if (rpcException.isAssignableFrom(e.getClass())) { throw getRpcException(type, url, invocation, e); } } } return result; } catch (RpcException e) { if (e.getCode() == RpcException.UNKNOWN_EXCEPTION) { e.setCode(getErrorCode(e.getCause())); } throw e; } catch (Throwable e) { throw getRpcException(type, url, invocation, e); } } }; invokers.add(invoker); return invoker; } protected RpcException getRpcException(Class<?> type, URL url, Invocation invocation, Throwable e) { RpcException re = new RpcException("Failed to invoke remote service: " + type + ", method: " + invocation.getMethodName() + ", cause: " + e.getMessage(), e); re.setCode(getErrorCode(e)); return re; } protected int getErrorCode(Throwable e) { return RpcException.UNKNOWN_EXCEPTION; } /** **留给子类实现的真正将类发布到URL上的抽象方法定义,由具体的协议来实现。 **/ protected abstract <T> Runnable doExport(T impl, Class<T> type, URL url) throws RpcException; /** **留给子类实现的引用远程服务的抽象方法定义,该方法是将URL和type接口类应用到一个能够远程调用代理对象。 **/ protected abstract <T> T doRefer(Class<T> type, URL url) throws RpcException; }
是一个Protocol的支持过滤器的装饰器。经过该装饰器的对原始对象的包装使得Protocol支持可扩展的过滤器链,已经支持的包括ExceptionFilter、ExecuteLimitFilter和TimeoutFilter等多种支持不一样特性的过滤器。api
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) { Invoker<T> last = invoker; //经过该句得到扩展配置的过滤器列表,具体机制须要研究该类的实现。 List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); if (filters.size() > 0) { //循环将过滤器列表组装成为过滤器链,目标invoker是最后一个执行的。 for (int i = filters.size() - 1; i >= 0; i --) { final Filter filter = filters.get(i); final Invoker<T> next = last; last = new Invoker<T>() { public Class<T> getInterface() { return invoker.getInterface(); } public URL getUrl() { return invoker.getUrl(); } public boolean isAvailable() { return invoker.isAvailable(); } public Result invoke(Invocation invocation) throws RpcException { return filter.invoke(next, invocation); } public void destroy() { invoker.destroy(); } @Override public String toString() { return invoker.toString(); } }; } } return last; }
一个支持监听器特性的Protocal的包装器。支持两种监听器的功能扩展,分别是:ExporterListener是远程服务发布监听器,能够监听服务发布和取消发布两个事件点;InvokerListener是服务消费者引用调用器的监听器,能够监听引用和销毁两个事件方法。支持可扩展的事件监听模型,目前只提供了一些适配器InvokerListenerAdapter、ExporterListenerAdapter以及简单的过时服务调用监听器DeprecatedInvokerListener。开发者可自行扩展本身的监听器。该类源码以下。安全
/* * Copyright 1999-2011 Alibaba Group. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.alibaba.dubbo.rpc.protocol; import java.util.Collections; import com.alibaba.dubbo.common.Constants; import com.alibaba.dubbo.common.URL; import com.alibaba.dubbo.common.extension.ExtensionLoader; import com.alibaba.dubbo.rpc.Exporter; import com.alibaba.dubbo.rpc.ExporterListener; import com.alibaba.dubbo.rpc.Invoker; import com.alibaba.dubbo.rpc.InvokerListener; import com.alibaba.dubbo.rpc.Protocol; import com.alibaba.dubbo.rpc.RpcException; import com.alibaba.dubbo.rpc.listener.ListenerExporterWrapper; import com.alibaba.dubbo.rpc.listener.ListenerInvokerWrapper; /** * ListenerProtocol * * @author william.liangf */ public class ProtocolListenerWrapper implements Protocol { private final Protocol protocol; public ProtocolListenerWrapper(Protocol protocol){ if (protocol == null) { throw new IllegalArgumentException("protocol == null"); } this.protocol = protocol; } public int getDefaultPort() { return protocol.getDefaultPort(); } public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { //特殊协议,跳过监听器触发。 if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } //调用原始协议的发布方法,触发监听器链事件。 return new ListenerExporterWrapper<T>(protocol.export(invoker), Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class) .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY))); } public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { return protocol.refer(type, url); } return new ListenerInvokerWrapper<T>(protocol.refer(type, url), Collections.unmodifiableList( ExtensionLoader.getExtensionLoader(InvokerListener.class) .getActivateExtension(url, Constants.INVOKER_LISTENER_KEY))); } public void destroy() { protocol.destroy(); } }
dubbo的代理工厂。定义了两个接口分别是:getProxy根据invoker目标接口的代理对象,通常是消费者得到代理对象触发远程调用;getInvoker方法将代理对象proxy、接口类type和远程服务的URL获取执行对象Invoker,每每是提供者得到目标执行对象执行目标实现调用。AbstractProxyFactory是其抽象实现,提供了getProxy的模版方法实现,使得能够支持多接口的映射。dubbo最终内置了两种动态代理的实现,分别是jdkproxy和javassist。默认的实现使用javassist。为何选择javassist,梁飞选型的时候作过性能测试对比分析,参考:http://javatar.iteye.com/blog/814426/服务器
该接口是服务的执行体。它有获取服务发布的URL,服务的接口类等关键属性的行为;还有核心的服务执行方法invoke,执行该方法后返回执行结果Result,而传递的参数是调用信息Invocation。该接口有大量的抽象和具体实现类。AbstractProxyInvoker是基于代理的执行器抽象实现,AbstractInvoker是通用的抽象实现。app
首先ServiceConfig类拿到对外提供服务的实际类ref(如:HelloWorldImpl),而后经过ProxyFactory类的getInvoker方法使用ref生成一个AbstractProxyInvoker实例,到这一步就完成具体服务到Invoker的转化。接下来就是Invoker转换到Exporter的过程。
Dubbo处理服务暴露的关键就在Invoker转换到Exporter的过程(如上图中的红色部分),下面咱们以Dubbo和RMI这两种典型协议的实现来进行说明:less
上图是服务消费的主过程:
首先ReferenceConfig类的init方法调用Protocol的refer方法生成Invoker实例(如上图中的红色部分),这是服务消费的关键。接下来把Invoker转换为客户端须要的接口(如:HelloWorld)。
关于每种协议如RMI/Dubbo/Web service等它们在调用refer方法生成Invoker实例的细节和上一章节所描述的相似。ide
该模块下设计较为复杂,在设计中能够看出来应用了大量的设计模式,包括模版方法、职责链、装饰器和动态代理等设计模式。掌握该模块下的核心概念对于后续阅读其它部分代码相当重要,后面的其它模块要么是它的实现,要么是由它衍生出来的,要么与它的关系很是紧密。
接下来咱们看看rpc的默认实现模块——dubbo-rpc-default。该模块提供了默认的dubbo协议的实现,也是默认使用的协议。