dubbo经过netty将请求发送到provider的时候,provider以前已经启动好的NettyServer监听指定端口的时候会收到来自consumer的请求,将经过网络发送来的二进制编码成Request交给上层处理。dubbo从Request中取出调用信息,找到以前的Invoker,而后通过filter,最后经过代理调用到提供服务的方法。java
provider处理请求的调用堆栈以下:网络
sayHe110:18, TestDubb0Servicelmpl (com.test.service.impl) invokeMethod:-1, Wrapper1 (com. alibabadubbo. common.bytecode) dolnvoke:46, JavassistProxyFactory$1 (com.alibaba.dubbo.rpc.proxy.javassist) invoke:72, AbstractProxylnvoker (com.alibaba.dubbo.rpc.proxy) invoke:53, InvokerWrapper (com.alibaba.dubbo.rpc.protocol) invoke:64, ExceptionFilter .com alibaba.dubbo.rpc filter) invoke:91, ProtocolFilterWrapper$1 (com.alibaba.dubbo.rpc.protocol) invoke:64, MonitorFilter .com alibaba.dubbo. monitor.support) invoke:91, ProtocolFilterWrapper$1 (com.alibaba.dubbo.rpc.protocol) invoke:42, TimeoutFilter .com alibaba.dubbo. rpc.filter) invoke:91, ProtocolFilterWrapper$1 (com.alibaba.dubbo.rpc.protocol) invoke:49, TokenFilter .com alibaba.dubbo. roc. filter) invoke:91, ProtocolFilterWrapper$1 (com.alibaba.dubbo.rpc.protocol) invoke:78, TraceFilter .com alibaba dubbo. roc. protocol.dubbo.filter) invoke:91, ProtocolFilterWrapper$1 (com.alibaba.dubbo.rpc.protocol) invoke:60, ContextFilter .com alibaba.dubbo. roc. filter) invoke:91, ProtocolFilterWrapper$1 (com.alibaba.dubbo.rpc.protocol) invoke:132, GenericFilter .com alibaba.dubbo. roc. filter) invoke:91, ProtocolFilterWrapper$1 (com.alibaba.dubbo.rpc.protocol) invoke:38, ClassLoaderFilter .com alibaba dubbo.rpc.filter) invoke:91, ProtocolFilterWrapper$1 (com.alibaba.dubbo.rpc.protocol) invoke:38, EchoFilter .com alibaba dubbo. rpc filter) invoke:91, ProtocolFilterWrapper$1 (com.alibaba.dubbo.rpc.protocol) reply:108, DubboProtocol$1 .com alibaba dubbo.rpcprotocol.dubbo) handleRequest:86, HeaderExchangeHandler (com.alibaba.dubbo.remoting.exchange.support.header) received:172, HeaderExchangeHandler (com.alibaba dubbo. remoting. exchange.support.header) received:52, DecodeHandler (com.alibaba dubbo.remoting. transport) run:82, ChannelEventRunnable (com.alibaba.dubbo.remoting.transport.dispatcher) runWorker:1142, ThreadPoolExecutor (java.util.concurrent) run:617, ThreadPoolExecutor$Worker (java.util.concurrent) run:745, Thread (java.lang)
从调用堆栈基本能够看出provider整个处理请求的过程,比较简单,可是须要知道为何调用过程是这样的?其中关键类是何时在初始化的?怎么初始化的?app
接下来解决一下问题:socket
- 为何是从ChannelEventRunnable开始的?谁初始化的ChannelEventRunnable?ChannelEventRunnable做用是什么?
- 为何会调用到上面堆栈中的几个handler(也就是handler是怎么初始化的)?
- filter链怎么初始化的?
原本这些问题在export的时候若是仔细查看源码已经能够解决了,可是真正用到的时候是处理请求的时候,因此这里算是补上以前export过程的一些关键步骤。ide
ChannelEventRunnable初始化
上面的调用堆栈中,是在线程池中一个单独的线程来处理请求,因此先从线程池中调用的线程开始,ChannelEventRunnable的构造过程。oop
接着前面provider export的时候会启动NettyServer,因此ChannelEventRunnable的建立也从NettyServer的启动提及,ChannelEventRunnable被初始化的过程会涉及到netty的部份内容:ui
- NettyServer#doOpen,NettyServer启动的时候会建立NioServerSocketChannelFactory,该factory负责建立netty放入全部channel
- 在NioServerSocketChannelFactory构造方法中会初始化NioWorkerPool,在该类的构造方法中建立NioWorker
- 在建立NioWorker的过程当中,调用超类AbstractNioSelector的构造方法
// NioWorker构造方法中会调用超类AbstractNioSelector的构造方法 AbstractNioSelector(Executor executor, ThreadNameDeterminer determiner) { this.executor = executor; openSelector(determiner); } // org.jboss.netty.channel.socket.nio.AbstractNioSelector#openSelector private void openSelector(ThreadNameDeterminer determiner) { try { // open selector selector = SelectorUtil.open(); } catch (Throwable t) { throw new ChannelException("Failed to create a selector.", t); } // Start the worker thread with the new Selector. boolean success = false; try { // new一个thread,将当前初始化的NioWorker做为入参,也就是说最终要运行的是NioWorker.run // 这个start方法里面会将新建的这个线程放到线程池中运行 // 这里的executor就是new NioServerSocketChannelFactory时候的入参worker,也就是worker线程池 DeadLockProofWorker.start(executor, newThreadRenamingRunnable(id, determiner)); success = true; } finally { // 省略中间代码... } assert selector != null && selector.isOpen(); } // org.jboss.netty.channel.socket.nio.AbstractNioWorker#newThreadRenamingRunnable @Override protected ThreadRenamingRunnable newThreadRenamingRunnable(int id, ThreadNameDeterminer determiner) { // 这里的this就是初始化的NioWorker return new ThreadRenamingRunnable(this, "New I/O worker #" + id, determiner); } // org.jboss.netty.channel.socket.nio.NioWorker#run @Override public void run() { // 上面DeadLockProofWorker.start里面启动的线程会调用这个run方法 // 这里调用了超类的run方法,最终会调用到org.jboss.netty.channel.socket.nio.AbstractNioSelector#run // AbstractNioSelector#run super.run(); recvBufferPool.releaseExternalResources(); } // AbstractNioSelector#run // 这个方法是NioWorker真正处理逻辑的地方,死循环调用select接受IO事件,而后处理 public void run() { thread = Thread.currentThread(); int selectReturnsImmediately = 0; Selector selector = this.selector; if (selector == null) { return; } // use 80% of the timeout for measure final long minSelectTimeout = SelectorUtil.SELECT_TIMEOUT_NANOS * 80 / 100; boolean wakenupFromLoop = false; for (;;) { wakenUp.set(false); try { long beforeSelect = System.nanoTime(); // 监听I/O事件发生 int selected = select(selector); // 省略中间代码... if (shutdown) { // 省略中间代码... } else { // 处理I/O事件 process(selector); } } catch (Throwable t) { // 省略中间代码... } } }
接下来到初始化ChannelEventRunnable的调用堆栈this
终于到了ChannelEventRunnable开始初始化的地方,全部的ChannelEventRunnable都是在AllChannelHandler中完成初始化,并加入到线程池中执行,下面以收到connect事件为例编码
public void connected(Channel channel) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try{ // 初始化ChannelEventRunnable并将其加入线程池 // 这里的线程池是com.alibaba.dubbo.common.threadpool.ThreadPool这个扩展,默认配置的是"fixed",也就是FixedThreadPool cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CONNECTED)); }catch (Throwable t) { throw new ExecutionException("connect event", channel, getClass()+" error when process connected event ." , t); } }
处理请求
上面最终启动了ChannelEventRunnable线程,在这个线程中会最终调用到咱们的SayHello方法中,这个类负责分类处理各类接收到的I/O事件url
// com.alibaba.dubbo.remoting.transport.dispatcher.ChannelEventRunnable#run public void run() { switch (state) { case CONNECTED: try{ // 接收到链接 handler.connected(channel); }catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e); } break; case DISCONNECTED: try{ // 链接断开 handler.disconnected(channel); }catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e); } break; case SENT: try{ // 发送数据 handler.sent(channel,message); }catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is "+ message,e); } break; case RECEIVED: try{ // 收到数据 handler.received(channel, message); }catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is "+ message,e); } break; case CAUGHT: try{ // 处理异常 handler.caught(channel, exception); }catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is "+ channel + ", message is: " + message + ", exception is " + exception,e); } break; default: logger.warn("unknown state: " + state + ", message is " + message); } }
上面经过调用handler的相关方法来处理的,接下来看看handler是什么?
handler初始化
从最上面的调用堆栈里面有这些handler
com.alibaba.dubbo.remoting.transport.DecodeHandler#DecodeHandler com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler // 最上面调用堆栈中com alibaba dubbo.rpcprotocol.dubbo.DubboProtocol$1.reply其实就是线面这个接口的实现类 com.alibaba.dubbo.remoting.exchange.ExchangeHandler
以前在dubbo export中说过启动NettyServer的调用堆栈,可是并无详细看每个调用方法,这里把相关重要的方法拿出来
// com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol#requestHandler private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { // 这些请求received、connected、disconnected最终都会调用下面这个方法处理 public Object reply(ExchangeChannel channel, Object message) throws RemotingException { // 省略中间代码... } // 省略中间代码... } // com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol#createServer private ExchangeServer createServer(URL url) { // 省略中间代码... // 这里的handler就是上面初始化的,是一个匿名内部类,也就是com.alibaba.dubbo.remoting.exchange.ExchangeHandler的实现类 server = Exchangers.bind(url, requestHandler); // 省略中间代码... return server; } // com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchanger#bind public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { // 这里的handler就是上面bind方法传入的requestHandler // 因此这里就是初始化DecodeHandler和HeaderExchangeHandler的地方,也就说传入Transporters.bind方法的是DecodeHandler类型 return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); }
ChannelEventRunnable中的handler是什么类型?
从最上面的堆栈已经知道这个handler其实就是DecodeHandler,也就是初始化ChannelEventRunnable的时候传入的handler,接下来须要弄清楚的是为何是DecodeHandler。
上面刚说过ChannelEventRunnable的初始化是由AllChannelHandler中的某一个方法初始化的,那么做为构造参数传入ChannelEventRunnable的handler也就是WrappedChannelHandler#handler(这个类是AllChannelHandler的超类),如今要找到AllChannelHandler是怎么初始化的。
// com.alibaba.dubbo.remoting.transport.netty.NettyServer#NettyServer // 上面说handler的初始化的时候,Transporters.bind方法会最终调用NettyServer的构造方法 public NettyServer(URL url, ChannelHandler handler) throws RemotingException{ // 这里的handler就是DecodeHandler super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); } // com.alibaba.dubbo.remoting.transport.dispatcher.ChannelHandlers#wrap public static ChannelHandler wrap(ChannelHandler handler, URL url){ // 这里的handler是DecodeHandler return ChannelHandlers.getInstance().wrapInternal(handler, url); } // com.alibaba.dubbo.remoting.transport.dispatcher.ChannelHandlers#wrapInternal protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) { // 这里的handler是DecodeHandler // 先获取Dispatcher的扩展类,默认是com.alibaba.dubbo.remoting.transport.dispatcher.all.AllDispatcher // 而后调用AllDispatcher.dispatch方法 return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class) .getAdaptiveExtension().dispatch(handler, url))); } // com.alibaba.dubbo.remoting.transport.dispatcher.all.AllDispatcher#dispatch public ChannelHandler dispatch(ChannelHandler handler, URL url) { // 这里的handler是DecodeHandler,因此AllChannelHandler的超类WrappedChannelHandler#handler就是DecodeHandler return new AllChannelHandler(handler, url); }
也就是ChannelEventRunnable中的handler就是HeaderExchanger#bind方法中new出来的DecodeHandler类型的对象
filter链构造
filter链的构造原本也是在provider export服务的时候完成的,同理consumer端是在refer服务的时候完成filter链的构造。
consumer和provider的filter链都是在下面的类中构造的,查看前面的service_export和service_reference的调用堆栈就能够看到对该类的调用。
// com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper public class ProtocolFilterWrapper implements Protocol { private final Protocol protocol; public ProtocolFilterWrapper(Protocol protocol){ if (protocol == null) { throw new IllegalArgumentException("protocol == null"); } this.protocol = protocol; } public int getDefaultPort() { return protocol.getDefaultPort(); } // service export的时候调用 public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) { return protocol.export(invoker); } // 先构造filter链再继续后面的export return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER)); } // consumer refer的仍是调用 public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { return protocol.refer(type, url); } // 这里是先refer调用建立DubboInvoker,而后才构造filter链,由于consumer是先通过filter链,再通过DubboInvoker处理,而provider是先通过DubboProtocol处理,而后调用filter链 return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER); } public void destroy() { protocol.destroy(); } // private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) { Invoker<T> last = invoker; // 获取全部符合条件的filter扩展,条件包括 // 1. filter扩展类上面group对应的值和要求的group(入参)一致 // 2. url中也能够指定加载的filter或者剔除的filter,url配置的key就是入参的key List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); if (filters.size() > 0) { for (int i = filters.size() - 1; i >= 0; i --) { final Filter filter = filters.get(i); final Invoker<T> next = last; // 每一个filter使用一个Invoker包裹 last = new Invoker<T>() { public Class<T> getInterface() { return invoker.getInterface(); } public URL getUrl() { return invoker.getUrl(); } public boolean isAvailable() { return invoker.isAvailable(); } public Result invoke(Invocation invocation) throws RpcException { // 将next传入,在filter负责调用,由此构成链 return filter.invoke(next, invocation); } public void destroy() { invoker.destroy(); } @Override public String toString() { return invoker.toString(); } }; } } return last; } }
因此如今返回看最前面的调用堆栈一切应该是瓜熟蒂落了,netty接收到I/O请求后,通知到NioWorker,在NioWorker线程中通过pipeline的处理后启动了ChannelEventRunnable线程;在ChannelEventRunnable线程线程中根据接收到的不一样事件调用handler的不一样方法来处理,通过多个handler处理以后,通过的是filter链,最后会调用到咱们编写的service方法。执行完咱们的方法以后,dubo会将结果经过netty发送给consumer。
总结
上面经过提问题的方式,解读了一些阅读源码中的关键代码,如今将service export和service reply结合起来,再去阅读源代码就就本能读懂全部主流程了,就能明白源代码为何这么写。