目标:介绍远程调用中协议的设计和实现,介绍dubbo-rpc-api中的各类protocol包的源码,是重点内容。java
在远程调用中协议是很是重要的一层,看下面这张图:git
该层是在信息交换层之上,分为了而且夹杂在服务暴露和服务引用中间,为了有一个约定的方式进行调用。github
dubbo支持不一样协议的扩展,好比http、thrift等等,具体的能够参照官方文档。本文讲解的源码大部分是对于公共方法的实现,而具体的服务暴露和服务引用会在各个协议实现中讲到。api
下面是该包下面的类图:app
该类是协议的抽象类,实现了Protocol接口,其中实现了一些公共的方法,抽象方法在它的子类AbstractProxyProtocol中定义。异步
/** * 服务暴露者集合 */
protected final Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<String, Exporter<?>>();
/** * 服务引用者集合 */
//TODO SOFEREFENCE
protected final Set<Invoker<?>> invokers = new ConcurrentHashSet<Invoker<?>>();
复制代码
protected static String serviceKey(URL url) {
// 得到绑定的端口号
int port = url.getParameter(Constants.BIND_PORT_KEY, url.getPort());
return serviceKey(port, url.getPath(), url.getParameter(Constants.VERSION_KEY),
url.getParameter(Constants.GROUP_KEY));
}
protected static String serviceKey(int port, String serviceName, String serviceVersion, String serviceGroup) {
return ProtocolUtils.serviceKey(port, serviceName, serviceVersion, serviceGroup);
}
复制代码
该方法是为了获得服务key group+"/"+serviceName+":"+serviceVersion+":"+portide
@Override
public void destroy() {
// 遍历服务引用实体
for (Invoker<?> invoker : invokers) {
if (invoker != null) {
// 从集合中移除
invokers.remove(invoker);
try {
if (logger.isInfoEnabled()) {
logger.info("Destroy reference: " + invoker.getUrl());
}
// 销毁
invoker.destroy();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
// 遍历服务暴露者
for (String key : new ArrayList<String>(exporterMap.keySet())) {
// 从集合中移除
Exporter<?> exporter = exporterMap.remove(key);
if (exporter != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Unexport service: " + exporter.getInvoker().getUrl());
}
// 取消暴露
exporter.unexport();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
}
复制代码
该方法是对invoker和exporter的销毁。源码分析
该类继承了AbstractProtocol类,其中利用了代理工厂对AbstractProtocol中的两个集合进行了填充,而且对异常作了处理。ui
/** * rpc的异常类集合 */
private final List<Class<?>> rpcExceptions = new CopyOnWriteArrayList<Class<?>>();
/** * 代理工厂 */
private ProxyFactory proxyFactory;
复制代码
@Override
@SuppressWarnings("unchecked")
public <T> Exporter<T> export(final Invoker<T> invoker) throws RpcException {
// 得到uri
final String uri = serviceKey(invoker.getUrl());
// 得到服务暴露者
Exporter<T> exporter = (Exporter<T>) exporterMap.get(uri);
if (exporter != null) {
return exporter;
}
// 新建一个线程
final Runnable runnable = doExport(proxyFactory.getProxy(invoker, true), invoker.getInterface(), invoker.getUrl());
exporter = new AbstractExporter<T>(invoker) {
/** * 取消暴露 */
@Override
public void unexport() {
super.unexport();
// 移除该key对应的服务暴露者
exporterMap.remove(uri);
if (runnable != null) {
try {
// 启动线程
runnable.run();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
};
// 加入集合
exporterMap.put(uri, exporter);
return exporter;
}
复制代码
其中分为两个步骤,建立一个exporter,放入到集合汇中。在建立exporter时对unexport方法进行了重写。this
@Override
public <T> Invoker<T> refer(final Class<T> type, final URL url) throws RpcException {
// 经过代理得到实体域
final Invoker<T> target = proxyFactory.getInvoker(doRefer(type, url), type, url);
Invoker<T> invoker = new AbstractInvoker<T>(type, url) {
@Override
protected Result doInvoke(Invocation invocation) throws Throwable {
try {
// 得到调用结果
Result result = target.invoke(invocation);
Throwable e = result.getException();
// 若是抛出异常,则抛出相应异常
if (e != null) {
for (Class<?> rpcException : rpcExceptions) {
if (rpcException.isAssignableFrom(e.getClass())) {
throw getRpcException(type, url, invocation, e);
}
}
}
return result;
} catch (RpcException e) {
// 抛出未知异常
if (e.getCode() == RpcException.UNKNOWN_EXCEPTION) {
e.setCode(getErrorCode(e.getCause()));
}
throw e;
} catch (Throwable e) {
throw getRpcException(type, url, invocation, e);
}
}
};
// 加入集合
invokers.add(invoker);
return invoker;
}
复制代码
该方法是服务引用,先从代理工厂中得到Invoker对象target,而后建立了真实的invoker在重写方法中调用代理的方法,最后加入到集合。
protected abstract <T> Runnable doExport(T impl, Class<T> type, URL url) throws RpcException;
protected abstract <T> T doRefer(Class<T> type, URL url) throws RpcException;
复制代码
能够看到其中抽象了服务引用和暴露的方法,让各种协议各自实现。
该类是invoker的抽象方法,由于协议被夹在服务引用和服务暴露中间,不管什么协议都有一些通用的Invoker和exporter的方法实现,而该类就是实现了Invoker的公共方法,而把doInvoke抽象出来,让子类只关注这个方法。
/** * 服务类型 */
private final Class<T> type;
/** * url对象 */
private final URL url;
/** * 附加值 */
private final Map<String, String> attachment;
/** * 是否可用 */
private volatile boolean available = true;
/** * 是否销毁 */
private AtomicBoolean destroyed = new AtomicBoolean(false);
复制代码
private static Map<String, String> convertAttachment(URL url, String[] keys) {
if (keys == null || keys.length == 0) {
return null;
}
Map<String, String> attachment = new HashMap<String, String>();
// 遍历key,把值放入附加值集合中
for (String key : keys) {
String value = url.getParameter(key);
if (value != null && value.length() > 0) {
attachment.put(key, value);
}
}
return attachment;
}
复制代码
该方法是转化为附加值,把url中的值转化为服务调用invoker的附加值。
@Override
public Result invoke(Invocation inv) throws RpcException {
// if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed
// 若是服务引用销毁,则打印告警日志,可是经过
if (destroyed.get()) {
logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "
+ ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
}
RpcInvocation invocation = (RpcInvocation) inv;
// 会话域中加入该调用链
invocation.setInvoker(this);
// 把附加值放入会话域
if (attachment != null && attachment.size() > 0) {
invocation.addAttachmentsIfAbsent(attachment);
}
// 把上下文的附加值放入会话域
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
/** * invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here, * because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered * by the built-in retry mechanism of the Dubbo. The attachment to update RpcContext will no longer work, which is * a mistake in most cases (for example, through Filter to RpcContext output traceId and spanId and other information). */
invocation.addAttachments(contextAttachments);
}
// 若是开启的是异步调用,则把该设置也放入附加值
if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
}
// 加入编号
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
try {
// 执行调用链
return doInvoke(invocation);
} catch (InvocationTargetException e) { // biz exception
Throwable te = e.getTargetException();
if (te == null) {
return new RpcResult(e);
} else {
if (te instanceof RpcException) {
((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
}
return new RpcResult(te);
}
} catch (RpcException e) {
if (e.isBiz()) {
return new RpcResult(e);
} else {
throw e;
}
} catch (Throwable e) {
return new RpcResult(e);
}
}
复制代码
该方法作了一些公共的操做,好比服务引用销毁的检测,加入附加值,加入调用链实体域到会话域中等。而后执行了doInvoke抽象方法。各协议本身去实现。
该类和AbstractInvoker相似,也是在服务暴露中实现了一些公共方法。
/** * 实体域 */
private final Invoker<T> invoker;
/** * 是否取消暴露服务 */
private volatile boolean unexported = false;
复制代码
@Override
public void unexport() {
// 若是已经消取消暴露,则之间返回
if (unexported) {
return;
}
// 设置为true
unexported = true;
// 销毁该实体域
getInvoker().destroy();
}
复制代码
该类是Invoker的包装类,其中用到类装饰模式,不过并无实现实际的功能加强。
public class InvokerWrapper<T> implements Invoker<T> {
/** * invoker对象 */
private final Invoker<T> invoker;
private final URL url;
public InvokerWrapper(Invoker<T> invoker, URL url) {
this.invoker = invoker;
this.url = url;
}
@Override
public Class<T> getInterface() {
return invoker.getInterface();
}
@Override
public URL getUrl() {
return url;
}
@Override
public boolean isAvailable() {
return invoker.isAvailable();
}
@Override
public Result invoke(Invocation invocation) throws RpcException {
return invoker.invoke(invocation);
}
@Override
public void destroy() {
invoker.destroy();
}
}
复制代码
该类实现了Protocol接口,其中也用到了装饰模式,是对Protocol的装饰,是在服务引用和暴露的方法上加上了过滤器功能。
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
// 得到过滤器的全部扩展实现类实例集合
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
// 从最后一个过滤器开始循环,建立一个带有过滤器链的invoker对象
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
// 记录last的invoker
final Invoker<T> next = last;
// 新建last
last = new Invoker<T>() {
@Override
public Class<T> getInterface() {
return invoker.getInterface();
}
@Override
public URL getUrl() {
return invoker.getUrl();
}
@Override
public boolean isAvailable() {
return invoker.isAvailable();
}
/** * 关键在这里,调用下一个filter表明的invoker,把每个过滤器串起来 * @param invocation * @return * @throws RpcException */
@Override
public Result invoke(Invocation invocation) throws RpcException {
return filter.invoke(next, invocation);
}
@Override
public void destroy() {
invoker.destroy();
}
@Override
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}
复制代码
该方法就是建立带 Filter 链的 Invoker 对象。倒序的把每个过滤器串连起来,造成一个invoker。
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// 若是是注册中心,则直接暴露服务
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
// 服务提供侧暴露服务
return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}
复制代码
该方法是在服务暴露上作了过滤器链的加强,也就是加上了过滤器。
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// 若是是注册中心,则直接引用
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url);
}
// 消费者侧引用服务
return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
}
复制代码
该方法是在服务引用上作了过滤器链的加强,也就是加上了过滤器。
该类也实现了Protocol,也是装饰了Protocol接口,可是它是在服务引用和暴露过程当中加上了监听器的功能。
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// 若是是注册中心,则暴露该invoker
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
// 建立一个暴露者监听器包装类对象
return new ListenerExporterWrapper<T>(protocol.export(invoker),
Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
.getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
}
复制代码
该方法是在服务暴露上作了监听器功能的加强,也就是加上了监听器。
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// 若是是注册中心。则直接引用服务
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url);
}
// 建立引用服务监听器包装类对象
return new ListenerInvokerWrapper<T>(protocol.refer(type, url),
Collections.unmodifiableList(
ExtensionLoader.getExtensionLoader(InvokerListener.class)
.getActivateExtension(url, Constants.INVOKER_LISTENER_KEY)));
}
复制代码
该方法是在服务引用上作了监听器功能的加强,也就是加上了监听器。
该部分相关的源码解析地址:github.com/CrazyHZM/in…
该文章讲解了远程调用中关于协议的部分,其实就是讲了一些公共的方法,而且把关键方法抽象出来让子类实现,具体的方法实现都在各个协议中本身实现。接下来我将开始对rpc模块的代理进行讲解。