尊重原创,转载注明出处,原文地址:http://www.cnblogs.com/cishengchongyan/p/6160194.html html
本文咱们将先从NioEventLoop开始来学习服务端的处理流程。话很少说,开始学习~~~~react
咱们从上文中已经知道server在启动的时候会开启两个线程:bossGroup和workerGroup,这两个线程分别是boss线程池(用于接收client请求)和worker线程池(用于处理具体的读写操做),这两个线程调度器都是NioEventLoopGroup,bossGroup有一个NioEventLoop,而worker线程池有n*cup数量个NioEventLoop。那么咱们看看在NioEventLoop中的是如何开始的:linux
NioEventLoop本质上是一个线程调度器(继承自ScheduledExecutorService),当bind以后就开始run起一个线程: promise
(代码一)
1 @Override 2 protected void run() { 3 for (;;) { 4 boolean oldWakenUp = wakenUp.getAndSet(false); 5 try { 6 if (hasTasks()) { 7 selectNow(); 8 } else { 9 select(oldWakenUp); 10
11 if (wakenUp.get()) { 12 selector.wakeup(); 13 } 14 } 15
16 cancelledKeys = 0; 17 needsToSelectAgain = false; 18 final int ioRatio = this.ioRatio; 19 if (ioRatio == 100) { 20 processSelectedKeys(); 21 runAllTasks(); 22 } else { 23 final long ioStartTime = System.nanoTime(); 24
25 processSelectedKeys(); 26
27 final long ioTime = System.nanoTime() - ioStartTime; 28 runAllTasks(ioTime * (100 - ioRatio) / ioRatio); 29 } 30
31 if (isShuttingDown()) { 32 closeAll(); 33 if (confirmShutdown()) { 34 break; 35 } 36 } 37 } catch (Throwable t) { 38 ... 39 } 40 } 41 }
这个for(;;)里面就是boss线程的核心处理流程:异步
【代码一主线】1,不断地监听selector拿到socket句柄而后建立channel。每次run的时候先拿到wakeup的值,而且set进去false(PS:wakeup是什么鬼?一个AtomicBoolean,表明是否用户唤醒,若是不人为将其set成true,永远是false)。socket
【代码一主线】2,若是任务队列中已有任务,那么selectNow(),(PS:selectNow是什么鬼?咱们知道selector.select()是一个阻塞调用,而selectNow方法是个非阻塞方法,若是没有到达的socket句柄则返回0),所以若队列中已有任务的话应该当即开始执行,而不能阻塞到selector.select()上,不然则调用select()方法,继续看select()里面:ide
(代码二)
1 private void select(boolean oldWakenUp) throws IOException { 2 Selector selector = this.selector; 3 try { 4 int selectCnt = 0; 5 long currentTimeNanos = System.nanoTime(); 6 long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); 7 for (;;) { 8 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; 9 if (timeoutMillis <= 0) { 10 if (selectCnt == 0) { 11 selector.selectNow(); 12 selectCnt = 1; 13 } 14 break; 15 } 16
17 int selectedKeys = selector.select(timeoutMillis); 18 selectCnt ++; 19
20 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { 21 // 若是selectedKeys不为空、或者被用户唤醒、或者队列中有待处理任务、或者调度器中有任务,则break
22 break; 23 } 24 if (Thread.interrupted()) { 25 //若是线程被中断则重置selectedKeys,同时break出本次循环,因此不会陷入一个繁忙的循环。
26 selectCnt = 1; 27 break; 28 } 29
30 long time = System.nanoTime(); 31 if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { 32 // selector超时
33 selectCnt = 1; 34 } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { 35 // selector屡次过早返回,从新创建并打开Selector
36 ... 37 } 38
39 currentTimeNanos = time; 40 } 41 ... 42 } catch (CancelledKeyException e) { 43 ... 44 } 45 }
咱们看到,select()方法进入一个for循环去select阻塞等待socket(这里的selector的实如今是根据操做系统和netty的版原本定的,在最新的netty中是使用的linux的epoll模型),同时入参里有“超时时间”,若是超过了这个时间仍然没有socket到来则从新将selectCnt置为1从新循环等待,直到有socket到来。若是selectedKeys不为空、或者被用户唤醒、或者队列中有待处理任务、或者调度器中有任务,那么就是说该eventLoop有活干了,先break出去去干活,完了再打开selector从新阻塞等待。正常状况下会等待到一个socket,break出去以后回到代码一oop
【代码一主线】3,根据ioRatio来选择任务执行策略(PS:ioRatio是什么鬼?看了下用途应该是这样的,这个ioRatio表明该eventLoop指望在I/O操做上花费时间的比例)。而NioEventLoop中有两类操做,一类是I/O操做(读写之类),调用processSelectedKeys;一类是非I/O操做(例如register等),调用runAllTasks。若是ioRatio是100的话那么会按照顺序执行I/O操做->非I/O操做;若是不是会按照这个比例算出一个超时时间,在run任务队列的时候若是超过了这个时间会当即返回,确保I/O操做能够获得及时的调用。学习
咱们关心的是I/O操做,那么进入processSelectedKeys()看下发生了什么吧。this
(代码三)
1 private void processSelectedKeys() { 2 if (selectedKeys != null) { 3 processSelectedKeysOptimized(selectedKeys.flip()); 4 } else { 5 processSelectedKeysPlain(selector.selectedKeys()); 6 } 7 }
正常状况下会走到processSelectedKeysOptimized中:
(代码四)
1 private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) { 2 for (int i = 0;; i ++) { 3 final SelectionKey k = selectedKeys[i]; 4 if (k == null) { 5 break; 6 } 7 selectedKeys[i] = null; 8
9 final Object a = k.attachment(); 10
11 if (a instanceof AbstractNioChannel) { 12 processSelectedKey(k, (AbstractNioChannel) a); 13 } else { 14 @SuppressWarnings("unchecked") 15 NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; 16 processSelectedKey(k, task); 17 } 18
19 if (needsToSelectAgain) { 20 for (;;) { 21 if (selectedKeys[i] == null) { 22 break; 23 } 24 selectedKeys[i] = null; 25 i++; 26 } 27
28 selectAgain(); 29 selectedKeys = this.selectedKeys.flip(); 30 i = -1; 31 } 32 } 33 }
遍历拿到全部的SelectionKey,而后判断每一个SelectionKey的attachment,上篇文章中已经分析过给ServerBootstrap注册的Channel是NioServerSocketChannel(继承自AbstractNioChannel),所以进入processSelectedKey中:
(代码五)
1 private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { 2 final NioUnsafe unsafe = ch.unsafe(); 3 if (!k.isValid()) { 4 unsafe.close(unsafe.voidPromise()); 5 return; 6 } 7
8 try { 9 int readyOps = k.readyOps(); 10 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { 11 unsafe.read(); 12 if (!ch.isOpen()) { 13 return; 14 } 15 } 16 if ((readyOps & SelectionKey.OP_WRITE) != 0) { 17 ch.unsafe().forceFlush(); 18 } 19 if ((readyOps & SelectionKey.OP_CONNECT) != 0) { 20 int ops = k.interestOps(); 21 ops &= ~SelectionKey.OP_CONNECT; 22 k.interestOps(ops); 23
24 unsafe.finishConnect(); 25 } 26 } catch (CancelledKeyException ignored) { 27 unsafe.close(unsafe.voidPromise()); 28 } 29 }
在这里根据传入的SelectionKey的已就绪操做类型来决定下一步的操做,若是是一个读操做,那么进入AbstractNioMessageChannel$NioMessageUnsafe的read实现,这里代码不少,咱们只贴一下核心的代码:
(代码六)
1 @Override 2 public void read() { 3 ... 4 final ChannelPipeline pipeline = pipeline(); 5 ... 6 try { 7 int size = readBuf.size(); 8 for (int i = 0; i < size; i ++) { 9 pipeline.fireChannelRead(readBuf.get(i)); 10 } 11 ... 12 readBuf.clear(); 13 pipeline.fireChannelReadComplete(); 14 } finally { 15 } 16 }
核心就是这个pipeline.fireChannelRead(readBuf.get(i));,这已经到了pipeline阶段,可能有些人会误觉得这是否是已经到了worker线程中,可是不可能啊,咱们的代码其实在处于processSelectedKeys的逻辑里面。实际上,不管是boss仍是worker,他们都是NioEventLoopGroup,玩法都是同样的,只不过职责不同而已。boss也有本身的handler,上篇文章中咱们提到了netty中的reactor模式的玩法,从Doug Lea的图中能够看出,boss(实际上就是mainReactor)的handler其实就是这个acceptor。
在此咱们顺便学习一下netty中的handler:
从用途上来讲,handler分为ChannelInboundHandler(读)和ChannelOutboundHandler(写),增长一层适配器产生了两handler的Adapter,咱们使用到的类都是继承自这两个Adapter。咱们常常用到的SimpleChannelInboundHandler就继承ChannelInboundHandlerAdapter,用于初始化用户handler链的ChannelInitializer和boss线程绑定的ServerBootstrapAcceptor也都继承于此。
回到【代码六主线】咱们从pipeline.fireChannelRead继续追踪下去会追到ChannelInboundHandler的channelRead的实现,而这里的Hander就是ServerBootstrapAcceptor。
(代码七)
1 @Override 2 @SuppressWarnings("unchecked") 3 public void channelRead(ChannelHandlerContext ctx, Object msg) { 4 final Channel child = (Channel) msg; 5
6 child.pipeline().addLast(childHandler); 7
8 for (Entry<ChannelOption<?>, Object> e: childOptions) { 9 try { 10 if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) { 11 } 12 } catch (Throwable t) { 13 } 14 } 15
16 for (Entry<AttributeKey<?>, Object> e: childAttrs) { 17 child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); 18 } 19
20 try { 21 childGroup.register(child).addListener(new ChannelFutureListener() { 22 @Override 23 public void operationComplete(ChannelFuture future) throws Exception { 24 if (!future.isSuccess()) { 25 forceClose(child, future.cause()); 26 } 27 } 28 }); 29 } catch (Throwable t) { 30 forceClose(child, t); 31 } 32 }
因为ServerBootstrapAcceptor 很重要,咱们先看一下都有什么内容:
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter { private final EventLoopGroup childGroup; private final ChannelHandler childHandler; private final Entry<ChannelOption<?>, Object>[] childOptions; private final Entry<AttributeKey<?>, Object>[] childAttrs; }
我本身的理解:
childGroup就是subReactor(也就是worker线程);childHandler就是xxx;childOptions和childAttrs是为channel准备的一些参数。
回到【代码七主线】在这里作了3件事:
1.为客户端channel的pipeline中添加childHandler,那么这个childHandler是什么鬼呢?回忆一下上文中的服务端启动代码,有bootStrap.childHandler(xxx)这样的代码,因此此处就是把在服务端启动时咱们定义好的Handler链绑定给每一个channel。
2.把咱们服务端初始化时的参数绑定到每一个channel中。
3.childGroup.register(child).addListener(new ChannelFutureListener()),后面这个异步listener做用很明确,问题是这个childGroup是什么鬼?我理解应该就是worker线程了。详细说一下childGroup.register(child),继续跟下去,跟到AbstractChannel$AbstractUnsafe中
(代码八)
1 @Override 2 public final void register(EventLoop eventLoop, final ChannelPromise promise) { 3 ... 4 AbstractChannel.this.eventLoop = eventLoop; 5
6 if (eventLoop.inEventLoop()) { 7 register0(promise); 8 } else { 9 ... 10 } catch (Throwable t) { 11 } 12 } 13 }
继续register0:
(代码九)
1 private void register0(ChannelPromise promise) { 2 try { 3 if (!promise.setUncancellable() || !ensureOpen(promise)) { 4 return; 5 } 6 boolean firstRegistration = neverRegistered; 7 doRegister(); 8 neverRegistered = false; 9 registered = true; 10 safeSetSuccess(promise); 11 pipeline.fireChannelRegistered(); 12 if (firstRegistration && isActive()) { 13 pipeline.fireChannelActive(); 14 } 15 } catch (Throwable t) { 16 } 17 }
这里核心有两步:
1.doRegister(),其实咱们在上篇文章中分析过,就是将channel绑定到selector上。此处有点懵逼,我猜想是绑定到worker线程的selector中,若是有大神知道请留言个人微博。
2.pipeline.fireChannelRegistered(),继续往下跟跟进到ChannelInboundHandler的channelRegistered方法中,而此时会调用咱们定义的ChannelInitializer,将咱们定义的handler注册到pipeline中。
至此【代码一主线】执行完毕,咱们浏览了一遍boss线程的在接收socket请求期间的处理流程,过程当中是结合reactor模式去理解的,有些地方本身也有点不懂,还请各位指正。
总结一下:
1.boss线程就是个loop循环,打开selector -> 得到监听到的SelectionKey -> 处理I/O请求 -> 处理非I/O请求,而咱们最关心的就是处理I/O请求(在processSelectedKeys()方法中完成)。
2.遍历准备就绪的SelectionKey,根据其可操做类型(read or write。。)来决定下一步的具体操做,咱们着重去了解了read逻辑。
3.NioServerSocketChannel调用父类AbstractNioMessageChannel的unsafe类NioMessageUnsafe来处理读取逻辑:调用pipeline处理readbuf。
4.pipeline.fireChannelRead会调用ServerBootstrapAcceptor的channelRead:初始化客户端channel参数,将该channel绑定到worker线程的selector中,为channel注册用户定义的handler链。
再精炼一点:
boss线程只是接收客户端socket并初始化客户端channle,将channel丢给acceptor,acceptor会将这个channel注册到worker线程中。整个loop过程都是一个非阻塞过程(所有异步化),同时boss中不会作耗时的I/O读取,只是将channel丢给worker。所以是一个高效的loop过程。
下文中咱们将分析worker线程的处理流程,敬请期待。。。