// 代理工厂建立invoker,服务代理,经过该代理进行远程调用 Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL .addParameterAndEncoded( Constants.EXPORT_KEY, url.toFullString())); // 暴露远程服务 Exporter<?> exporter = protocol.export(invoker); exporters.add(exporter);
接着上一篇,咱们继续看发布过程,前面咱们已经看过了经过代理工程建立invoker,接着咱们就看一下远程服务的暴露,这里能够看到经过protocol建立一个exporter而后将其存入list中。远程服务暴露的实现主要在dubbo-rpc模块下,根据不一样的协议有分为不一样的子工程。不一样的协议对Protocol接口都有其对应实现类。其中有一些服务暴露接口并无具体的实现。java
Protocol接口中有4个方法定义getDefaultPort()获取端口、export()暴露服务、refer()引用服务、destroy()释放协议,从建立到销毁可见Protocol维护了远程调用Invoker的整个生命周期。bootstrap
@SPI("dubbo") public interface Protocol { /** * 获取缺省端口,当用户没有配置端口时使用。 * * @return 缺省端口 */ int getDefaultPort(); /** * 暴露远程服务:<br> * 1. 协议在接收请求时,应记录请求来源方地址信息:RpcContext.getContext().setRemoteAddress();<br> * 2. export()必须是幂等的,也就是暴露同一个URL的Invoker两次,和暴露一次没有区别。<br> * 3. export()传入的Invoker由框架实现并传入,协议不须要关心。<br> * * @param <T> 服务的类型 * @param invoker 服务的执行体 * @return exporter 暴露服务的引用,用于取消暴露 * @throws RpcException 当暴露服务出错时抛出,好比端口已占用 */ @Adaptive <T> Exporter<T> export(Invoker<T> invoker) throws RpcException; /** * 引用远程服务:<br> * 1. 当用户调用refer()所返回的Invoker对象的invoke()方法时,协议需相应执行同URL远端export()传入的Invoker对象的invoke()方法。<br> * 2. refer()返回的Invoker由协议实现,协议一般须要在此Invoker中发送远程请求。<br> * 3. 当url中有设置check=false时,链接失败不能抛出异常,并内部自动恢复。<br> * * @param <T> 服务的类型 * @param type 服务的类型 * @param url 远程服务的URL地址 * @return invoker 服务的本地代理 * @throws RpcException 当链接服务提供方失败时抛出 */ @Adaptive <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException; /** * 释放协议:<br> * 1. 取消该协议全部已经暴露和引用的服务。<br> * 2. 释放协议所占用的全部资源,好比链接和端口。<br> * 3. 协议在释放后,依然能暴露和引用新的服务。<br> */ void destroy(); }
Protocol在执行时会先执行wrapper的监听器和过滤器,这里执行顺序是这样的缓存
ProtocolListenerWrapper.export()-->ProtocolFilterWrapper.export()-->RegistryProtocol.export()在这里完成服务注册到zookeeper上面。app
这里咱们重点看一下dubboProtocol中export()方法的实现,在该方法中主要是建立了DubboExporter(包含一个invoker)而后把exporter存入map中,而后是打开server的过程。框架
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); // export service. //拼接key:group1/com.alibaba.dubbo.demo.DemoService:1.0(version):2880(port) String key = serviceKey(url); //建立exporter,父类中定义的map<String,Exporter> exporterMap DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); exporterMap.put(key, exporter);//exporter存入map中 //export an stub service for dispaching event Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice){//是否作本地缓存,这里将stubServiceMethods单独存储 String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0 ){ if (logger.isWarnEnabled()){ logger.warn(new IllegalStateException("consumer [" +url.getParameter(Constants.INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } //打开服务链接 openServer(url); return exporter; } private void openServer(URL url) { // find server. String key = url.getAddress(); //client 也能够暴露一个只有server能够调用的服务。 boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true); if (isServer) { ExchangeServer server = serverMap.get(key);//ExchangeServer存储在map中,Map<key,交换层服务server>。 if (server == null) { serverMap.put(key, createServer(url)); } else { //server支持reset,配合override功能使用 server.reset(url); } } } //建立信息交换层服务server private ExchangeServer createServer(URL url) { //默认开启server关闭时发送readonly事件 url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());//url添加channel.readonly.sent属性true,返回的是一个新url对象 //默认开启heartbeat url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));//url添加heartbeat属性60000,返回一个新url String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);//获取server属性,默认为netty if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) throw new RpcException("Unsupported server type: " + str + ", url: " + url); //设置编码解码协议,默认dubbo url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME); ExchangeServer server; try { //绑定url和处理器,返回一个信息交换层HeaderExchangeServer,该信息交换层server里面有一个NettyServer, server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } str = url.getParameter(Constants.CLIENT_KEY); if (str != null && str.length() > 0) { Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type: " + str); } } return server; }
打开server过程:HeaderExchangeServer(交换层服务) --> NettyServer(传输层服务) ,最终跟踪到NettyServer中咱们能够看到调用netty打开链接的方法。ide
protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); //ChannelFactory 是一个建立和管理Channel通道及其相关资源的工厂接口 ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); bootstrap = new ServerBootstrap(channelFactory); final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); channels = nettyHandler.getChannels(); // https://issues.jboss.org/browse/NETTY-365 // https://issues.jboss.org/browse/NETTY-379 // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true)); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this); ChannelPipeline pipeline = Channels.pipeline(); /*int idleTimeout = getIdleTimeout(); if (idleTimeout > 10000) { pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0)); }*/ pipeline.addLast("decoder", adapter.getDecoder()); pipeline.addLast("encoder", adapter.getEncoder()); pipeline.addLast("handler", nettyHandler); return pipeline; } }); // bind channel = bootstrap.bind(getBindAddress()); }
至此为止,咱们已经将发布暴露服务的过程看完再看就要去看netty中具体的NIO实现了,有兴趣能够看看netty的源码。this