上一篇文章分析了暴露服务到本地,Dubbo的服务导出1之导出到本地。接下来分析暴露服务到远程。bootstrap
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) { if (registryURLs != null && !registryURLs.isEmpty()) { for (URL registryURL : registryURLs) { // 添加动态参数,此动态参数是决定Zookeeper建立临时节点仍是持久节点 url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY)); String proxy = url.getParameter(Constants.PROXY_KEY); if (StringUtils.isNotEmpty(proxy)) { registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy); } // 步骤1)建立Invoker,这里建立Invoker逻辑和上面同样 Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded( Constants.EXPORT_KEY, url.toFullString())); DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this); // 步骤2)暴露服务 Exporter<?> exporter = ServiceConfig.protocol.export(wrapperInvoker); exporters.add(exporter); } } }
/** * 下面分析步骤2,该方法两大核心逻辑,导出服务和注册服务,服务注册下篇文章分析 */ @Override public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { // 1. 导出服务,export invoker,本篇文章仅分析第一步导出服务到远程 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
// zookeeper://10.101.99.127:2181/com.alibaba.dubbo.registry.RegistryService // ?application=demo-provider&dubbo=2.0.2 URL registryUrl = getRegistryUrl(originInvoker); // registry provider,默认返回ZookeeperRegistry实例 final Registry registry = getRegistry(originInvoker); // dubbo://172.22.213.93:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true // &application=demo-provider&default.server=netty4&dubbo=2.0.2&generic=false // &interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=8140&side=provider final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker); // 不配置的话默认返回true boolean register = registeredProviderUrl.getParameter("register", true); ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
// 2.注册服务,这篇文章已经比较长了,决定将步骤2和步骤3新起一篇文章分析,服务暴露以后须要注册到注册中心 if (register) { register(registryUrl, registeredProviderUrl); ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true); } // 3.数据更新订阅 final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl); }
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) { String key = getCacheKey(originInvoker); ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { synchronized (bounds) { exporter = (ExporterChangeableWrapper<T>) bounds.get(key); if (exporter == null) { final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker)); // 调用protocol的export方法导出服务,默认是采用Dubbo协议,对应DubboProtocol的export方法 // 可是这里protocol.export()并非先走DubboProtocol的export方法,而是先走 // ProtocolListenerWrapper的wrapper方法 // 由于ProtocolListenerWrapper对DubboProtocol作了一层包装,具体参考 // https://segmentfault.com/a/1190000020387196,核心方法protocal.export() exporter = new ExporterChangeableWrapper<T>( (Exporter<T>) protocol.export(invokerDelegete), originInvoker); bounds.put(key, exporter); } } } return exporter; }
/** * 上述核心方法protocol.export()会先走到ProtocolListenerWrapper的export方法,该方法是在服务暴露上作了 监听器功能的加强,也就是加上了监听器 */ @Override public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { // 若是是注册中心,则暴露该invoker if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } // 建立一个暴露者监听器包装类对象,暴露服务时这里的protocol是ProtocolFilterWrapper,这里用到了 // Wrapper包装原有的DubboProtocol,能够参考https://segmentfault.com/a/1190000020387196 return new ListenerExporterWrapper<T>(protocol.export(invoker), Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class) .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY))); }
/** * ProtocolFilterWrapper的export方法,该方法是在服务暴露上作了过滤器链的加强,也就是加上了过滤器 */ @Override public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { // 若是是注册中心,则直接暴露服务 if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } // 服务提供侧暴露服务,这里经过buildInvokerChain造成了过滤器链 return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER)); }
/** * 该方法就是建立带Filter链的Invoker对象,倒序的把每个过滤器串连起来,造成一个invoker */ private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) { Invoker<T> last = invoker; // 得到过滤器的全部扩展实现类实例集合 List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class). getActivateExtension(invoker.getUrl(), key, group); if (!filters.isEmpty()) { // 从最后一个过滤器开始循环,建立一个带有过滤器链的invoker对象 for (int i = filters.size() - 1; i >= 0; i--) { final Filter filter = filters.get(i); final Invoker<T> next = last; last = new Invoker<T>() { @Override public Class<T> getInterface() { return invoker.getInterface(); } @Override public URL getUrl() { return invoker.getUrl(); }
@Override public boolean isAvailable() { return invoker.isAvailable(); } // 关键在这里,调用下一个filter表明的invoker,把每个过滤器串起来 @Override public Result invoke(Invocation invocation) throws RpcException { return filter.invoke(next, invocation); } @Override public void destroy() { invoker.destroy(); } @Override public String toString() { return invoker.toString(); } }; } } return last; }
// 通过两个Wrapper的export方法包装以后,走到DubboProtocol的export方法,这里是核心方法 public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { // url形如dubbo://172.22.213.93:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true // &application=demo-provider&bind.ip=172.22.213.93&bind.port=20880&dubbo=2.0.2&generic=false // /&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=648&qos.port=22222 // &side=provider×tamp=1569585915258 URL url = invoker.getUrl(); // 获取服务标识,理解成服务坐标也行,由服务组名,服务名,服务版本号以及端口组成,key形如 // com.alibaba.dubbo.demo.DemoService:20880 String key = serviceKey(url); // 建立DubboExporter DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); // 将<key, exporter>键值对放入缓存中 exporterMap.put(key, exporter); // 本地存根相关代码, export an stub service for dispatching event // 删除,暂时尚未分析本地存根相关 // 启动服务器,重点关注这里 openServer(url); optimizeSerialization(url); return exporter; }
// 根据URL值能够猜想,openServer方法就是启动Netty服务器,在172.22.213.93:20880端口上监听调用请求 openServer(url);
/** * 在同一台机器上(单网卡),同一个端口上仅容许启动一个服务器实例,若某个端口上已有服务器实例,此时则调用reset方法 重置服务器的一些配置 */ private void openServer(URL url) { // 获取host:port,并将其做为服务器实例的key,用于标识当前的服务器实例,key形如172.22.213.93:20880 String key = url.getAddress(); //client can export a service which's only for server to invoke boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true); if (isServer) { ExchangeServer server = serverMap.get(key); if (server == null) { // 建立服务器实例,put以后serverMap形如<172.22.213.93:20880, HeaderExchangeServer> serverMap.put(key, createServer(url)); } else { // 服务器已建立,则根据url中的配置重置服务器 server.reset(url); } } }
private ExchangeServer createServer(URL url) { url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); // 添加心跳检测配置到URL中,enable heartbeat by default url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); // 获取server参数,默认为netty,这里配置成了netty4,str就为netty4 String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); // 经过SPI检测是否存在server参数所表明的Transporter拓展,不存在则抛出异常 if (str != null && str.length() > 0 && !ExtensionLoader. getExtensionLoader(Transporter.class).hasExtension(str)) throw new RpcException("Unsupported server type: " + str + ", url: " + url); // 添加编码解码器参数 url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
ExchangeServer server; try { // 建立 ExchangeServer,核心方法 server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); } str = url.getParameter(Constants.CLIENT_KEY); if (str != null && str.length() > 0) { Set<String> supportedTypes = ExtensionLoader. getExtensionLoader(Transporter.class).getSupportedExtensions(); if (!supportedTypes.contains(str)) { throw new RpcException("Unsupported client type: " + str); } } return server; }
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); return getExchanger(url).bind(url, handler); } public static Exchanger getExchanger(URL url) { // 默认type就是header String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER); // 建立HeadExchanger return getExchanger(type); } public static Exchanger getExchanger(String type) { return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type); }
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { // 用传输层绑定返回的server建立对应的信息交换服务端 // 这里也是分红两步,下面先分析bind方法,该方法就是开启Netty4服务器监听请求 // 1) bind方法 // 2) new HeaderExchangeServer(Server server) return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); }
// 步骤1)bind方法 public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { ChannelHandler handler; if (handlers.length == 1) { handler = handlers[0]; } else { handler = new ChannelHandlerDispatcher(handlers); } // 获取自适应Transporter实例,并调用实例方法. getTransporter()方法获取的Transporter是在运行时动态建立的, // 类名为TransporterAdaptive,也就是自适应拓展类.TransporterAdaptive会在运行时根据传入的URL参数决定加载 // 什么类型的Transporter,这里咱们配置成了netty4的NettyTransporter // String string = url.getParameter("server", url.getParameter("transporter", "netty")); // transporter = ExtensionLoader.getExtensionLoader(Transporter.class).getExtension(string); return getTransporter().bind(url, handler); }
public Server bind(URL url, ChannelHandler listener) throws RemotingException { // 建立一个NettyServer return new NettyServer(url, listener); } public NettyServer(URL url, ChannelHandler handler) throws RemotingException { // 调用父类构造方法,这里的wrap方法返回的是 // MultiMessageHandler->HeartbeatHandler->AllDispatcherHandler->DecodeHandler->HeaderExchangeHandler // -> 表示前一个handler里面包装了下一个handler super(url, ChannelHandlers.wrap(handler,ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); } public static ChannelHandler wrap(ChannelHandler handler, URL url) { return ChannelHandlers.getInstance().wrapInternal(handler, url); } // 包装了MultiMessageHandler功能,增长了多消息处理的功能,以及对心跳消息作了功能加强 protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) { // 调用了多消息处理器,对心跳消息进行了功能增强 return new MultiMessageHandler( new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class) .getAdaptiveExtension().dispatch(handler, url))); }
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException { // url形如dubbo://172.22.213.93:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true // &application=demo-provider&bind.ip=172.22.213.93&bind.port=20880&channel.readonly.sent=true // &codec=dubbo&default.server=netty4&dubbo=2.0.2&generic=false&heartbeat=60000 // &interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=6900&qos.port=22222 // &side=provider×tamp=1569633535398 // handler是MultiMessageHandler实例 super(url, handler); // 从url中得到本地地址,/172.22.213.93:20880 localAddress = getUrl().toInetSocketAddress(); // 从url配置中得到绑定的ip,本机IP地址172.22.213.93 String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost()); // 从url配置中得到绑定的端口号,20880 int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort()); // 判断url中配置anyhost是否为true或者判断host是否为不可用的本地Host,url中配置了anyhost为true if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) { bindIp = NetUtils.ANYHOST; }
// /0.0.0.0:20880 bindAddress = new InetSocketAddress(bindIp, bindPort); // 从url中获取配置,默认值为0 this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS); // 从url中获取配置,默认600s this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT); try { // 开启服务器 doOpen(); } DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension(); executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort())); }
/** * 该类是端点的抽象类,其中封装了编解码器以及两个超时时间. * 基于dubbo 的SPI机制,得到相应的编解码器实现对象,编解码器优先从Codec2的扩展类中寻找 */ public AbstractEndpoint(URL url, ChannelHandler handler) { super(url, handler); this.codec = getChannelCodec(url); // 优先从url配置中取,若是没有,默认为1s this.timeout = url.getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // 优先从url配置中取,若是没有,默认为3s this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT); }
// 该方法是建立服务器,而且开启 @Override protected void doOpen() throws Throwable { // 建立服务引导类 bootstrap = new ServerBootstrap(); // 建立boss线程组 bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true)); // 建立worker线程组 workerGroup = new NioEventLoopGroup( getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), new DefaultThreadFactory("NettyServerWorker", true)); // 建立服务器处理器 final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this); // 得到通道集合 channels = nettyServerHandler.getChannels(); // 设置eventLoopGroup还有可选项 bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { // 编解码器 NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); // 增长责任链 ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO)) .addLast("decoder", adapter.getDecoder()) .addLast("encoder", adapter.getEncoder()) // 加入了NettyServerHandler,后续消息处理应该就是经过这个来处理,猜想,TODO .addLast("handler", nettyServerHandler); } }); // bind绑定,这里bind完成以后Netty服务器就启动了,监听20880端口上的请求,有兴趣能够研究下Netty的源码 ChannelFuture channelFuture = bootstrap.bind(getBindAddress()); // 等待绑定完成 channelFuture.syncUninterruptibly(); // 设置通道 channel = channelFuture.channel(); }
// 步骤2)new HeaderExchangeServer(Server server) // 构造函数就是对属性的设置,心跳的机制以及默认值都跟HeaderExchangeClient中的如出一辙 public HeaderExchangeServer(Server server) { if (server == null) { throw new IllegalArgumentException("server == null"); } this.server = server; //得到心跳周期配置,若是没有配置,默认设置为0 this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0); // 得到心跳超时配置,默认是心跳周期的三倍 this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3); // 若是心跳超时时间小于心跳周期的两倍,则抛出异常 if (heartbeatTimeout < heartbeat * 2) { throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2"); } // 开始心跳 startHeartbeatTimer(); }
/** * 该方法是开始心跳,跟HeaderExchangeClient类中的开始心跳方法惟一区别是得到的通道不同, * 客户端跟通道是一一对应的,全部只要对一个通道进行心跳检测,而服务端跟通道是一对多的关系, * 全部须要对该服务器链接的全部通道进行心跳检测 */ private void startHeartbeatTimer() { // 先中止现有的心跳检测 stopHeartbeatTimer(); if (heartbeat > 0) { // 建立心跳定时器 heartbeatTimer = scheduled.scheduleWithFixedDelay( new HeartBeatTask(new HeartBeatTask.ChannelProvider() { @Override public Collection<Channel> getChannels() { // 返回一个不可修改的链接该服务器的信息交换通道集合 return Collections.unmodifiableCollection( HeaderExchangeServer.this.getChannels()); } }, heartbeat, heartbeatTimeout), heartbeat, heartbeat, TimeUnit.MILLISECONDS); } }
/** * 该类实现了Runnable接口,实现的是心跳任务,里面包含了核心的心跳策略 */ final class HeartBeatTask implements Runnable { // 通道管理 private ChannelProvider channelProvider; // 心跳间隔,单位:ms private int heartbeat; // 心跳超时时间,单位:ms private int heartbeatTimeout; HeartBeatTask(ChannelProvider provider, int heartbeat, int heartbeatTimeout) { this.channelProvider = provider; this.heartbeat = heartbeat; this.heartbeatTimeout = heartbeatTimeout; }
/** * 该方法中是心跳机制的核心逻辑,注意如下几个点: * * 若是须要心跳的通道自己若是关闭了,那么跳过,不添加心跳机制. * 不管是接收消息仍是发送消息,只要超过了设置的心跳间隔,就发送心跳消息来测试是否断开 * 若是最后一次接收到消息到到如今已经超过了心跳超时时间,那就认定对方的确断开,分两种状况来处理对方断开的状况. * 分别是服务端断开,客户端重连以及客户端断开,服务端断开这个客户端的链接.这里要好好品味一下谁是发送方, * 谁在等谁的响应,苦苦没有等到. */ @Override public void run() { try { long now = System.currentTimeMillis(); // 遍历全部通道 for (Channel channel : channelProvider.getChannels()) { // 若是通道关闭了,则跳过 if (channel.isClosed()) { continue; }
try { // 最后一次接收到消息的时间戳 Long lastRead = (Long) channel.getAttribute( HeaderExchangeHandler.KEY_READ_TIMESTAMP); // 最后一次发送消息的时间戳 Long lastWrite = (Long) channel.getAttribute( HeaderExchangeHandler.KEY_WRITE_TIMESTAMP); // 若是最后一次接收或者发送消息到时间到如今的时间间隔超过了心跳间隔时间 if ((lastRead != null && now - lastRead > heartbeat) || (lastWrite != null && now - lastWrite > heartbeat)) { // 建立一个request,设置版本号,设置须要获得响应 Request req = new Request(); req.setVersion(Version.getProtocolVersion()); req.setTwoWay(true); // 设置事件类型,为心跳事件 req.setEvent(Request.HEARTBEAT_EVENT); // 发送心跳请求 channel.send(req); }
// 若是最后一次接收消息的时间到如今已经超过了超时时间 if (lastRead != null && now - lastRead > heartbeatTimeout) { // 若是该通道是客户端,也就是请求的服务器挂掉了,客户端尝试重连服务器 if (channel instanceof Client) { try { // 从新链接服务器 ((Client) channel).reconnect(); } catch (Exception e) { //do nothing } } else { // 若是不是客户端,也就是是服务端返回响应给客户端,可是客户端挂掉了, // 则服务端关闭客户端链接 channel.close(); } } } } } }
interface ChannelProvider { // 得到全部的通道集合,须要心跳的通道数组 Collection<Channel> getChannels(); }
}segmentfault