NIO2(AIO) 在Tomcat的应用

概述

注: NIO2(AIO) 即异步IOjava

NIO2 简介

AIONIO2实际上是一回事,如同孙行者、者行孙其实都是孙猴子,只是名称不一样本质都同样git

那么如何理解这个概念呢?举个例子github

假设一个妹子有许多的舔狗(SpareTire),若是妹子想要完成某件事最简单、高效的方法是什么?web

答案是,舔狗那么多,交给他们去办就ok了。那么狗子办事期间,妹子会一直等待狗子把事情作好吗?不行,这期间固然能够继续将其余任务派发给其余狗子。当狗子办事期间,若是有须要妹子处理的事情,通知处理一下便可。spring

固然狗子通常都是处理一些重活累活,好比数据拷贝、I/O啊,接收新链接啥的(太惨了)。妹子则专一于核心业务的处理。apache

在这个例子中,妹子至关于核心业务线程,主要用来处理业务逻辑,而狗子们则是(内核+I/O线程)的抽象。后端

P.S.api

  • 若是你了解NIO2,建议你直接阅读NIO2模型解读章节,不须要再阅读NIO2 DEMO章节(时间宝贵)tomcat

  • 你能够直接越过全部章节去看总结,也能够简单阅读附录直接上手调试代码多线程

NIO2 DEMO

NIO2中有个核心点,就是内核负责主要负责通知程序有什么事件,而链接的接收以及数据的拷贝仍是须要程序提供线程来作这些事情,你能够理解为妹子(核心业务线程)须要提供舔狗池(线程池)给内核来作这些事情

talk is cheap, show me your hair

若是你想要学习一下NIO2,能够点击学习

该源码的注释为GBK编码,若是你看到注释为乱码,最好将其改成GBK编码

这是一个Demo,值得注意的是虽然该例子中并无显式的建立线程池,这是由于若是你在open()服务端的时候,若是没有显示指定,系统将会默认分配给ServerSocketChannel一个线程池,用于事件的处理,咱们能够打开JConsole验证一下.

channel = AsynchronousServerSocketChannel.open();
复制代码

JConsole中显示的线程

如图所示thread-0到thread-4就是系统默认分配的线程池,用来处理I/O事件。(天赐舔狗)

想象一下,若是咱们在处理I/O事件的时候将全部线程都阻塞住了,那么整个系统的I/O都将陷入阻塞, 以下图所示。

在有新的I/O事件到来的时候,内核会选择一个线程来处理这些I/O事件,若是处理I/O的线程陷入阻塞,那么来自客户端的请求将会一直被阻塞住,没法返回。

所以处理I/O事件的线程最好只处理I/O事件(接收新链接、将数据从内核拷贝到线程中)

你能够理解为,舔狗最好只作舔狗该作的事情,即重活累活,至于核心业务或者会发生阻塞的状况的事件最好提交给妹子(业务逻辑处理线程池)来处理。

Tomcat NIO2 模型

关键类 org.apache.tomcat.util.net.Nio2Endpoint

既然是讲解NIO2的处理模型,那么咱们有必要了解如下关键角色

  • Nio2Acceptor Acceptor并不与特定的线程绑定,而是当由新链接到来从线程池中选择一个线程来执行Acceptor的代码,这一个过程是由底层帮咱们完成的,Acceptor的主要任务是接收新链接,并为该链接注册读写的处理对象

  • LimitLatch 限制链接数,在异步I/O状况限制链接数的主要方式就是锁阻塞用于I/O事件的线程池中的线程

  • I/O处理器 处理I/O的类,与Nio2Acceptor运行在同一个线程池中

ServerSocket的启动

异步ServerSocket启动的流程较为枯燥,若是你不想看代码,如下为其启动的流程

  • 建立线程池,将其包装为AsynchronousChannelGroup
  • 打开ServerSocket
  • 绑定端口, 并设置最大链接数
@Override
    public void bind() throws Exception {

        // 建立线程池
        if (getExecutor() == null) {
            createExecutor();
        }
        if (getExecutor() instanceof ExecutorService) {
            //建立用于I/O的线程池(须要用AsynchronousChannelGroup包装,才能提供给AsynchronousServerSocketChanne用)
            threadGroup = AsynchronousChannelGroup.withThreadPool((ExecutorService) getExecutor());
        }
        // AsynchronousChannelGroup needs exclusive access to its executor service
        if (!internalExecutor) {
            log.warn(sm.getString("endpoint.nio2.exclusiveExecutor"));
        }
        //建立ServerSocketChannel
        serverSock = AsynchronousServerSocketChannel.open(threadGroup);
        socketProperties.setProperties(serverSock);
        InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
        //绑定端口并设置backlog的参数
        //backlog能够理解为当前最大待执行accept操做的链接数
        serverSock.bind(addr, getAcceptCount());

        // Initialize SSL if needed
        initialiseSsl();
    }
复制代码

以下图,就是和当前异步ServerSocketChannel绑定的线程池,801表示该链接器所监听的端口(附录中有开启NIO2的教程)

nio2的线程池

Nio2Acceptor

Nio2Acceptor主要功能接收新链接,并限制最大链接数,由于采用的是异步I/O,因此Acceptor并不会于特定的线程绑定,而是当新任务须要执行的时候,从线程池中选一个执行任务。以下图所示当有客户端新链接到达时,程序会从线程池选择一个线程来执行Nio2Acceptor的completed方法并传入客户端Socket开始执行新链接处理的业务逻辑

断点

AcceptHandler的注册

在异步I/O中咱们须要向ServerSocketChannel注册处理Accept事件的处理器以便完成链接事件的处理 如如下代码所示,当tomcat启动的时候,会开启一个线程调用Nio2SocketAcceptor的run方法,将Nio2SocketAcceptor注册为ServerSocketChannel的accept事件处理器

protected class Nio2Acceptor extends Acceptor<AsynchronousSocketChannel> implements CompletionHandler<AsynchronousSocketChannel, Void> {
    ...
        @Override
        public void run() {
            // The initial accept will be called in a separate utility thread
            if (!isPaused()) {
                // 链接数限制,若是达到最大链接数,则调用此方法的线程会陷入等待
                try {
                    countUpOrAwaitConnection();
                } catch (InterruptedException e) {
                    // Ignore
                }
                if (!isPaused()) {
                   //将本身注册为accept事件的处理器(注意此类实现的接口)
                    serverSock.accept(null, this);
                } else {
                    state = AcceptorState.PAUSED;
                }
            } else {
                state = AcceptorState.PAUSED;
            }
        }
    ...
}
复制代码

新链接的处理

当有新链接到达时,底层会从线程池选择一个线程来执行completed方法并传入客户端socket,此时该方法主要的流程以下

  • 检查是容器否仍在运行若是仍在运行则继续流程
  • 检查是否须要限制链接数,若是须要限制链接数,则从线程池中选择一个线程来执行Acceptor的run方法(此方法可能会发生阻塞)
  • 以上操做均已完成则调用setSocketOptions方法执行后续I/O事件处理,至此新链接的接收完成
@Override
        public void completed(AsynchronousSocketChannel socket, Void attachment) {
            // Successful accept, reset the error delay
            errorDelay = 0;
            // Continue processing the socket on the current thread
            // Configure the socket
            if (isRunning() && !isPaused()) {
                //检查限制的最大链接数,若是没有设置(即-1)则不进行链接数限制
                if (getMaxConnections() == -1) {
                    serverSock.accept(null, this);
                } else {
                   //因为有新链接的到达,所以须要从线程池选一个线程执行增长链接数的操做,此操做可能会发生阻塞
                    getExecutor().execute(this);
                }
                //执行后续的I/O事件处理
                if (!setSocketOptions(socket)) {
                    closeSocket(socket);
                }
            } else {
                if (isRunning()) {
                    state = AcceptorState.PAUSED;
                }
                destroySocket(socket);
            }
        }
复制代码

限制最大链接数的实现

因为Acceptor并不与特定的线程绑定, 所以若是须要限制最大链接数,须要使用锁将空闲的线程阻塞住,这也时为何须要在accept新链接的时候须要向线程池提交增长新链接数的任务,以下所示(也就是调用Nio2SocketAcceptor的run方法)

public void completed(AsynchronousSocketChannel socket, Void attachment) {
        ...
        getExecutor().execute(this);
        ...
}
复制代码

除此以外,还记得在建立ServerSocketChannel的时候咱们设置了backlog参数吗?

该参数主要用于设置当前ServerSocket所容许的最大未accept的链接数,也就是说若是超过了未accept得链接数backlog所设置的值,那么新来的链接都将会被丢弃掉。(API文档)

I/O 事件的处理

既然是异步I/O,那么必然要在客户端Socket注册读写的CompletionHandler, 所以setSocketOptions必然会致使这一步骤的发生,那么此步骤发生在何时呢?

通过Debug跟踪发现setSocketOptions将会致使Nio2SocketWrapper的建立,而实际I/O流程就发生在新建Nio2SocketWrapper对象时所建立的readCompletionHandler中, 如下是其代码

ReadCompletionHandler 用于监听读事件,在读取到数据以后会调用processSocket方法开始数据的解析工做

public Nio2SocketWrapper(Nio2Channel channel, final Nio2Endpoint endpoint) {
            super(channel, endpoint);
            nioChannels = endpoint.getNioChannels();
            socketBufferHandler = channel.getBufHandler();

            this.readCompletionHandler = new CompletionHandler<Integer, ByteBuffer>() {
                @Override
                public void completed(Integer nBytes, ByteBuffer attachment) {
                    if (log.isDebugEnabled()) {
                        log.debug("Socket: [" + Nio2SocketWrapper.this + "], Interest: [" + readInterest + "]");
                    }
                    readNotify = false;
                    //加锁,其余线程可能会对标志位进行修改
                    synchronized (readCompletionHandler) {
                        //nBytes表示读取到的字节数,若是小于0
                        //抛出EOF异常,没数据读,那咋办吗,只好抛异常了
                        if (nBytes.intValue() < 0) {
                            failed(new EOFException(), attachment);
                        } else {
                            if (readInterest && !Nio2Endpoint.isInline()) {
                                readNotify = true;
                            } else {
                                // Release here since there will be no
                                // notify/dispatch to do the release.
                                readPending.release();
                            }
                            readInterest = false;
                        }
                    }
                    if (readNotify) {
                        //处理读事件
                        getEndpoint().processSocket(Nio2SocketWrapper.this, SocketEvent.OPEN_READ, false);
                    }
                }
                //省略代码,后面太长了
                ...
复制代码

debug验证一下,以下图所示,attachment即咱们所读到的数据

断点

注意 在debug的时候IDEA可能发出切换线程的请求(读数据和以前的操做是不在一个线程上的,以下所示

switch thread

总结

Tomcat NIO2 模型

模型
总结以下

  • accept事件和 I/O事件共用一个线程池,不会和特定线程绑定

  • Acceptor(Nio2Acceptor) 用于接收新链接,并注册I/O事件的处理对象

  • LimitLatch经过阻塞住线程池中的线程来实现链接数限制功能

  • I/O Handler 即在Nio2SocketWrapper注册的读写处理器,有I/O事件到达时,程序会选择一个线程来执行这些处理器的代码

  • 整体流程以下 新链接到达->选择一个线程执行Nio2Acceptor的代码->向线程池中提交增长链接数的任务->注册读写处理事件->I/O事件到达,选择一个线程处理I/O事件

思想迁移

不要使用默认线程池 在异步ServerSocketChannel建立的时候,tomcat会本身建立一个线程池,而不是使用默认提供的线程池,因为线程池在咱们掌握之中,由此才实现了链接数限制的功能

不要阻塞I/O线程 I/O线程就要有I/O线程的亚子,不要在I/O线程执行会发生长时间阻塞的操做

附录 如何调试tomcat

后端程序猿都晓得,SpringBoot中内嵌了tomcat(固然还有jetty,取决于你如何选择), 所以咱们能够新建一个SpringBoot应用来专门调试学习Tomcat的源码。

如下为tomcat调试的过程

  • 第一步, 打开IDEA
  • 第二步, 新建SpringBoot工程
  • 第三步, 在项目侧边栏Ctrl+F 查找Tomcat的jar包

红框标注的为tomcat的核心包

  • 第四步, /mute all,带上耳机,打上断点

若是你要测试Tomcat的NIO处理方式,在如下类打断点 (若是你想要了解tomcat中NIO的处理方式,能够看看个人理解)

package org.apache.tomcat.util.net;
public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> {
...
public class Poller implements Runnable {
    public void run() {
        //此方法的代码位于692行
    }
}
...
}
复制代码

若是你要测试Tomcat的NIO2的处理方式,则须要如下配置 将如下代码添加到你的代码中。(因为SpringBoot中内嵌的tomcat默认I/O方式为NIO因此咱们须要经过配置增长NIO2的链接器)

import org.apache.catalina.connector.Connector;
import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConnectorConf {
    //注意你的SpringBoot版本,此项目的版本是2.2.0,旧的版本1.5使用不一样的类进行配置
    @Bean
    public TomcatServletWebServerFactory servletContainer() {
        TomcatServletWebServerFactory tomcatServletWebServerFactory =
                new TomcatServletWebServerFactory();
        tomcatServletWebServerFactory.addAdditionalTomcatConnectors(getConnector());
        return tomcatServletWebServerFactory;
    }

    private Connector getConnector() {
       // 关键点哦
        Connector connector = new Connector("org.apache.coyote.http11.Http11Nio2Protocol");
        //将链接器的端口设置为801,这样访问801端口的就是NIO2的模式了
        connector.setPort(801);
        return connector;
    }
}
复制代码

org.apache.tomcat.util.net.Nio2Endpoint打上断点就完事了

如何调试多线程

在多线程的状况下,可能会出现进入不了断点的状况,此时只需在断点上右键选择Thread便可, 当其余线程到达断点时IDEA法发出通知,以下图所示

相关文章
相关标签/搜索