Tomcat系列(二)- EndPoint源码解析

在上一节中咱们描述了Tomcat的总体架构,java

咱们知道了Tomcat分为两个大组件,一个链接器和一个容器。编程

而咱们此次要讲的 EndPoint的组件就是属于链接器里面的。服务器

它是一个通讯的端点,就是负责对外实现TCP/IP协议。架构

EndPoint是个接口,app

它的具体实现类就是 AbstractEndpoint,而 AbstractEndpoint具体的实现类就有 AprEndpointNio2EndpointNioEndpoint框架

  • AprEndpoint:对应的是APR模式,简单理解就是从操做系统级别解决异步IO的问题,大幅度提升服务器的处理和响应性能。可是启用这种模式须要安装一些其余的依赖库。异步

  • Nio2Endpoint:利用代码来实现异步IOsocket

  • NioEndpoint:利用了JAVA的NIO实现了非阻塞IO,Tomcat默认启动是以这个来启动的,而这个也是咱们的讲述重点。ide

NioEndpoint中重要的组件

  咱们知道 NioEndpoint的原理仍是对于Linux的多路复用器的使用,而在多路复用器中简单来讲就两个步骤。模块化

    1. 建立一个Selector,在它身上注册各类Channel,而后调用select方法,等待通道中有感兴趣的事件发生。

    2. 若是有感兴趣的事情发生了,例如是读事件,那么就将信息从通道中读取出来。

  而 NioEndpoint为了实现上面这两步,用了五个组件来。

  这五个组件是 LimitLatchAcceptorPollerSocketProcessorExecutor

/**
 * Threads used to accept new connections and pass them to worker threads.
 */
protected List<Acceptor<U>> acceptors;

/**
 * counter for nr of connections handled by an endpoint
 */
private volatile LimitLatch connectionLimitLatch = null;

/**
 * The socket pollers.
 */
private Poller[] pollers = null;

// 内部类
SocketProcessor

/**
 * External Executor based thread pool.
 */
private Executor executor = null;
View Code

  咱们能够看到在代码中定义的这五个组件。具体这五个组件是干吗的呢?

    • LimitLatch:链接控制器,负责控制最大的链接数

    • Acceptor:负责接收新的链接,而后返回一个 Channel对象给 Poller

    • Poller:能够将其当作是NIO中 Selector,负责监控 Channel的状态

    • SocketProcessor:能够当作是一个被封装的任务类

    • Executor:Tomcat本身扩展的线程池,用来执行任务类

  用图简单表示就是如下的关系

  

  接下来咱们就来分别的看一下每一个组件里面关键的代码

LimitLatch

  咱们上面说了 LimitLatch主要是用来控制Tomcat所能接收的最大数量链接,若是超过了此链接,那么Tomcat就会将此链接线程阻塞等待,等里面有其余链接释放了再消费此链接。

  那么 LimitLatch是如何作到呢?咱们能够看 LimitLatch这个类

public class LimitLatch {
    private static final Log log = LogFactory.getLog(LimitLatch.class);
    
    private class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1L;
        public Sync() {}
        @Override
        protected int tryAcquireShared(int ignored) {
            long newCount = count.incrementAndGet();
            if (!released && newCount > limit) {
                // Limit exceeded
                count.decrementAndGet();
                return -1;
            } else {
                return 1;
            }
        }
        @Override
        protected boolean tryReleaseShared(int arg) {
            count.decrementAndGet();
            return true;
        }
    }

    private final Sync sync;
    private final AtomicLong count;
    private volatile long limit;
    private volatile boolean released = false;
}
View Code

  咱们能够看到它内部实现了 AbstractQueuedSynchronizer,AQS其实就是一个框架,实现它的类能够自定义控制线程何时挂起何时释放。

  limit参数就是控制的最大链接数。

  咱们能够看到 AbstractEndpoint调用 LimitLatchcountUpOrAwait方法来判断是否能获取链接。

public void countUpOrAwait() throws InterruptedException {
        if (log.isDebugEnabled()) {
            log.debug("Counting up["+Thread.currentThread().getName()+"] latch="+getCount());
        }
        sync.acquireSharedInterruptibly(1);
    }
View Code

  AQS是如何知道何时阻塞线程呢?即不能获取链接呢?

  这些就靠用户本身实现 AbstractQueuedSynchronizer本身来定义何时获取链接,何时释放链接了。

  能够看到Sync类重写了 tryAcquireSharedtryReleaseShared方法。

  在 tryAcquireShared方法中定义了一旦当前链接数大于了设置的最大链接数,那么就会返回 -1表示将此线程放入AQS队列中等待。

Acceptor

  Acceptor是接收链接的,咱们能够看到 Acceptor实现了 Runnable接口,那么在哪会新开启线程来执行 Acceptor的run方法呢?

  在 AbstractEndpointstartAcceptorThreads方法中。

protected final void startAcceptorThreads() {
    int count = getAcceptorThreadCount();
    acceptors = new Acceptor[count];

    for (int i = 0; i < count; i++) {
        acceptors[i] = createAcceptor();
        String threadName = getName() + "-Acceptor-" + i;
        acceptors[i].setThreadName(threadName);
        Thread t = new Thread(acceptors[i], threadName);
        t.setPriority(getAcceptorThreadPriority());
        t.setDaemon(getDaemon());
        t.start();
    }
}
View Code

  能够看到这里能够设置开启几个 Acceptor,默认是一个。

  而一个端口只能对应一个 ServerSocketChannel,那么这个 ServerSocketChannel在哪初始化呢?咱们能够看到在 Acceptor<U>acceptor=newAcceptor<>(this);

  这句话中传入了this进去,那么应该是由 Endpoint组件初始化的链接。

  在 NioEndpointinitServerSocket方法中初始化了链接。

  

  这里面咱们可以看到两点

    1. 在bind方法中的第二个参数表示操做系统的等待队列长度,即Tomcat再也不接受链接时(达到了设置的最大链接数),可是在操做系统层面仍是可以接受链接的,此时就将此链接信息放入等待队列,那么这个队列的大小就是此参数设置

    2. ServerSocketChannel被设置成了阻塞的模式,也就是说是以阻塞方式接受链接的。

    或许会有疑问。在平时的NIO编程中Channel不是都要设置成非阻塞模式吗?

    这里解释一下,若是是设置成非阻塞模式那么就必须设置一个 Selector不断的轮询,可是接受链接只须要阻塞一个通道便可。

  

  这里须要注意一点,每一个 Acceptor在生成 PollerEvent对象放入 Poller队列中时都是随机取出 Poller对象的,

  因此 Poller中的 Queue对象设置成了 SynchronizedQueue<PollerEvent>,由于可能有多个 Acceptor同时向此 Poller的队列中放入 PollerEvent对象。

  具体代码能够看以下,

public Poller getPoller0() {
    int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
    return pollers[idx];
}
View Code

什么是操做系统级别的链接呢?

在TCP的三次握手中,系统一般会每个LISTEN状态的Socket维护两个队列,一个是半链接队列(SYN):

这些链接已经收到客户端SYN;另外一个是全链接队列(ACCEPT):

这些连接已经收到客户端的ACK,完成了三次握手,等待被应用调用accept方法取走使用。

  全部的 Acceptor共用这一个链接,在 Acceptorrun方法中,放一些重要的代码。

public void run(){
    // Loop until we receive a shutdown command
    while(endpoint.isRunning()){
        try{
            //若是到了最大链接数,线程等待
            endpoint.countUpOrAwaitConnection();
            U socket = null;             
            try{
                //调用accept方法得到一个链接
                socket = endpoint.serverSocketAccept();
            }catch(Exception ioe){                        
                // 出异常之后当前链接数减掉1
                endpoint.countDownConnection();                   
            }
            // 配置Socket
            if(endpoint.isRunning() && !endpoint.isPaused()){
                // setSocketOptions() will hand the socket off to                   
                // an appropriate processor if successful                      
                if(!endpoint.setSocketOptions(socket)){
                    endpoint.closeSocket(socket)        
                }
            } else {
                endpoint.destroySocket(socket);                    
            }        
        }
    }
}    
View Code

  里面咱们能够获得两点

    1. 运行时会先判断是否到达了最大链接数,若是到达了那么就阻塞线程等待,里面调用的就是 LimitLatch组件判断的。

    2. 最重要的就是配置socket这一步了,是 endpoint.setSocketOptions(socket)这段代码

  

  其实里面重要的就是将 Acceptor与一个 Poller绑定起来,而后两个组件经过队列通讯,每一个Poller都维护着一个 SynchronizedQueue队列, ChannelEvent放入到队列中,而后 Poller从队列中取出事件进行消费。

Poller

  咱们能够看到 PollerNioEndpoint的内部类,而它也是实现了 Runnable接口,能够看到在其类中维护了一个Quene和Selector,定义以下。

  因此本质上 Poller就是 Selector

private Selector selector;
private final SynchronizedQueue<PollerEvent> events =new SynchronizedQueue<>();
View Code

  重点在其run方法中,这里删减了一些代码,只展现重要的。

  

  其中主要的就是调用了 events()方法,就是不断的查看队列中是否有 Pollerevent事件,若是有的话就将其取出而后把里面的 Channel取出来注册到该 Selector中,而后不断轮询全部注册过的 Channel查看是否有事件发生。

SocketProcessor

  咱们知道 Poller在轮询 Channel有事件发生时,就会调用将此事件封装起来,而后交给线程池去执行。

  那么这个包装类就是 SocketProcessor

  而咱们打开此类,可以看到它也实现了 Runnable接口,用来定义线程池 Executor中线程所执行的任务。

  那么这里是如何将 Channel中的字节流转换为Tomcat须要的 ServletRequest对象呢?其实就是调用了 Http11Processor来进行字节流与对象的转换的。

Executor

  Executor实际上是Tomcat定制版的线程池。咱们能够看它的类的定义,能够发现它实际上是扩展了Java的线程池。

public interface Executor extends java.util.concurrent.Executor, Lifecycle
View Code

  在线程池中最重要的两个参数就是核心线程数和最大线程数,正常的Java线程池的执行流程是这样的。

    1. 若是当前线程小于核心线程数,那么来一个任务就建立一个线程。

    2. 若是当前线程大于核心线程数,那么就再来任务就将任务放入到任务队列中。全部线程抢任务。

    3. 若是队列满了,那么就开始建立临时线程。

    4. 若是总线程数到了最大的线程数而且队列也满了,那么就抛出异常。

  可是在Tomcat自定义的线程池中是不同的,经过重写了 execute方法实现了本身的任务处理逻辑。

    1. 若是当前线程小于核心线程数,那么来一个任务就建立一个线程。

    2. 若是当前线程大于核心线程数,那么就再来任务就将任务放入到任务队列中。全部线程抢任务。

    3. 若是队列满了,那么就开始建立临时线程。

    4. 若是总线程数到了最大的线程数,再次得到任务队列,再尝试一次将任务加入队列中。

    5. 若是此时仍是满的,就抛异常。

  差异就在于第四步的差异,原生线程池的处理策略是只要当前线程数大于最大线程数,那么就抛异常,而Tomcat的则是若是当前线程数大于最大线程数,就再尝试一次,若是仍是满的才会抛异常。

  下面是定制化线程池 execute的执行逻辑。

public void execute(Runnable command, long timeout,TimeUnit unit){
    submittedCount.incrementAndGet();  
    try{ 
        super.execute(command);
    }catch(RejectedExecutionException rx){            
        if(super.getQueue() instanceof TaskQueue){

            //得到任务队列             
            final TaskQueue queue = (TaskQueue)super.getQueue();
            try{            
                if(!queue.force(command,timeout,unit)){
                    submittedCount.decrementAndGet();                
                    throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));                   
                }         
            }catch(InterruptedException x){
                submittedCount.decrementAndGet();
                throw new RejectedExecutionException(x);            
            }    
        }else{
            submittedCount.decrementAndGet();
            throw rx;        
        }   
    }
}
View Code

  在代码中,咱们能够看到有这么一句 submittedCount.incrementAndGet();

  为何会有这句呢?咱们能够看看这个参数的定义。

  简单来讲这个参数就是定义了任务已经提交到了线程池中,可是尚未执行的任务个数。

private final AtomicInteger submittedCount = new AtomicInteger(0);
View Code

  为何会有这么一个参数呢?

  咱们知道定制的队列是继承了 LinkedBlockingQueue,而 LinkedBlockingQueue队列默认是没有边界的。

  因而咱们就传入了一个参数, maxQueueSize给构造的队列。

  可是在Tomcat的任务队列默认状况下是无限制的,那么这样就会出一个问题,若是当前线程达到了核心线程数,则开始向队列中添加任务,那么就会一直是添加成功的。

  那么就不会再建立新的线程。那么在什么状况下要新建线程呢?

线程池中建立新线程会有两个地方,一个是小于核心线程时,来一个任务建立一个线程。另外一个是超过核心线程而且任务队列已满,则会建立临时线程。

  那么如何规定任务队列是否已满呢?若是设置了队列的最大长度固然好了,可是Tomcat默认状况下是没有设置,因此默认是无限的。因此Tomcat的 TaskQueue继承了 LinkedBlockingQueue,重写了 offer方法,在里面定义了何时返回false。

  

  这就是 submittedCount的意义,目的就是为了在任务队列长度无限的状况下,让线程池有机会建立新的线程。

总结

  上面的知识有部分是看着李号双老师的深刻拆解Tomcat总结的,又结合着源码深刻了解了一下,当时刚看文章的时候以为本身都懂了,可是再深刻源码的时候又会发现本身不懂。

  因此知识若是只是看了而不运用,那么知识永远都不会是本身的。

  经过Tomcat链接器这一小块的源码学习,除了一些经常使用知识的实际运用,例如AQS、锁的应用、自定义线程池须要考虑的点、NIO的应用等等。

  还有整体上的设计思惟的学习,模块化设计,和现在的微服务感受很类似,将一个功能点内部分为多种模块,这样不管是在之后替换或者是升级时都能游刃有余。

相关文章
相关标签/搜索