Dubbo 服务暴露过程是经过 com.alibaba.dubbo.config.spring.ServiceBean
来实现的。Spring 容器 refresh() 完成后,会发送 ContextRefreshedEvent,ServiceBean 会接收到这个 event 而后调用 export()。html
Dubbo 服务暴露过程:spring
apache
经过 Protocol 调用 export() Exporter<?> exporter = protocol.export(com.alibaba.dubbo.rpc.Invoker<T> invoker); 默认会使用 DubboProtocol 作服务暴露,过程当中会启动 Netty Server 监听端口。bootstrap
// JavassistProxyFactory.class public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // TODO Wrapper cannot handle this scenario correctly: the classname contains '$' final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { // 经过 wrapper 类去调用服务提供者的真实方法。(避免使用反射,提升效率) return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; }
经过以前 Dubbo Protocol & Filter 的学习,咱们知道这里的 protocol 是一个 Wrappered Protocol,因此 protocol.export() 方法会先调用 Protocol SPI 扩展中的 wrapper 类的 export() 。 其中,ProtocolFilterWrapper#export(Invoker<T> invoker) 代码以下:缓存
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { // 若是 URL 的 protocol 是注册协议的话,就执行服务注册流程 if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } // 服务暴露过程当中,会添加 group="provider" 的 Filter return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER)); }
经过在上述 export() 方法上打断点跟踪,咱们能够发现,Dubbo 首先执行的是服务注册,在服务注册过程当中,会再次调用 protocol.export(invokerDelegete) 来作服务本地暴露。app
RegistryProtocol#export(Invoker<T> originInvoker) 代码以下:socket
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { //export invoker // 会再次调用 protocol.export(invokerDelegete) 来作服务本地暴露 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); URL registryUrl = getRegistryUrl(originInvoker); //registry provider final Registry registry = getRegistry(originInvoker); final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); //to judge to delay publish whether or not boolean register = registedProviderUrl.getParameter("register", true); ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl); if (register) { // 服务注册 register(registryUrl, registedProviderUrl); ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true); } // Subscribe the override data // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover. final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); //Ensure that a new exporter instance is returned every time export return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl); }
附: com.alibaba.dubbo.registry.support.AbstractRegistry#AbstractRegistry(URL url) 服务注册缓存文件tcp
服务本地暴露,最终会调用 DubboProtocol#export(Invoker<T> invoker)ide
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); // export service. String key = serviceKey(url); // 1. 建立 DubboExporter DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); exporterMap.put(key, exporter); ...... // 2. 开启服务监听 openServer(url); optimizeSerialization(url); return exporter; }
Dubbo 服务暴露时,首先会建立一个 DubboExporter,而后再经过 netty 开启服务端口监听。 DubboExporter 的做用是缓存 Invoker,方便后续操做获取 Invoker。其中最重要的操做就是: Provider 接收到 Request 请求后,获取到对应的 Invoker,而后执行 Invoker。学习
openServer(url) 最终会调用 createServer(URL url) 来建立 tcp server:
private ExchangeServer createServer(URL url) { // send readonly event when server closes, it's enabled by default url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); // enable heartbeat by default url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); ...... // 设置 codec = "dubbo" url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME); ExchangeServer server; try { // 建立 server,并传递 requestHandler server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } ...... return server; }
这里的 requestHandler
很是重要,它是用来处理 consumer 端的 Request,将 Request 转化成 Invoker 调用的。
建立 tcp server 的过程:
Exchangers.bind(url, requestHandler) --> Exchanger$Adaptive#bind(URL url, ExchangeHandler handler) --> HeaderExchanger#bind(URL url, ExchangeHandler handler) --> 返回 new HeaderExchangeServer(Server server)
// HeaderExchanger.class public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); }
Transporters.bind(URL url, ChannelHandler... handlers) --> Transporter$Adaptive#bind(URL url, ChannelHandler handler) --> NettyTransporter#bind(URL url, ChannelHandler listener) --> 返回 new NettyServer(url, listener)
编解码最终使用的是:com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec
NettyServer 开启服务监听的代码:
protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); bootstrap = new ServerBootstrap(channelFactory); // 这里的 this 所指的 handler 就是 Exchangers.bind(url, requestHandler) 传递的 handler final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); channels = nettyHandler.getChannels(); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); ChannelPipeline pipeline = Channels.pipeline(); // 编解码的 handler pipeline.addLast("decoder", adapter.getDecoder()); pipeline.addLast("encoder", adapter.getEncoder()); // 业务处理的 handler pipeline.addLast("handler", nettyHandler); return pipeline; } }); // bind channel = bootstrap.bind(getBindAddress()); }
在没有注册中心,直接暴露提供者的状况下 [1],ServiceConfig
解析出的 URL 的格式为:dubbo://service-host/com.foo.FooService?version=1.0.0
。
基于扩展点自适应机制,经过 URL 的 dubbo://
协议头识别,直接调用 DubboProtocol
的 export()
方法,打开服务端口。
在有注册中心,须要注册提供者地址的状况下 [2],ServiceConfig
解析出的 URL 的格式为: registry://registry-host/org.apache.dubbo.registry.RegistryService?export=URL.encode("dubbo://service-host/com.foo.FooService?version=1.0.0")
,
基于扩展点自适应机制,经过 URL 的 registry://
协议头识别,就会调用 RegistryProtocol
的 export()
方法,将 export
参数中的提供者 URL,先注册到注册中心。
再从新传给 Protocol
扩展点进行暴露: dubbo://service-host/com.foo.FooService?version=1.0.0
,而后基于扩展点自适应机制,经过提供者 URL 的 dubbo://
协议头识别,就会调用 DubboProtocol
的 export()
方法,打开服务端口。
上图是服务提供者暴露服务的主过程:
首先 ServiceConfig
类拿到对外提供服务的实际类 ref(如:HelloWorldImpl),而后经过 ProxyFactory
类的 getInvoker
方法使用 ref 生成一个 AbstractProxyInvoker
实例,到这一步就完成具体服务到 Invoker
的转化。接下来就是 Invoker
转换到 Exporter
的过程。
Dubbo 处理服务暴露的关键就在 Invoker
转换到 Exporter
的过程,上图中的红色部分。下面咱们以 Dubbo 和 RMI 这两种典型协议的实现来进行说明:
Dubbo 协议的 Invoker
转为 Exporter
发生在 DubboProtocol
类的 export
方法,它主要是打开 socket 侦听服务,并接收客户端发来的各类请求,通信细节由 Dubbo 本身实现。
RMI 协议的 Invoker
转为 Exporter
发生在 RmiProtocol
类的 export