本节将主要学习Dubbo是如何使用Netty来实现网络通信的。 从官网咱们得知,Dubbo协议是使用单一长链接来进行网络传输,也就是说服务调用方持久与服务提供者创建一条链接,全部的服务调用调用信息经过。 一条TCP链接进行传输,在网络层至少要考虑以下问题:缓存
服务端,客户端网络通信模型(线程模型)网络
传输(编码解码、序列化)。架构
服务端转发策略等。并发
Dubbo服务端的网络启动流程,在上篇中已给出序列图,本节仍是以该图为切入点,引入本文的两个主人公:NettyServer、NettyClient。app
dubbo使用SPI机制,根据配置,能够支持以下框架实现网络通信模型,例如netty3,netty四、mina、grizzly,本文重点分析基于Netty4的实现,包路径:dubbo-remoting-netty4。 从上面的流程图,NettyTransport的职责就是调用new NettyServer的构造方法,从而构建NettyServer对象,在深刻NettyServer对象构造过程以前,先来看一下NettyServer的类继承层次:
框架
NettyServer构造函数:异步
public NettyServer(URL url, ChannelHandler handler) throws RemotingException { // @1 super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); // @2 }
代码@1:URL url:服务提供者URL;ChannelHandler网络事件处理器, 分布式
也就是当相应网络事件触发时,执行的事件处理器。ide
代码@2:调用ChannelHandlers.wrap对原生Handler进行包装,而后调用其父类的构造方法,首先,设置Dubbo服务端线程池中线程的名称,能够经过参数threadname来指定线程池中线程的前缀,默认为:DubboServerHandler + dubbo服务端IP与接口号。我比较好奇的是这里为何须要对ChannelHandler进行包装呢?是增长了些什么逻辑呢?带着者问题,引出本节重点探讨的内容:事件派发机制。 事件派发机制指的是网络事件(链接、读、写)等事件触发后,这些事件如何执行,是由IO线程仍是派发到线程池中执行。Dubbo定义了以下5种事件派发机制: 函数
本文将详细分析各类事件的派发实现原理。 ChannelHandlers#wrapInternal
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) { return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class) .getAdaptiveExtension().dispatch(handler, url))); }
这里是典型的装饰模式,MultiMessageHandler,多消息处理Handler,HeartbeatHandler,心跳Handler,其主要功能是处理心跳返回与心跳请求,直接在IO线程中执行,每次收到信息,更新通道的读事件戳,每次发送数据时,记录通道的写事件戳。这里的核心关键是利用SPI自适配,返回合适的事件派发机制。Dispatcher的类层次结构如图所示:
线程派发机制:全部的消息都派发到线程池,包括请求、响应、链接事件、断开事件、心跳等。
public class AllDispatcher implements Dispatcher { public static final String NAME = "all"; @Override public ChannelHandler dispatch(ChannelHandler handler, URL url) { return new AllChannelHandler(handler, url); } }
从中能够看出,事件派发类继承图分两个维度,Dispatcher(事件派发器)、与之对应的ChannelHandler,例如AllChannelHandler。
接下来分析事件派发机制,重点关注ChannelHandler类的实现体系。
纵观Dubbo ChannelHanler体系的设计,是经典的类装饰器模式,上述派发器主要解决的问题,是相关网络事件(链接、读(请求)、写(响应)、心跳请求、心跳响应)是在IO线程、仍是在额外定义的线程池,故WrappedChannelHandler的主要职责是定义线程池相关的逻辑,具体是在IO线程上执行,仍是在定义的线程池中执行,则由子类具体去定制,WrappedChannelHandler默认实现ChannelHandler的全部方法,各个方法的实现直接调用被装饰Handler的方法,见下图:
接下来先重点关注一下WrappedChannelHandler的成员变量和构造方法的实现。
protected static final ExecutorService SHARED_EXECUTOR = Executors.newCachedThreadPool(new NamedThreadFactory("DubboSharedHandler", true)); protected final ExecutorService executor; protected final ChannelHandler handler; protected final URL url;
public WrappedChannelHandler(ChannelHandler handler, URL url) { this.handler = handler; this.url = url; executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url); // @1 String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY; if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) { componentKey = Constants.CONSUMER_SIDE; } DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension(); dataStore.put(componentKey, Integer.toString(url.getPort()), executor); // @2 }
代码@1:构建线程池,这里基于SPI机制,用户可选择cached、eager、fixed、limited,将在本节下面详细介绍,这里只须要知道是构建了一个线程池。 代码@2:将服务端都与线程池缓存起来,在服务端,线程池的缓存级别是 服务提供者协议(端口):线程池。
事件派发机制:全部网络事件在线程池中执行,其实现机制确定是重写ChannelHandler的全部网络事件方法,将调用其修饰的ChannelHanlder在线程池中执行。因为AllChannelHandler是第一个事件派发机制,故对其实现作一个详细描述。
public void connected(Channel channel) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED)); } catch (Throwable t) { throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t); } }
链接事件,其主要实现是,首先先获取执行线程池,其获取逻辑是若是executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class). getAdaptiveExtension().getExecutor(url);获取不到线程池,则使用共享线程池。能够看出,链接事件的业务调用时异步执行,基于线程池。 注:调用时机,服务端收到客户端链接后,该方法会被调用。
public void disconnected(Channel channel) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED)); } catch (Throwable t) { throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t); } }
其基本实现与connected相同,就是将具体的disconnected 事件所对应的业务扩展方法在线程池中执行。 注:调用时机,服务端收到客户端断开链接后,该方法会被调用。
public void received(Channel channel, Object message) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { //TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring //fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out if(message instanceof Request && t instanceof RejectedExecutionException){ Request request = (Request)message; if(request.isTwoWay()){ String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage(); Response response = new Response(request.getId(), request.getVersion()); response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR); response.setErrorMessage(msg); channel.send(response); return; } } throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } }
调用时机:当服务端收到客户端发送的请求后,通过IO线程(Netty)会首先从二进制流中解码出一个个的请求,参数Object message,就是调用请求,而后在提交给线程池执行,执行完后,当业务处理完毕后,组装结果后,必然会在该线程中调用通道(Channel#write,flush)方法,向通道写入响应结果。 注:all事件派发机制,ChannelHandler#recive是在线程池中执行。
public void caught(Channel channel, Throwable exception) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception)); } catch (Throwable t) { throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t); } }
当发生异常时,ChannelHandler#caught也在线程池中执行。 使人颇感意外的是,AllChannelHandler并未重写WrappedChannelHandler的sent方法,也就是说ChannelHandler#sent事件回调方法,是在IO线程中执行。 WrappedChannelHandler#sent
public void sent(Channel channel, Object message) throws RemotingException { handler.sent(channel, message); }
这个和官方文档仍是有必定出入的。
对应事件派件器:ExecutionDispatcher,其配置值:execution,从其源码的实现来看,与AllDispatcher实现基本相似,惟一的区别是,若是executor线程池为空时,并不会使用共享线程池,目前我还想不出什么状况下,线程池会初始化失败。
直接派发,也就是全部的事件所有在IO线程中执行,故其实现很是简单:
public class DirectDispatcher implements Dispatcher { public static final String NAME = "direct"; @Override public ChannelHandler dispatch(ChannelHandler handler, URL url) { return handler; } }
事件派发器:只有请求事件在线程池中执行,其余响应事件、心跳,链接,断开链接等事件在IO线程上执行,故其只须要重写recive方法便可:
@Override public void received(Channel channel, Object message) throws RemotingException { ExecutorService cexecutor = executor; if (cexecutor == null || cexecutor.isShutdown()) { cexecutor = SHARED_EXECUTOR; } try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } }
事件派发器:链接、断开链接事件排队执行,并可经过connect.queue.capacity属性设置队列长度,请求事件、异常事件在线程池中执行。
public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) { super(handler, url); String threadName = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); connectionExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<runnable>(url.getPositiveParameter(Constants.CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)), new NamedThreadFactory(threadName, true), new AbortPolicyWithReport(threadName, url) ); // FIXME There's no place to release connectionExecutor! queuewarninglimit = url.getParameter(Constants.CONNECT_QUEUE_WARNING_SIZE, Constants.DEFAULT_CONNECT_QUEUE_WARNING_SIZE); }
重点关注一下connectionExecutor ,用来执行链接、断开事件的线程池,线程池中只有一个线程,而且队列能够选择时有界队列,经过connect.queue.capacity属性配置,超过的事件,则拒绝执行。
public void connected(Channel channel) throws RemotingException { try { checkQueueLength(); connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED)); } catch (Throwable t) { throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t); } }
检查队列长度,若是超过警告值,则输出警告信息,而后提交链接线程池中执行,disconnected事件相似。其余received、caught事件,则与AllDispatcher相同,就不在重复。
总结:本文主要是分析阐述了Dubbo Dispatch 机制,但与官方文档存在出入,先概括以下: Dispatch:全部的sent事件方法、心跳请求所有在IO线程上执行。
>做者介绍:丁威,《RocketMQ技术内幕》做者,RocketMQ 社区优秀布道师、CSDN2019博客之星TOP10,维护公众号:中间件兴趣圈目前已陆续发表源码分析Java集合、Java 并发包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源码专栏。能够点击连接加入中间件知识星球 ,一块儿探讨高并发、分布式服务架构,交流源码。 </runnable>