本例以一个简单典型的服务发布为例,spring配置以下java
//dubbo协议 <dubbo:protocol name="dubbo" port="20880" id="dubbo1"/> //zk注册中心 <dubbo:registry id="hangzhouRegistry" address="zookeeper://192.168.64.128:2181"/> <dubbo:service interface="demo.dubbo.api.DemoService" ref="demoService" protocol="dubbo1" />
//服务接口 public interface DemoService { public String sayHello(String name); } //实现 public class DemoServiceImpl implements DemoService { public String sayHello(String name) { Random random=new Random(); try { Thread.sleep(800* random.nextInt(6)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "] Hello " + name + ", request from consumer: " + RpcContext.getContext().getRemoteAddress()); return "Hello " + name + ", response form provider: " + RpcContext.getContext().getLocalAddress(); } }
有博文 dubbo基于spring的构建分析,能够看到dubbo服务发布注册的启动方法有两个入口,
第一在ServiceBean类的onApplicationEvent()方法,在容器初始化完成后,执行暴露逻辑
第二在ServiceBean类的afterPropertiesSet()方法,当前servicebean属性构造完成后,执行暴露逻辑
具体暴露方法在其父类ServiceConfig的ServiceConfig方法里spring
//这是个同步的方法 public synchronized void export() { if (provider != null) { if (export == null) { export = provider.getExport(); } if (delay == null) { delay = provider.getDelay(); } } if (export != null && !export) { return; } //延迟暴露 经过delay参数配置的,延迟暴露,放入单独线程中。 if (delay != null && delay > 0) { delayExportExecutor.schedule(new Runnable() { public void run() { doExport(); } }, delay, TimeUnit.MILLISECONDS); } else { //暴露方法 doExport(); } }
/*** * 也是个同步的方法,暴露过程具体过程 */ protected synchronized void doExport() { //....属性检查和赋值,代码略 //暴露过程 doExportUrls(); } private void doExportUrls() { //获取注册中心信息 List<URL> registryURLs = loadRegistries(true); for (ProtocolConfig protocolConfig : protocols) { //多个协议,暴露屡次 doExportUrlsFor1Protocol(protocolConfig, registryURLs); } } //暴露过程 private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { //属性的解析和赋值..... //..... 代码略..... //获取服务暴露范围,本地或者远程 String scope = url.getParameter(Constants.SCOPE_KEY); //配置为none不暴露 //不配默认,服务暴露远程同时在本地暴露 if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) { //配置不是remote的状况下作本地暴露 (配置为remote,则表示只暴露远程服务) if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) { //本地暴露 (***看这里***)关键1 exportLocal(url); } //若是配置不是local则暴露为远程服务.(配置为local,则表示只暴露本地服务) if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) { if (logger.isInfoEnabled()) { logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url); } if (registryURLs != null && registryURLs.size() > 0 && url.getParameter("register", true)) { //有多个注册中心,暴露到多个注册中心 for (URL registryURL : registryURLs) { //url 添加dynamic 属性值 url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic")); URL monitorUrl = loadMonitor(registryURL); if (monitorUrl != null) { url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString()); } if (logger.isInfoEnabled()) { logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL); } //(***看这里***)关键2 //默认走到JavassistProxyFactory.getInvoker方法 Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); //这里的invoker的url的协议是register类型 Exporter<?> exporter = protocol.export(invoker); exporters.add(exporter); } } else { //没有注册中心 ,只在本机ip打开服务端口,生成服务代理,并不注册到注册中心。 //(***看这里***)关键3 此处的url协议为dubbo Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url); Exporter<?> exporter = protocol.export(invoker); exporters.add(exporter); } } } this.urls.add(url); }
上面方法中有三个地方涉及到具体的服务暴露先看,关键1bootstrap
private void exportLocal(URL url) { if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { URL local = URL.valueOf(url.toFullString()) .setProtocol(Constants.LOCAL_PROTOCOL)//设置为injvm 协议 .setHost(NetUtils.LOCALHOST) .setPort(0); //这里的protocol是Protocol$Adpative的实例(spi机制) //proxyFactory是ProxyFactory$Adpative实例(spi机制) Exporter<?> exporter = protocol.export( proxyFactory.getInvoker(ref, (Class) interfaceClass, local)); //放到暴露列表 exporters.add(exporter); logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry"); } }
ProxyFactory$Adpative的getInvoker方法api
public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws com.alibaba.dubbo.rpc.RpcException { if (arg2 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg2; String extName = url.getParameter("proxy", "javassist"); if (extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])"); com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName); //这里的extension默认是JavassistProxyFactory实例, return extension.getInvoker(arg0, arg1, arg2); }
JavassistProxyFactory的getInvoker方法缓存
//proxy 是服务实现类,type是服务接口 public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { //利用Wrapper类经过服务接口生成对应的代理类。 final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); //实现抽象类AbstractProxyInvoker抽象方法doInvoke,并调用(proxy, type, url)构造函数实例化匿名类 return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { //这里调用代理类的invokeMethod方法 return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; }
AbstractProxyInvoker的构造方法并发
public AbstractProxyInvoker(T proxy, Class<T> type, URL url) { if (proxy == null) { throw new IllegalArgumentException("proxy == null"); } if (type == null) { throw new IllegalArgumentException("interface == null"); } if (!type.isInstance(proxy)) { throw new IllegalArgumentException(proxy.getClass().getName() + " not implement interface " + type); } this.proxy = proxy; this.type = type; this.url = url;//赋值ur到自身(invoker),这个后面用到 }
AbstractProxyInvoker的invoke方法app
public Result invoke(Invocation invocation) throws RpcException { try { //回调用doInvoke方法,会传入执行实例proxy,方法名和方法参数类型和值 return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments())); } catch (InvocationTargetException e) { return new RpcResult(e.getTargetException()); } catch (Throwable e) { throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e); } }
这里以文章开头DemoService接口为例经过Wrapper.getWrapper返回的类代码,这里须要代码hack框架
package com.alibaba.dubbo.common.bytecode; import com.alibaba.dubbo.common.DemoService; import java.lang.reflect.InvocationTargetException; import java.util.Map; public class Wrapper0 extends Wrapper implements ClassGenerator.DC { public static String[] pns; public static Map pts; public static String[] mns; public static String[] dmns; public static Class[] mts0; public String[] getPropertyNames() { return pns; } public boolean hasProperty(String paramString) { return pts.containsKey(paramString); } public Class getPropertyType(String paramString) { return (Class)pts.get(paramString); } public String[] getMethodNames() { return mns; } public String[] getDeclaredMethodNames() { return dmns; } public void setPropertyValue(Object paramObject1, String paramString, Object paramObject2) { try { DemoService localDemoService = (DemoService)paramObject1; } catch (Throwable localThrowable) { throw new IllegalArgumentException(localThrowable); } throw new NoSuchPropertyException("Not found property \"" + paramString + "\" filed or setter method in class com.alibaba.dubbo.common.DemoService."); } public Object getPropertyValue(Object paramObject, String paramString) { try { DemoService localDemoService = (DemoService)paramObject; } catch (Throwable localThrowable) { throw new IllegalArgumentException(localThrowable); } throw new NoSuchPropertyException("Not found property \"" + paramString + "\" filed or setter method in class com.alibaba.dubbo.common.DemoService."); } //(**看这里,关键方法实现****) public Object invokeMethod(Object paramObject, String paramString, Class[] paramArrayOfClass, Object[] paramArrayOfObject) throws InvocationTargetException { DemoService localDemoService; try { //赋值执行实例,这里是接口实现类,DemoServiceImpl对象 localDemoService = (DemoService)paramObject; } catch (Throwable localThrowable1) { throw new IllegalArgumentException(localThrowable1); } try { //根据传入的要调用的方法名paramString,方法参数值,调用执行实例方法 if (("sayHello".equals(paramString)) || (paramArrayOfClass.length == 1)) return localDemoService.sayHello((String)paramArrayOfObject[0]); } catch (Throwable localThrowable2) { throw new InvocationTargetException(localThrowable2); } throw new NoSuchMethodException("Not found method \"" + paramString + "\" in class com.alibaba.dubbo.common.DemoService."); } }
到这比较清楚了解,具体的代理过程了。dom
第一步上面invoker对象生成好后,接下来就要经过Protocol$Adpative的export方法暴露服务jvm
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException { if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null"); if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null"); com.alibaba.dubbo.common.URL url = arg0.getUrl(); String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol()); if (extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])"); //根据上文本地调用这里的protocal协议别设置为injvm //因此这里会走到InjvmProtocol的export方法 com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName); return extension.export(arg0); }
看下InjvmProtocol的export方法
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { //返回InjvmExporter对象 return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap); }
InjvmExporter构造函数
InjvmExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) { super(invoker);//invoker赋给自身 this.key = key; this.exporterMap = exporterMap; //存的形式,serviceKey:自身(exporter) put到map关联起来,这样能够经过servciekey找到exporterMap而后找到invoker exporterMap.put(key, this); }
这里的exporterMap是由InjvmProtocol实例拥有,而InjvmProtocol有时单例的,由于InjvmProtocol类有如下实例和方法:
//静态自身成员变量 private static InjvmProtocol INSTANCE; //构造方法,把自身赋值给INSTANCE对象 public InjvmProtocol() { INSTANCE = this; }
因此exporterMap对象也是单例的。同时这里顺便看下InjvmProtocol的refer方法,本地服务的引用查找也是经过自身exporterMap对象。
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException { //把exporterMap对象赋值给InjvmInvoker return new InjvmInvoker<T>(serviceType, url, url.getServiceKey(), exporterMap); } //具体查找过程 public Result doInvoke(Invocation invocation) throws Throwable { //经过exporterMap获取exporter Exporter<?> exporter = InjvmProtocol.getExporter(exporterMap, getUrl()); if (exporter == null) { throw new RpcException("Service [" + key + "] not found."); } RpcContext.getContext().setRemoteAddress(NetUtils.LOCALHOST, 0); return exporter.getInvoker().invoke(invocation); }
以上是本地服务的发布和引用过程
接下来看下本例中会用到的远程服务发布暴露过程,即暴露服务端口并发布服务信息到注册中心。也便是关键2代码处
经过上面本地服务暴露过程分析能够知道,远程服务的态代理生成过程和本地服务代理生成是同样的,惟一区别点是构造invoker是传入的url是registryURL
传入url的不一样,会形成下面这句的执行过程的不一样
Exporter<?> exporter = protocol.export(invoker);
这里protocol经过spi走的是Protocol$Adpative的export方法:
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException { if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null"); if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null"); com.alibaba.dubbo.common.URL url = arg0.getUrl(); String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol()); if (extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])"); //因为传入的url是registryURL因此会走RegistryProtocol的export方法 com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName); return extension.export(arg0); }
RegistryProtocol的export方法以下
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { //export invoker 暴露invoker (***看doLocalExport方法**) final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); //registry provider 获取对应注册中心操做对象 final Registry registry = getRegistry(originInvoker); //获取要注册到注册中心的地址 final URL registedProviderUrl = getRegistedProviderUrl(originInvoker); //注册服务url到注册中心(***把服务信息注册到注册中心**) registry.register(registedProviderUrl); // 订阅override数据 // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,由于subscribed以服务名为缓存的key,致使订阅信息覆盖。 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); //保证每次export都返回一个新的exporter实例 //实现一个匿名类实现接口Exporter return new Exporter<T>() { public Invoker<T> getInvoker() { return exporter.getInvoker(); } //取消暴露的过程 public void unexport() { try { exporter.unexport(); } catch (Throwable t) { logger.warn(t.getMessage(), t); } try { //取消注册 registry.unregister(registedProviderUrl); } catch (Throwable t) { logger.warn(t.getMessage(), t); } try { //取消订阅 overrideListeners.remove(overrideSubscribeUrl); registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener); } catch (Throwable t) { logger.warn(t.getMessage(), t); } } }; }
doLocalExport方法,这里方法里涉及过程比较多
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) { //经过原始originInvoker构造缓存key String key = getCacheKey(originInvoker); ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key); //缓存没有,走具体暴露逻辑 if (exporter == null) { synchronized (bounds) { exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { //InvokerDelegete是RegistryProtocol类的静态内部类,继承自InvokerWrapper, //经过构造器赋值持有代理originInvoker和服务暴露协议url对象,算是包装一层 //而url 是经过getProviderUrl(originInvoker)返回的,此时url的协议已经是dubbo,即服务暴露的协议 final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker)); //ExporterChangeableWrapper是RegistryProtocol的私有内部类实现了Exporter接口。 //经过调用它的构造方法(Exporter<T> exporter, Invoker<T> originInvoker)构造exporterWrapper实例 //而这里传入的exporter是经过(Exporter<T>) protocol.export(invokerDelegete)语句建立 //由上一步知道,这里的invokerDelegete里url属性的protocol协议已是dubbo //下面具体看下protocol.export(invokerDelegete)方法。 exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker); bounds.put(key, exporter); } } } return exporter; }
还对Protocol$Adpative类的export逻辑分析
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException { if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null"); if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null"); com.alibaba.dubbo.common.URL url = arg0.getUrl(); String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol()); if (extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])"); //因为这里invokerDelegete的url属性的protocol是dubbo,因此这里走DubboProtocol的export协议 com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName); return extension.export(arg0); }
接着看下DubboProtocol的export方法
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); // get serviceKey. String key = serviceKey(url);//key的组成group/service:version:port //构造服务的exporter //如同InjvmProtocol同样,DubboProtocol也是单例的 因此这里exporterMap也是单例的 DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); //经过key放入exporterMap,把持有invoker的exporter 和serviceKey关联 //这个在后面服务调用时,能够经过key找到对应的exporter进而找到invoker提供服务 exporterMap.put(key, exporter); //export an stub service for dispaching event Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice) { String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0) { if (logger.isWarnEnabled()) { logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } } else { stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods); } } //根据url开启一个服务,好比绑定端口,开始接受请求信息(**继续看这里***) openServer(url); return exporter; }
private void openServer(URL url) { //key=host:port 用于定位server String key = url.getAddress(); //client也能够暴露一个只有server能够调用的服务。 boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); if (isServer) { //服务实例放到serverMap,key是host:port //这里的serverMap也单例的 ExchangeServer server = serverMap.get(key); if (server == null) { //经过createServer(url)方法获取server (***看这里***) serverMap.put(key, createServer(url)); } else { //server支持reset,配合override功能使用 server.reset(url); } } } /*** * 开启服务 * @param url * @return */ private ExchangeServer createServer(URL url) { //默认开启server关闭时发送readonly事件 url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); //默认开启heartbeat url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); /*** * 经过server key 检查是不是dubbo目前spi扩展支持的传输框架。默认是netty */ if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) throw new RpcException("Unsupported server type: " + str + ", url: " + url); //经过codec key 获取编解码方案,兼容dubbo1,默认是dubbo1compatible ,不然默认dubbo 编解码方案 url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME); ExchangeServer server; try { //构造具体服务实例, //Exchangers是门面类,里面封装了具体交换层实现,并调用它的bind方法(***看这里***) server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } //这里会验证一下,客户端传输层实现 //若是没有对应的实现,会抛出异常 str = url.getParameter(Constants.CLIENT_KEY); if (str != null && str.length() > 0) { Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(); if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type: " + str); } } return server; }
跟踪方法
//Exchangers.bind方法 public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handler == null) { throw new IllegalArgumentException("handler == null"); } url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); //经过spi会走HeaderExchanger的bind逻辑 return getExchanger(url).bind(url, handler); } //HeaderExchanger的bind方法 public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { //Transporters也是门面类,封装了具体的传输层实现查找和bind过程 return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); } //HeaderExchangeServer的构造函数 public HeaderExchangeServer(Server server) { if (server == null) { throw new IllegalArgumentException("server == null"); } this.server = server; this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0); this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3); if (heartbeatTimeout < heartbeat * 2) { throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2"); } //开启心跳 startHeatbeatTimer(); }
继续跟进方法
//Transporters.bind方法 public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handlers == null || handlers.length == 0) { throw new IllegalArgumentException("handlers == null"); } ChannelHandler handler; if (handlers.length == 1) { handler = handlers[0]; } else { handler = new ChannelHandlerDispatcher(handlers); } //根据spi 这里具体走 NettyTransporter.bind return getTransporter().bind(url, handler); } //NettyTransporter的bind方法 public Server bind(URL url, ChannelHandler listener) throws RemotingException { //能够看到这里是NettyServer实例 return new NettyServer(url, listener); } //NettyServer构造器 public NettyServer(URL url, ChannelHandler handler) throws RemotingException { //调用父类AbstractServer构造器 super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); }
父类AbstractServer的构造函数
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException { super(url, handler); localAddress = getUrl().toInetSocketAddress(); String host = url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(getUrl().getHost()) ? NetUtils.ANYHOST : getUrl().getHost(); bindAddress = new InetSocketAddress(host, getUrl().getPort()); this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS); this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT); try { //打开端口,启动服务,在其子类实现,这里是NettyServer的doOpen()方法(***看这里**) doOpen(); if (logger.isInfoEnabled()) { logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress()); } } catch (Throwable t) { throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName() + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t); } //fixme replace this with better method DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension(); executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort())); }
NettyServer的doOpen()方法:
protected void doOpen() throws Throwable { //经过netty 开启服务监听端口 NettyHelper.setNettyLoggerFactory(); ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); bootstrap = new ServerBootstrap(channelFactory); final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); channels = nettyHandler.getChannels(); // https://issues.jboss.org/browse/NETTY-365 // https://issues.jboss.org/browse/NETTY-379 // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true)); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); ChannelPipeline pipeline = Channels.pipeline(); /*int idleTimeout = getIdleTimeout(); if (idleTimeout > 10000) { pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0)); }*/ pipeline.addLast("decoder", adapter.getDecoder());//解码器 pipeline.addLast("encoder", adapter.getEncoder());//编码器 pipeline.addLast("handler", nettyHandler);//NettyHandler 扩展netty双向handler基类r 能够接受进站和出站数据流 return pipeline; } }); // bind 地址 开启端口 channel = bootstrap.bind(getBindAddress()); }
以上基本梳理了的dubbo从service配置解析到生成服务代理,并经过netty开启服务端口的服务暴露过程。