我的单方面认为,NIO与BIO的最大区别在于主动和被动,使用BIO的方式须要等待被调用方返回数据,很明显此时调用者是被动的。java
举个例子apache
阻塞IO
假设你是一个胆小又害羞的男孩子,你约了隔壁测试的妹子,但你并不敢主动约会,因此你把本身的手机号码给她,并暗示她想要约会的时候打电话给你。很明显此时你陷入了被动,约不约会的结果须要妹子主动告知你,若是她忘了,那么你要陷入长时间的等待中以及无尽的猜想和自我怀疑中(太惨了)。[若是你是一个胆小害羞又好色的男孩子,那就惨了]缓存
非阻塞IO 咱们知道,渣男一般有不少的备胎,我管这个叫作备胎池(SpareTirePool), 那么当他想要约会的时候,只要群发问妹子要不要约会,若是要约会的话就和妹子约会,约会结束以后,处理其余约会事件,若是没有继续下一次询问。在这个例子中约会能够视为IO事件,问妹子的过程能够视为备胎池的轮询。tomcat
若是你要学习NIO,能够学习网络
既然是网络通讯的I/O那必然有如下两个步骤并发
关键代码在 package org.apache.tomcat.util.net.NioEndpoint 中app
P.S. 文章太长,若是不想看能够直接阅读结论socket
在最开始看代码,是震惊的,真的,若是你看Reactor模型的话ide
如下bind方法代码是启动ServerSocket的流程,主要流程以下高并发
@Override public void bind() throws Exception { if (!getUseInheritedChannel()) { serverSock = ServerSocketChannel.open(); socketProperties.setProperties(serverSock.socket()); InetSocketAddress addr = (getAddress()!=null?new InetSocketAddress(getAddress(),getPort()):new InetSocketAddress(getPort())); serverSock.socket().bind(addr,getAcceptCount()); } else { // Retrieve the channel provided by the OS Channel ic = System.inheritedChannel(); if (ic instanceof ServerSocketChannel) { serverSock = (ServerSocketChannel) ic; } if (serverSock == null) { throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited")); } } // 以阻塞的方式来接收链接!! serverSock.configureBlocking(true); //mimic APR behavior // 设置Acceptor和Poller的数量 if (acceptorThreadCount == 0) { // FIXME: Doesn't seem to work that well with multiple accept threads // 顾名思义,Acceptor是用来处理新链接的 acceptorThreadCount = 1; } if (pollerThreadCount <= 0) { // Poller 用来处理I/O事件 pollerThreadCount = 1; } setStopLatch(new CountDownLatch(pollerThreadCount)); // Initialize SSL if needed initialiseSsl(); // 今后处能够看出tomcat池化了selector selectorPool.open(); }
先说结论,Tomcat NIO模型中有如下关键角色
Acceptor的主要工做就是不断接收来自客户端的链接,在简单处理以后将该链接交给Poller处理
接收来自客户端链接, 若是你不想看代码,如下是其主要流程
@Override public void run() { int errorDelay = 0; // running的检测贯穿了Accpetor的处理流程,在每次关键操做的时候都会执行检测 while (running) { // 若是进入暂停状态则每隔一段时间检测一下 while (paused && running) { state = AcceptorState.PAUSED; try { Thread.sleep(50); } catch (InterruptedException e) { // Ignore } } // 再次检测 if (!running) { break; } state = AcceptorState.RUNNING; try { //检查是否达到最大链接数若是是则陷入等待,若是不是则增长当前链接数 countUpOrAwaitConnection(); SocketChannel socket = null; try { //接收新链接 socket = serverSock.accept(); } catch (IOException ioe) { // 发生异常,则减小链接数 countDownConnection(); if (running) { handleExceptionWithDelay(errorDelay); // re-throw throw ioe; } else { break; } } // Successful accept, reset the error delay errorDelay = 0; // Configure the socket if (running && !paused) { //setSocketOptions会致使将该链接交给Poller处理 if (!setSocketOptions(socket)) { closeSocket(socket); } } else { closeSocket(socket); } } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error(sm.getString("endpoint.accept.fail"), t); } } state = AcceptorState.ENDED; }
再来看看setSocketOptions作了什么,不想看代码的话,总结以下
protected boolean setSocketOptions(SocketChannel socket) { // Process the connection try { //设置为非阻塞模式,以便经过selector进行查询 socket.configureBlocking(false); Socket sock = socket.socket(); socketProperties.setProperties(sock); //从对象池中获取一个NioChannel,tomcat会复用一切能够复用的对象以减小建立新对象所带来的消耗 NioChannel channel = nioChannels.pop(); if (channel == null) { // 没有获取到,那就新建一个呗 SocketBufferHandler bufhandler = new SocketBufferHandler( socketProperties.getAppReadBufSize(), socketProperties.getAppWriteBufSize(), socketProperties.getDirectBuffer()); // SSL这一块还没研究 if (isSSLEnabled()) { channel = new SecureNioChannel(socket, bufhandler, selectorPool, this); } else { channel = new NioChannel(socket, bufhandler); } } else { channel.setIOChannel(socket); //从新设置SocketBufferHandler,将其设置为可写和可读 channel.reset(); } //从Poller池中获取一个Poller(按照次序获取,能够理解为一个圆环),并将Channel注册到上面 getPoller0().register(channel); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); try { log.error("",t); } catch (Throwable tt) { ExceptionUtils.handleThrowable(tt); } // Tell to close the socket return false; } return true; }
从链接注册到Poller提及
具体说明见代码
关键点:对一个数A取余会将余数的结果限制在A的范围内
/** * Return an available poller in true round robin fashion. * 很明显,取余的方式揭示了获取Poller的方法。你能够理解为 * Poller会组成一个圆环,这样咱们就能够经过不断递增获取 * 下一个Poller,可是数据会溢出因此咱们要取绝对值 * @return The next poller in sequence */ public Poller getPoller0() { int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length; return pollers[idx]; }
该方法会对新的建的链接进行封装,并以PollerEvent的形式注册到相应的Poller中
须要注意的是,真正的注册读事件并非在此方法注册的(当前方法调用者为Acceptor线程),而是在Poller线程中注册读事件的
/** * Registers a newly created socket with the poller. * 将新建的socket注册到Poller上 * @param socket The newly created socket */ public void register(final NioChannel socket) { //如下代码为设置各类参数,能够从方法名进行推测,再也不进行叙述 socket.setPoller(this); NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this); socket.setSocketWrapper(ka); ka.setPoller(this); ka.setReadTimeout(getSocketProperties().getSoTimeout()); ka.setWriteTimeout(getSocketProperties().getSoTimeout()); ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests()); ka.setSecure(isSSLEnabled()); ka.setReadTimeout(getConnectionTimeout()); ka.setWriteTimeout(getConnectionTimeout()); //从缓存中获取一个PollerEvent PollerEvent r = eventCache.pop(); // 注册读事件 ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into. // 若是没有从缓存中获取,那么就新建一个 if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER); else r.reset(socket,ka,OP_REGISTER); addEvent(r); }
Poller 处理I/O事件的的代码较长,并且细节也较多,总结其主要做用以下
@Override public void run() { // Loop until destroy() is called // 一直循环直到destroy方法被调用 while (true) { boolean hasEvents = false; try { if (!close) { // events 方法会处理Acceptor注册到Poller中的PollerEvent // 主要是注册读事件 hasEvents = events(); if (wakeupCounter.getAndSet(-1) > 0) { //if we are here, means we have other stuff to do //do a non blocking select keyCount = selector.selectNow(); } else { keyCount = selector.select(selectorTimeout); } wakeupCounter.set(0); } // 检测到关闭,则处理剩余的事件并关闭selector if (close) { // 处理Acceptors注册到Poller中的PollerEvent events(); //selector time out 或者poller被关闭就会调用timeout方法 timeout(0, false); try { selector.close(); } catch (IOException ioe) { log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe); } break; } } catch (Throwable x) { ExceptionUtils.handleThrowable(x); log.error("",x); continue; } //either we timed out or we woke up, process events first if ( keyCount == 0 ) hasEvents = (hasEvents | events()); // 执行 select 操做,查询I/O事件 Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; // Walk through the collection of ready keys and dispatch // any active event. while (iterator != null && iterator.hasNext()) { SelectionKey sk = iterator.next(); NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment(); // Attachment may be null if another thread has called // cancelledKey() if (attachment == null) { iterator.remove(); } else { iterator.remove(); // 处理检测到的I/O事件 processKey(sk, attachment); } }//while //timeout 会检查是否关闭,若是已经关闭而且有事件未处理会调用cancelledKey方法 //cancelledKey:该方法主要是对和该链接相关的资源执行关闭操做 timeout(keyCount,hasEvents); }//while getStopLatch().countDown(); }
processKey主要工做以下
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) { try { if ( close ) { // 若是Poller关闭则关闭和释放和此链接相关的资源 cancelledKey(sk); } else if ( sk.isValid() && attachment != null ) { if (sk.isReadable() || sk.isWritable() ) { if ( attachment.getSendfileData() != null ) { processSendfile(sk,attachment, false); } else { // 取消注册事件 // sk.interestOps()& (~readyOps) unreg(sk, attachment, sk.readyOps()); boolean closeSocket = false; // Read goes before write 先读后写 if (sk.isReadable()) { // 关键代码,调用processSocket方法处理读事件 if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) { closeSocket = true; } } if (!closeSocket && sk.isWritable()) { if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) { closeSocket = true; } } if (closeSocket) { cancelledKey(sk); } } } } else { //invalid key cancelledKey(sk); } } catch ( CancelledKeyException ckx ) { cancelledKey(sk); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error("",t); } }
processSocket定义在org.apache.tomcat.util.net.AbstractEndPoint中, 也就是意味着不管你采用的是BIO仍是NIO或者NIO2最终读写数据都是调用此方法
从代码中能够看出,依然是对象池,依然是再次封装(套娃),并将其提交到线程池中执行,接下来的内容就再也不本次讨论范围内呢。
public boolean processSocket(SocketWrapperBase<S> socketWrapper, SocketEvent event, boolean dispatch) { try { if (socketWrapper == null) { return false; } SocketProcessorBase<S> sc = processorCache.pop(); if (sc == null) { sc = createSocketProcessor(socketWrapper, event); } else { sc.reset(socketWrapper, event); } Executor executor = getExecutor(); if (dispatch && executor != null) { executor.execute(sc); } else { sc.run(); } } catch (RejectedExecutionException ree) { getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree); return false; } catch (Throwable t) { ExceptionUtils.handleThrowable(t); // This means we got an OOM or similar creating a thread, or that // the pool and its queue are full getLog().error(sm.getString("endpoint.process.fail"), t); return false; } return true; }
手抖了,线不怎么♂
LimitLatch 为全部的Acceptor共用,用来限制当前的最大链接数
Acceptor 以阻塞的形式来接收新链接,并将其封装成PollerEvent对象提交到Poller中
Poller 接收来自Acceptor的PollerEvent并注册读事件,以及轮询和其绑定的客户端Socket有无读事件,若是有则执行进一步操做,将其提交到其余地方执行处理(解析Http协议)
学习源码就是为了学习其设计思想. -- 沃兹及.硕德
对象池化 池化对象、池化链接能够大大下降新建对象以及GC所带来的消耗,当须要使用从池中取出来从新设置相关值便可
环形队列 虽然这玩意不新鲜,但配合上原子类,就能够在高并发的状况,高效的获取队列中的下一个元素(环形队列中索引溢出的处理在以前我是没有考虑到的)
阻塞获取连接,非阻塞处理IO事件 与Reactor模型造成强烈的对比,学习NIO的时候思惟被限制住了,认为非阻塞的获取链接会得到更高的性能,但如今状况不必定了(还没测试,哪位老哥试了告诉我一下)
关键操做时,对标志位进行检测 若是你要经过一个标志变量来控制你的线程,且线程循环一次须要相对较长的时间(你代码太长,操做太多)那么最好在执行关键操做以前对你的标志变量进行检查,来决定是否要改变线程的行为(康康poller和Acceptor的代码)
初次学习Tomcat的代码,有理解错误的地方还请大佬指出
转载自 https://juejin.im/post/5daea81b518825630e5d1aa9 做者: 柯三