Dubbo currently does not support graceful shutdown: when the provider shuts down, the consumer actively closes the long-lived connection, so the provider's in-flight responses can never be returned.
Configuring the parameter -Ddubbo.service.shutdown.wait=30000 (the shutdown wait time, in milliseconds) on both the provider and the consumer has no effect either; the process still stops immediately.
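As a sketch of what that JVM flag amounts to, here is a minimal, hypothetical helper (not Dubbo source; only the property key matches the flag above) that reads the wait time from system properties with a fallback default:

```java
// Hypothetical sketch: parsing -Ddubbo.service.shutdown.wait=30000 into a
// shutdown grace period. ShutdownWait and DEFAULT_WAIT_MS are illustrative
// names, not part of Dubbo's API.
public class ShutdownWait {
    static final String SHUTDOWN_WAIT_KEY = "dubbo.service.shutdown.wait";
    static final int DEFAULT_WAIT_MS = 10000; // assumed fallback for this sketch

    /** Parse the wait time from system properties, falling back to a default. */
    static int getShutdownWaitMillis() {
        String value = System.getProperty(SHUTDOWN_WAIT_KEY);
        if (value == null || value.isEmpty()) {
            return DEFAULT_WAIT_MS;
        }
        try {
            return Integer.parseInt(value);
        } catch (NumberFormatException e) {
            return DEFAULT_WAIT_MS; // malformed value: ignore and use default
        }
    }
}
```

The rest of this post traces why a value read this way never actually took effect.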
When the provider shuts down, the consumer detects the disconnect event and immediately closes its connection to that provider, without attempting to reconnect to it. If it was the only provider, no further remote calls are made.
Sequence diagram of the consumer closing the connection:
A ZooKeeper node-change event triggers notify, and the consumer refreshes its provider list, removing the vanished provider (the destroyUnusedInvokers method); if it was the only provider, all Invokers are destroyed (the destroyAllInvokers method).
When the DubboInvoker is destroyed, it calls client.close() directly. Source:
```java
public void destroy() {
    if (super.isDestroyed()) {
        return;
    } else {
        destroyLock.lock();
        try {
            if (super.isDestroyed()) {
                return;
            }
            super.destroy();
            if (invokers != null) {
                invokers.remove(this);
            }
            for (ExchangeClient client : clients) {
                try {
                    // closed directly, with no grace period
                    client.close();
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        } finally {
            destroyLock.unlock();
        }
    }
}
```
Change client.close() to client.close(timeout). Modified code:
```java
public void destroy() {
    if (super.isDestroyed()) {
        return;
    } else {
        destroyLock.lock();
        try {
            if (super.isDestroyed()) {
                return;
            }
            super.destroy();
            if (invokers != null) {
                invokers.remove(this);
            }
            for (ExchangeClient client : clients) {
                try {
                    // changed: close with a timeout
                    this.close(client);
                } catch (Throwable t) {
                    logger.warn(t.getMessage(), t);
                }
            }
        } finally {
            destroyLock.unlock();
        }
    }
}

/**
 * If neither dubbo.service.shutdown.wait nor
 * dubbo.service.shutdown.wait.seconds is set, close directly.
 * @param client
 * @author 夏志强
 */
@SuppressWarnings("deprecation")
private void close(ExchangeClient client) {
    String timeout = ConfigUtils.getProperty(Constants.SHUTDOWN_WAIT_KEY);
    if (timeout != null && timeout.length() > 0) {
        try {
            client.close(Integer.parseInt(timeout));
        } catch (Exception e) {
        }
    } else {
        timeout = ConfigUtils.getProperty(Constants.SHUTDOWN_WAIT_SECONDS_KEY);
        if (timeout != null && timeout.length() > 0) {
            try {
                client.close(Integer.parseInt(timeout));
            } catch (Exception e) {
            }
        } else {
            client.close();
        }
    }
}
```
After this change, the client still closed the connection immediately. Debugging showed that in HeaderExchangeChannel's close(int timeout) method, the wait loop checks whether HeaderExchangeChannel.this still has pending requests in DefaultFuture and whether the grace period has elapsed; once the timeout is exceeded, the channel is closed immediately. Source:
```java
public void close(int timeout) {
    if (closed) {
        return;
    }
    closed = true;
    if (timeout > 0) {
        long start = System.currentTimeMillis();
        // CHANNELS in DefaultFuture never contains a HeaderExchangeChannel,
        // so this condition is false from the very first check
        while (DefaultFuture.hasFuture(HeaderExchangeChannel.this)
                && System.currentTimeMillis() - start < timeout) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                logger.warn(e.getMessage(), e);
            }
        }
    }
    close();
}
```
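The wait loop above is a plain poll-until-deadline pattern. A self-contained sketch of it (GracefulWait and awaitQuiet are illustrative names, not Dubbo API; the pending-request check is abstracted into a BooleanSupplier):

```java
import java.util.function.BooleanSupplier;

// Standalone sketch of the wait loop in HeaderExchangeChannel.close(int):
// poll a "has pending requests" condition until it clears or the timeout expires.
public class GracefulWait {
    /** Returns true if the condition cleared before the deadline. */
    static boolean awaitQuiet(BooleanSupplier hasPending, long timeoutMillis) {
        long start = System.currentTimeMillis();
        while (hasPending.getAsBoolean()
                && System.currentTimeMillis() - start < timeoutMillis) {
            try {
                Thread.sleep(10); // same 10 ms polling interval as Dubbo uses
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return !hasPending.getAsBoolean();
    }
}
```

The pattern only gives a grace period when hasPending can ever be true; that is exactly what fails in Dubbo here, because the condition is keyed on the wrong object.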
Looking at HeaderExchangeChannel's request method:

```java
DefaultFuture future = new DefaultFuture(channel, req, timeout);
```

The channel passed here is actually the NettyClient, so the pending futures are keyed by the NettyClient and the check on HeaderExchangeChannel.this can never match.
Modified code:
```java
// graceful close
public void close(int timeout) {
    if (closed) {
        return;
    }
    closed = true;
    if (timeout > 0) {
        long start = System.currentTimeMillis();
        // changed: check the underlying channel (the NettyClient) instead of this
        while (DefaultFuture.hasFuture(channel)
                && System.currentTimeMillis() - start < timeout) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                logger.warn(e.getMessage(), e);
            }
        }
    }
    close();
}
```
Running the code again, the result was still wrong: the client kept waiting to close while the server side had already shut down.
The next thing I analyzed was ProtocolConfig's destroyAll() method, which has two main steps: first, delete the provider nodes in ZooKeeper and close the ZooKeeper listeners; second, destroy the Dubbo protocol and close the connections.
Sequence diagram of the Dubbo protocol destruction:
The sequence diagram shows that the server side's graceful close happens in AbstractServer's close(timeout) method. Source:
```java
public void close(int timeout) {
    ExecutorUtil.gracefulShutdown(executor, timeout);
    close();
}

// ExecutorUtil
public static void gracefulShutdown(Executor executor, int timeout) {
    if (!(executor instanceof ExecutorService) || isShutdown(executor)) {
        return;
    }
    final ExecutorService es = (ExecutorService) executor;
    try {
        es.shutdown(); // Disable new tasks from being submitted
    } catch (SecurityException ex2) {
        return;
    } catch (NullPointerException ex2) {
        return;
    }
    try {
        if (!es.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
            es.shutdownNow();
        }
    } catch (InterruptedException ex) {
        es.shutdownNow();
        Thread.currentThread().interrupt();
    }
    if (!isShutdown(es)) {
        newThreadToCloseExecutor(es);
    }
}
```
Here executor comes from Dubbo's thread-dispatch model, and the pool is only drained before closing when executor is non-null. There is a pitfall here: before calling es.awaitTermination you must call es.shutdown(), otherwise the pool can never reach the terminated state, so awaitTermination waits out the full timeout and returns false even after every task has finished (see the JDK documentation for details). While debugging I found that executor was null. Its value is initialized in AbstractServer's constructor, source:
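The shutdown/awaitTermination pitfall can be reproduced with a small self-contained example (AwaitPitfall and its method names are illustrative, not Dubbo code): without shutdown(), awaitTermination sits out its whole timeout and reports false even though the pool is idle; with shutdown() first, it returns true as soon as the queued work finishes.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Demonstrates the pitfall described above: awaitTermination only reports
// termination after shutdown() has been called, even when the pool is idle.
public class AwaitPitfall {
    static boolean awaitWithoutShutdown(long millis) throws InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(1);
        es.submit(() -> { });                 // task finishes almost immediately
        boolean terminated = es.awaitTermination(millis, TimeUnit.MILLISECONDS);
        es.shutdownNow();                     // clean up the pool afterwards
        return terminated;                    // false: pool was never shut down
    }

    static boolean awaitWithShutdown(long millis) throws InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(1);
        es.submit(() -> { });
        es.shutdown();                        // now the terminated state is reachable
        return es.awaitTermination(millis, TimeUnit.MILLISECONDS);
    }
}
```

ExecutorUtil.gracefulShutdown above gets this right; the real problem, covered next, is that executor is null so the method returns before any of it runs.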
```java
ExecutorService executor;

public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
    super(url, handler);
    localAddress = getUrl().toInetSocketAddress();
    String host = url.getParameter(Constants.ANYHOST_KEY, false)
            || NetUtils.isInvalidLocalHost(getUrl().getHost())
            ? NetUtils.ANYHOST : getUrl().getHost();
    bindAddress = new InetSocketAddress(host, getUrl().getPort());
    this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
    this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
    try {
        doOpen();
        if (logger.isInfoEnabled()) {
            logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress()
                    + ", export " + getLocalAddress());
        }
    } catch (Throwable t) {
        throw new RemotingException(url.toInetSocketAddress(), null,
                "Failed to bind " + getClass().getSimpleName() + " on " + getLocalAddress()
                + ", cause: " + t.getMessage(), t);
    }
    // set the executor
    if (handler instanceof WrappedChannelHandler) {
        executor = ((WrappedChannelHandler) handler).getExecutor();
    }
}
```
Since Dubbo's default transport framework is Netty, look at the NettyServer code:
```java
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
    super(url, ChannelHandlers.wrap(handler,
            ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}

// ChannelHandlers
public static ChannelHandler wrap(ChannelHandler handler, URL url) {
    return ChannelHandlers.getInstance().wrapInternal(handler, url);
}

protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
    return new MultiMessageHandler(new HeartbeatHandler(
            ExtensionLoader.getExtensionLoader(Dispatcher.class)
                    .getAdaptiveExtension().dispatch(handler, url)));
}
```
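The effect of this wrapping can be modeled with a toy decorator chain (all class names below are simplified stand-ins for the Dubbo originals, not real Dubbo types): once the executor-bearing handler is wrapped, the outermost object no longer passes the instanceof check that AbstractServer relies on.

```java
// Toy model of ChannelHandlers.wrapInternal: each decorator holds an inner
// handler, so the outermost object is a delegate type, not the handler that
// actually carries the executor.
public class WrapDemo {
    interface Handler { }
    static class ExecutorHandler implements Handler { }   // plays WrappedChannelHandler
    static class Delegate implements Handler {            // plays MultiMessageHandler etc.
        final Handler handler;
        Delegate(Handler handler) { this.handler = handler; }
    }

    /** Wrap the handler in two decorator layers, as wrapInternal effectively does. */
    static Handler wrap(Handler h) {
        return new Delegate(new Delegate(h));
    }
}
```

Checking `wrap(new ExecutorHandler()) instanceof ExecutorHandler` yields false, which is precisely why the executor assignment in AbstractServer's constructor never runs.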
The constructor wraps the handler, so by this point its type is MultiMessageHandler, not WrappedChannelHandler. The handler inheritance hierarchy:
Because Dubbo's default thread model is AllChannelHandler (see the Dubbo documentation), whose parent class is WrappedChannelHandler, the wrapped handler chain has to be unwrapped via reflection to reach it and set the executor.
Modify AbstractServer's constructor to assign executor:
```java
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
    super(url, handler);
    localAddress = getUrl().toInetSocketAddress();
    String host = url.getParameter(Constants.ANYHOST_KEY, false)
            || NetUtils.isInvalidLocalHost(getUrl().getHost())
            ? NetUtils.ANYHOST : getUrl().getHost();
    bindAddress = new InetSocketAddress(host, getUrl().getPort());
    this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
    this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
    try {
        doOpen();
        if (logger.isInfoEnabled()) {
            logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress()
                    + ", export " + getLocalAddress());
        }
    } catch (Throwable t) {
        throw new RemotingException(url.toInetSocketAddress(), null,
                "Failed to bind " + getClass().getSimpleName() + " on " + getLocalAddress()
                + ", cause: " + t.getMessage(), t);
    }
    // changed: resolve the executor through the handler chain
    this.setExecutor(handler);
}

/**
 * Set the executor, unwrapping delegate handlers if necessary.
 * @param handler
 * @author 夏志强
 */
private void setExecutor(ChannelHandler handler) {
    if (handler != null) {
        if (handler instanceof WrappedChannelHandler) {
            executor = ((WrappedChannelHandler) handler).getExecutor();
        } else if (handler instanceof AbstractChannelHandlerDelegate) {
            try {
                Field field = AbstractChannelHandlerDelegate.class.getDeclaredField("handler");
                field.setAccessible(true);
                setExecutor((ChannelHandler) field.get(handler));
            } catch (Exception e) {
            }
        }
    }
}
```
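The recursive unwrapping can be exercised in isolation with a self-contained sketch (class names are simplified stand-ins for the Dubbo types, as is the mirrored private "handler" field):

```java
import java.lang.reflect.Field;

// Self-contained sketch of the recursive unwrapping done by setExecutor above:
// walk each delegate's private "handler" field until the executor-bearing
// handler is found.
public class Unwrap {
    interface Handler { }
    static class ExecutorHandler implements Handler { }   // the one carrying the executor
    static class DelegateHandler implements Handler {     // plays AbstractChannelHandlerDelegate
        private final Handler handler;
        DelegateHandler(Handler handler) { this.handler = handler; }
    }

    /** Recursively unwrap delegates via reflection; null if no match is found. */
    static ExecutorHandler findExecutorHandler(Handler h) {
        if (h instanceof ExecutorHandler) {
            return (ExecutorHandler) h;
        }
        if (h instanceof DelegateHandler) {
            try {
                Field field = DelegateHandler.class.getDeclaredField("handler");
                field.setAccessible(true);
                return findExecutorHandler((Handler) field.get(h));
            } catch (ReflectiveOperationException e) {
                return null;
            }
        }
        return null;
    }
}
```

With two delegate layers, as in Dubbo's MultiMessageHandler → HeartbeatHandler chain, the search still reaches the innermost handler.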
Running the test code again, the result now comes back correctly before the connection closes.