从图中能够看出该模块下的类主要是实现了dubbo-rpc-api和dubbo-remoting-api两个模块中定义的一些接口和抽象类。扩展了一种duubo框架自定义的dubbo协议,包括编解码和方法调用处理等。java
该类是抽象协议实现类AbstractProtocol的具体的dubbo协议的实现,从该类开始着手分析。api
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); // export service. 经过url得到该服务的key。格式如:{serviceGroup}/{serviceName}:{serviceVersion}:{port} String key = serviceKey(url); //Dubbo协议实现的服务发布器。 DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); exporterMap.put(key, exporter); //export an stub service for dispaching event //参数STUB_EVENT_KEY和IS_CALLBACK_SERVICE的含义不太清楚,须要后续深究。 Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice){ String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0 ){ if (logger.isWarnEnabled()){ logger.warn(new IllegalStateException("consumer [" +url.getParameter(Constants.INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } //调用打开服务器绑定url的方法,这个地方是核心,须要进入深究。 openServer(url); return exporter; }
该方法实现了dubbo协议的服务发布,显示构造一个DubboExporter实现类的Exporter,用于返回。最核心的是调用内部方法openServer(url);将该url发布到dubbo服务器上。咱们进入该方法看看。缓存
private void openServer(URL url) { // find server. String key = url.getAddress(); //client 也能够暴露一个只有server能够调用的服务。 boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true); if (isServer) { ExchangeServer server = serverMap.get(key); if (server == null) { serverMap.put(key, createServer(url)); } else { //server支持reset,配合override功能使用 server.reset(url); } } }
该方法是得到url的地址,经过地址找到对应的server,若已经有相同的地址则无需构造新的server,只须要直接使用,只就起到了缓存server的做用,避免重复构建server。若已经找到了该地址,则会调用server.reset(url)重置一下。url中的参数Constants.IS_SERVER_KEY参数能够禁止发布远程服务,只能本地调用。具体意义不是十分清楚。继续进入方法:createServer(url)服务器
private ExchangeServer createServer(URL url) { //默认开启server关闭时发送readonly事件 url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); //默认开启heartbeat url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) throw new RpcException("Unsupported server type: " + str + ", url: " + url); url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME); ExchangeServer server; try { server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } str = url.getParameter(Constants.CLIENT_KEY); if (str != null && str.length() > 0) { Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type: " + str); } } return server; }
该方法先增长了一些默认的参数,好比heartbeat、server等。检查参数的合法性。最后调用Exchangers.bind(url, requestHandler)将url绑定到requestHandler上得到绑定的服务器,Exchangers是网络通信模块dubbo-remoting-api中定义的,详细的含义,等咱们分析该模块再了解。咱们猜想该方法的含义是绑定url的处理器为requestHandler,并返回服务器。requestHandler就是如何处理接收的请求,这个地方是核心,咱们进入该对象的定义看看。网络
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { public Object reply(ExchangeChannel channel, Object message) throws RemotingException { if (message instanceof Invocation) { Invocation inv = (Invocation) message; Invoker<?> invoker = getInvoker(channel, inv); //若是是callback 须要处理高版本调用低版本的问题 if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){ String methodsStr = invoker.getUrl().getParameters().get("methods"); boolean hasMethod = false; if (methodsStr == null || methodsStr.indexOf(",") == -1){ hasMethod = inv.getMethodName().equals(methodsStr); } else { String[] methods = methodsStr.split(","); for (String method : methods){ if (inv.getMethodName().equals(method)){ hasMethod = true; break; } } } if (!hasMethod){ logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv ); return null; } } RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); return invoker.invoke(inv); } throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); } @Override public void received(Channel channel, Object message) throws RemotingException { if (message instanceof Invocation) { reply((ExchangeChannel) channel, message); } else { super.received(channel, message); } } @Override public void connected(Channel channel) throws RemotingException { invoke(channel, Constants.ON_CONNECT_KEY); } @Override public void disconnected(Channel channel) throws RemotingException { if(logger.isInfoEnabled()){ logger.info("disconected from "+ channel.getRemoteAddress() + ",url:" + channel.getUrl()); } invoke(channel, Constants.ON_DISCONNECT_KEY); } private void invoke(Channel channel, String methodKey) { Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey); if (invocation != null) { try { received(channel, invocation); } catch (Throwable t) { logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t); } } } private Invocation createInvocation(Channel channel, URL url, String methodKey) { String method = url.getParameter(methodKey); if (method == null || method.length() == 0) { return null; } RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]); invocation.setAttachment(Constants.PATH_KEY, url.getPath()); invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY)); invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY)); invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY)); if (url.getParameter(Constants.STUB_EVENT_KEY, false)){ invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString()); } return invocation; } };
该对象是一个匿名类对象,实现了接口ExchangeHandler,应该就是一个远程通信的抽象,是一个通信处理类,处理接收到信息。其中方法reply是响应客户端请求信息,它根据Invocation对象得到invoker,最后再调用invoker.invoke方法执行目标对象的方法,将返回结果发回给客户端。其它的几个事件的方法也作了响应的处理,包括:received、connected和disconnected等事件。框架
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException { // create rpc invoker. DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers); invokers.add(invoker); return invoker; } private ExchangeClient[] getClients(URL url){ //是否共享链接 boolean service_share_connect = false; int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0); //若是connections不配置,则共享链接,不然每服务每链接 if (connections == 0){ service_share_connect = true; connections = 1; } ExchangeClient[] clients = new ExchangeClient[connections]; for (int i = 0; i < clients.length; i++) { if (service_share_connect){ clients[i] = getSharedClient(url); } else { clients[i] = initClient(url); } } return clients; } /** *获取共享链接 */ private ExchangeClient getSharedClient(URL url){ String key = url.getAddress(); ReferenceCountExchangeClient client = referenceClientMap.get(key); if ( client != null ){ if ( !client.isClosed()){ client.incrementAndGetCount(); return client; } else { // logger.warn(new IllegalStateException("client is closed,but stay in clientmap .client :"+ client)); referenceClientMap.remove(key); } } ExchangeClient exchagneclient = initClient(url); client = new ReferenceCountExchangeClient(exchagneclient, ghostClientMap); referenceClientMap.put(key, client); ghostClientMap.remove(key); return client; } /** * 建立新链接. */ private ExchangeClient initClient(URL url) { // client type setting. String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT)); String version = url.getParameter(Constants.DUBBO_VERSION_KEY); boolean compatible = (version != null && version.startsWith("1.0.")); url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() && compatible ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME); //默认开启heartbeat url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); // BIO存在严重性能问题,暂时不容许使用 if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) { throw new RpcException("Unsupported client type: " + str + "," + " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " ")); } ExchangeClient client ; try { //设置链接应该是lazy的 if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)){ client = new LazyConnectExchangeClient(url ,requestHandler); } else { client = Exchangers.connect(url ,requestHandler); } } catch (RemotingException e) { throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e); } return client; }
该方法先直接构造一个DubboInvoker类型的对象,其中获取客户端的参数调用了方法getClients(url)。看是否配置了参数connections,若未配置或配置为0则表示共享客户端链接,若是不共享则直接建立一个新的客户端对象,不然得到已经共享的链接,而且返回一个建立包装器ReferenceCountExchangeClient的客户端实例,该实例会记录被引用次数,最终的方法仍是调用目标的client对象。异步
初始化一个全新的Client对象的方法是核心,它也是先配置一些默认的参数,若是配置参数lazy则表示延迟建立客户端链接,则直接返回一个LazyConnectExchangeClient对象,该对象也是ExchangeClient的包装器对象,它会在请求的时候先检查链接,若未建立链接则会先建立链接。最后调用client = Exchangers.connect(url ,requestHandler);将url绑定到请求处理器requestHandler上。async
该类是消费者dubbo协议的执行器。它处理了dubbo协议在客户端调用远程接口的逻辑实现。核心方法是doInvoke,咱们重点看看这个方法的实现。ide
@Override protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version); ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT); if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); } else if (isAsync) { ResponseFuture future = currentClient.request(inv, timeout) ; RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); return new RpcResult(); } else { RpcContext.getContext().setFuture(null); return (Result) currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } }
首先客户端支持多个链接调用服务,这个能够经过参数设置,会轮询链接去调用服务。支持三种调用方式,分别是oneway(单向调用)、async(异步)和sync(同步),这个都是能够经过url参数指定,经过客户端对应的方法去调用服务端的服务。性能
能够看出来,咱们有大量的接口和抽象类来自于dubbo-remoting-api模块,咱们的疑问都在这里,接下来咱们研究该模块才能解决咱们的不少疑惑。