Tomcat的NIO是基于I/O复用来实现的。对这点必定要清楚,否则咱们的讨论就不在一个逻辑线上。下面这张图学习过I/O模型知识的通常都见过,出自《UNIX网络编程》,I/O模型一共有阻塞式I/O,非阻塞式I/O,I/O复用(select/poll/epoll),信号驱动式I/O和异步I/O。这篇文章讲的是I/O复用。html
IO复用.pngjava
这里先来讲下用户态和内核态,直白来说,若是线程执行的是用户代码,当前线程处在用户态,若是线程执行的是内核里面的代码,当前线程处在内核态。更深层来说,操做系统为代码所处的特权级别分了4个级别。不过现代操做系统只用到了0和3两个级别。0和3的切换就是用户态和内核态的切换。更详细的可参照《深刻理解计算机操做系统》。I/O复用模型,是同步非阻塞,这里的非阻塞是指I/O读写,对应的是recvfrom操做,由于数据报文已经准备好,无需阻塞。说它是同步,是由于,这个执行是在一个线程里面执行的。有时候,还会说它又是阻塞的,其实是指阻塞在select上面,必须等到读就绪、写就绪等网络事件。有时候咱们又说I/O复用是多路复用,这里的多路是指N个链接,每个链接对应一个channel,或者说多路就是多个channel。复用,是指多个链接复用了一个线程或者少许线程(在Tomcat中是Math.min(2,Runtime.getRuntime().availableProcessors()))。web
上面提到的网络事件有链接就绪,接收就绪,读就绪,写就绪四个网络事件。I/O复用主要是经过Selector复用器来实现的,能够结合下面这个图理解上面的叙述。apache
Selector图解.png编程
tomcat支持IO类型图.pngtomcat
tomcat从6之后开始支持NIO模型,实现是基于JDK的java.nio包。这里能够看到对read body 和response body是Blocking的。关于这点在第6.3节源代码阅读有重点介绍。服务器
在Connector节点配置protocol="org.apache.coyote.http11.Http11NioProtocol",Http11NioProtocol协议下默认最大链接数是10000,也能够从新修改maxConnections的值,同时咱们能够设置最大线程数maxThreads,这里设置的最大线程数就是Excutor的线程池的大小。在BIO模式下其实是没有maxConnections,即便配置也不会生效,BIO模式下的maxConnections是保持跟maxThreads大小一致,由于它是一请求一线程模式。网络
tomcatnio组成.png并发
咱们要理解tomcat的nio最主要就是对NioEndpoint的理解。它一共包含LimitLatch、Acceptor、Poller、SocketProcessor、Excutor5个部分。LimitLatch是链接控制器,它负责维护链接数的计算,nio模式下默认是10000,达到这个阈值后,就会拒绝链接请求。Acceptor负责接收链接,默认是1个线程来执行,将请求的事件注册到事件列表。有Poller来负责轮询,Poller线程数量是cpu的核数Math.min(2,Runtime.getRuntime().availableProcessors())。由Poller将就绪的事件生成SocketProcessor同时交给Excutor去执行。Excutor线程池的大小就是咱们在Connector节点配置的maxThreads的值。在Excutor的线程中,会完成从socket中读取http request,解析成HttpServletRequest对象,分派到相应的servlet并完成逻辑,而后将response经过socket发回client。在从socket中读数据和往socket中写数据的过程,并无像典型的非阻塞的NIO的那样,注册OP_READ或OP_WRITE事件到主Selector,而是直接经过socket完成读写,这时是阻塞完成的,可是在timeout控制上,使用了NIO的Selector机制,可是这个Selector并非Poller线程维护的主Selector,而是BlockPoller线程中维护的Selector,称之为辅Selector。详细源代码能够参照 第6.3节。异步
tomcatnio序列图.png
在下一小节NioEndpoint源码解读中咱们将对步骤1-步骤11依次找到对应的代码来讲明。
不管是BIO仍是NIO,开始都会初始化链接限制,不可能无限增大,NIO模式下默认是10000。
public void startInternal() throws Exception { if (!running) { //省略代码... initializeConnectionLatch(); //省略代码... } } protected LimitLatch initializeConnectionLatch() { if (maxConnections==-1) return null; if (connectionLimitLatch==null) { connectionLimitLatch = new LimitLatch(getMaxConnections()); } return connectionLimitLatch; }
下面咱们着重叙述跟NIO相关的流程,共分为11个步骤,分别对应上面序列图中的步骤。
步骤1:绑定IP地址及端口,将ServerSocketChannel设置为阻塞。
这里为何要设置成阻塞呢,咱们一直都在说非阻塞。Tomcat的设计初衷主要是为了操做方便。这样这里就跟BIO模式下同样了。只不过在BIO下这里返回的是Socket,NIO下这里返回的是SocketChannel。
public void bind() throws Exception { //省略代码... serverSock.socket().bind(addr,getBacklog()); serverSock.configureBlocking(true); //省略代码... selectorPool.open(); }
步骤2:启动接收线程
public void startInternal() throws Exception { if (!running) { //省略代码... startAcceptorThreads(); } } //这个方法实际是在它的超类AbstractEndpoint里面 protected final void startAcceptorThreads() { int count = getAcceptorThreadCount(); acceptors = new Acceptor[count]; for (int i = 0; i < count; i++) { acceptors[i] = createAcceptor(); Thread t = new Thread(acceptors[i], getName() + "-Acceptor-" + i); t.setPriority(getAcceptorThreadPriority()); t.setDaemon(getDaemon()); t.start(); } }
步骤3:ServerSocketChannel.accept()接收新链接
protected class Acceptor extends AbstractEndpoint.Acceptor { @Override public void run() { while (running) { try { //省略代码... SocketChannel socket = null; try { socket = serverSock.accept();//接收新链接 } catch (IOException ioe) { //省略代码... throw ioe; } //省略代码... if (running && !paused) { if (!setSocketOptions(socket)) { //省略代码... } } else { //省略代码... } } catch (SocketTimeoutException sx) { } catch (IOException x) { //省略代码... } catch (OutOfMemoryError oom) { //省略代码... } catch (Throwable t) { //省略代码... } } } }
步骤4:将接收到的连接通道设置为非阻塞
步骤5:构造NioChannel对象
步骤6:register注册到轮询线程
protected boolean setSocketOptions(SocketChannel socket) { try { socket.configureBlocking(false);//将链接通道设置为非阻塞 Socket sock = socket.socket(); socketProperties.setProperties(sock); NioChannel channel = nioChannels.poll();//构造NioChannel对象 //省略代码... getPoller0().register(channel);//register注册到轮询线程 } catch (Throwable t) { //省略代码... } //省略代码... }
步骤7:构造PollerEvent,并添加到事件队列
protected ConcurrentLinkedQueue<Runnable> events = new ConcurrentLinkedQueue<Runnable>(); public void register(final NioChannel socket) { //省略代码... PollerEvent r = eventCache.poll(); //省略代码... addEvent(r); }
步骤8:启动轮询线程
public void startInternal() throws Exception { if (!running) { //省略代码... // Start poller threads pollers = new Poller[getPollerThreadCount()]; for (int i=0; i<pollers.length; i++) { pollers[i] = new Poller(); Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i); pollerThread.setPriority(threadPriority); pollerThread.setDaemon(true); pollerThread.start(); } //省略代码... } }
步骤9:取出队列中新增的PollerEvent并注册到Selector
public static class PollerEvent implements Runnable { //省略代码... @Override public void run() { if ( interestOps == OP_REGISTER ) { try { socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key); } catch (Exception x) { log.error("", x); } } else { //省略代码... }//end if }//run //省略代码... }
步骤10:Selector.select()
public void run() { // Loop until destroy() is called while (true) { try { //省略代码... try { if ( !close ) { if (wakeupCounter.getAndSet(-1) > 0) { keyCount = selector.selectNow(); } else { keyCount = selector.select(selectorTimeout); } //省略代码... } //省略代码... } catch ( NullPointerException x ) { //省略代码... } catch ( CancelledKeyException x ) { //省略代码... } catch (Throwable x) { //省略代码... } //省略代码... Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; while (iterator != null && iterator.hasNext()) { SelectionKey sk = iterator.next(); KeyAttachment attachment = (KeyAttachment)sk.attachment(); if (attachment == null) { iterator.remove(); } else { attachment.access(); iterator.remove(); processKey(sk, attachment);//此方法跟下去就是把SocketProcessor交给Excutor去执行 } }//while //省略代码... } catch (OutOfMemoryError oom) { //省略代码... } }//while //省略代码... }
步骤11:根据选择的SelectionKey构造SocketProcessor提交到请求处理线程
public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) { try { //省略代码... SocketProcessor sc = processorCache.poll(); if ( sc == null ) sc = new SocketProcessor(socket,status); else sc.reset(socket,status); if ( dispatch && getExecutor()!=null ) getExecutor().execute(sc); else sc.run(); } catch (RejectedExecutionException rx) { //省略代码... } catch (Throwable t) { //省略代码... } //省略代码... }
上面的序列图有个地方我没有描述,就是NioSelectorPool这个内部类,是由于在总体理解tomcat的nio上面在序列图里面不包括它更好理解。在有了上面的基础后,咱们在来讲下NioSelectorPool这个类,对更深层了解Tomcat的NIO必定要知道它的做用。NioEndpoint对象中维护了一个NioSelecPool对象,这个NioSelectorPool中又维护了一个BlockPoller线程,这个线程就是基于辅Selector进行NIO的逻辑。以执行servlet后,获得response,往socket中写数据为例,最终写的过程调用NioBlockingSelector的write方法。代码以下:
public int write(ByteBuffer buf, NioChannel socket, long writeTimeout,MutableInteger lastWrite) throws IOException { SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); if ( key == null ) throw new IOException("Key no longer registered"); KeyAttachment att = (KeyAttachment) key.attachment(); int written = 0; boolean timedout = false; int keycount = 1; //assume we can write long time = System.currentTimeMillis(); //start the timeout timer try { while ( (!timedout) && buf.hasRemaining()) { if (keycount > 0) { //only write if we were registered for a write //直接往socket中写数据 int cnt = socket.write(buf); //write the data lastWrite.set(cnt); if (cnt == -1) throw new EOFException(); written += cnt; //写数据成功,直接进入下一次循环,继续写 if (cnt > 0) { time = System.currentTimeMillis(); //reset our timeout timer continue; //we successfully wrote, try again without a selector } } //若是写数据返回值cnt等于0,一般是网络不稳定形成的写数据失败 try { //开始一个倒数计数器 if ( att.getWriteLatch()==null || att.getWriteLatch().getCount()==0) att.startWriteLatch(1); //将socket注册到辅Selector,这里poller就是BlockSelector线程 poller.add(att,SelectionKey.OP_WRITE); //阻塞,直至超时时间唤醒,或者在尚未达到超时时间,在BlockSelector中唤醒 att.awaitWriteLatch(writeTimeout,TimeUnit.MILLISECONDS); }catch (InterruptedException ignore) { Thread.interrupted(); } if ( att.getWriteLatch()!=null && att.getWriteLatch().getCount()> 0) { keycount = 0; }else { //还没超时就唤醒,说明网络状态恢复,继续下一次循环,完成写socket keycount = 1; att.resetWriteLatch(); } if (writeTimeout > 0 && (keycount == 0)) timedout = (System.currentTimeMillis() - time) >= writeTimeout; } //while if (timedout) throw new SocketTimeoutException(); } finally { poller.remove(att,SelectionKey.OP_WRITE); if (timedout && key != null) { poller.cancelKey(socket, key); } } return written; }
也就是说当socket.write()返回0时,说明网络状态不稳定,这时将socket注册OP_WRITE事件到辅Selector,由BlockPoller线程不断轮询这个辅Selector,直到发现这个socket的写状态恢复了,经过那个倒数计数器,通知Worker线程继续写socket动做。看一下BlockSelector线程的代码逻辑:
public void run() { while (run) { try { ...... Iterator iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; while (run && iterator != null && iterator.hasNext()) { SelectionKey sk = (SelectionKey) iterator.next(); KeyAttachment attachment = (KeyAttachment)sk.attachment(); try { attachment.access(); iterator.remove(); ; sk.interestOps(sk.interestOps() & (~sk.readyOps())); if ( sk.isReadable() ) { countDown(attachment.getReadLatch()); } //发现socket可写状态恢复,将倒数计数器置位,通知Worker线程继续 if (sk.isWritable()) { countDown(attachment.getWriteLatch()); } }catch (CancelledKeyException ckx) { if (sk!=null) sk.cancel(); countDown(attachment.getReadLatch()); countDown(attachment.getWriteLatch()); } }//while }catch ( Throwable t ) { log.error("",t); } } events.clear(); try { selector.selectNow();//cancel all remaining keys }catch( Exception ignore ) { if (log.isDebugEnabled())log.debug("",ignore); } }
使用这个辅Selector主要是减小线程间的切换,同时还可减轻主Selector的负担。
下面这份报告是咱们压测的一个结果,跟想象的是否是不太同样?几乎没有差异,实际上NIO优化的是I/O的读写,若是瓶颈不在这里的话,好比传输字节数很小的状况下,BIO和NIO其实是没有差异的。NIO的优点更在于用少许的线程hold住大量的链接。还有一点,咱们在压测的过程当中,遇到在NIO模式下刚开始的一小段时间内容,会有错误,这是由于通常的压测工具是基于一种长链接,也就是说好比模拟1000并发,那么同时创建1000个链接,下一时刻再发送请求就是基于先前的这1000个链接来发送,还有TOMCAT的NIO处理是有POLLER线程来接管的,它的线程数通常等于CPU的核数,若是一瞬间有大量并发过来,POLLER也会顿时处理不过来。
压测1.jpeg
压测2.jpeg
NIO只是优化了网络IO的读写,若是系统的瓶颈不在这里,好比每次读取的字节说都是500b,那么BIO和NIO在性能上没有区别。NIO模式是最大化压榨CPU,把时间片都更好利用起来。对于操做系统来讲,线程之间上下文切换的开销很大,并且每一个线程都要占用系统的一些资源如内存,有关线程资源可参照这篇文章《一台java服务器能够跑多少个线程》。所以,使用的线程越少越好。而I/O复用模型正是利用少许的线程来管理大量的链接。在对于维护大量长链接的应用里面更适合用基于I/O复用模型NIO,好比web qq这样的应用。因此咱们要清楚系统的瓶颈是I/O仍是CPU的计算。
转载请注明做者及出处,并附上连接https://my.oschina.net/wangxindong/blog/1562957
参考资料:
http://tomcat.apache.org/tomcat-7.0-doc/config/http.html
http://gearever.iteye.com/blog/1844203 《Tomcat内核设计剖析》 《深刻理解计算机操做系统》 《UNIX网络编程》卷1