那些有趣的代码(一)--有点萌的 Tomcat 的线程池

最近抓紧时间看看了看 Tomcat 的源代码。发现了一些有趣的代码,这里和你们分享一下。java

Tomcat 做为一个老牌的 servlet 容器,处理多线程确定驾轻就熟,为了能保证多线程环境下的高效,必然使用了线程池。tomcat

可是,Tomcat 并无直接使用 j.u.c 里面的线程池,而是对线程池进行了扩展,首先咱们回忆一下,j.u.c 中的线程池的几个核心参数是怎么配合的:多线程

  1. 若是当前运行的线程,少于corePoolSize,则建立一个新的线程来执行任务。
  2. 若是运行的线程等于或多于 corePoolSize,将任务加入 BlockingQueue。
  3. 若是 BlockingQueue 内的任务超过上限,则建立新的线程来处理任务。
  4. 若是建立的线程超出 maximumPoolSize,任务将被拒绝策略拒绝。

这个时候咱们来仔细看看 Tomcat 的代码:less

首先写了一个 TaskQueue 继承了非阻塞无界队列 LinkedBlockingQueue<Runnable> 并重写了的 offer 方法:ide

@Override
public boolean offer(Runnable o) {
    //we can't do any checks
    if (parent==null) return super.offer(o);
    //we are maxed out on threads, simply queue the object
    if (parent.getPoolSize() == parent.getMaximumPoolSize()){
        return super.offer(o);
    }
    //we have idle threads, just add it to the queue
    if (parent.getSubmittedCount()<=(parent.getPoolSize())) {
        return super.offer(o);
    }
    //if we have less threads than maximum force creation of a new thread
    if (parent.getPoolSize()<parent.getMaximumPoolSize()) {
    return false;
    }  
    //if we reached here, we need to add it to the queue
    return super.offer(o);
}
复制代码

在提交任务的时候,增长了几个分支判断。优化

首先咱们看看 parent 是什么:this

private transient volatile ThreadPoolExecutor parent = null;
复制代码

这里须要特别注意这里的 ThreadPoolExecutor 并非 jdk里面的 java.util.concurrent.ThreadPoolExecutor 而是 tomcat 本身实现的。spa

咱们分别来看 offer 中的几个 if 分支。线程

首先咱们须要明确一下,当一个线程池须要调用阻塞队列的 offer 的时候,说明线程池的核心线程数已经被占满了。(记住这个前提很是重要)code

要理解下面的代码,首先须要复习一下线程池的 getPoolSize() 获取的是什么?咱们看源码:

/** * Returns the current number of threads in the pool. * * @return the number of threads */
public int getPoolSize() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Remove rare and surprising possibility of
        // isTerminated() && getPoolSize() > 0
        return runStateAtLeast(ctl.get(), TIDYING) ? 0
            : workers.size();
    } finally {
        mainLock.unlock();
    }
}
复制代码

须要注意的是,workers.size() 包含了 coreSize 的核心线程和临时建立的小于 maxSize 的临时线程。

先看第一个 if

// 若是线程池的工做线程数等于 线程池的最大线程数,这个时候没有工做线程了,就尝试加入到阻塞队列中
if (parent.getPoolSize() == parent.getMaximumPoolSize()){
    return super.offer(o);
}
复制代码

通过第一个 if 以后,线程数必然在核心线程数和最大线程数之间。

if (parent.getSubmittedCount()<=(parent.getPoolSize())) {
    return super.offer(o);
}
复制代码

对于 parent.getSubiitedCount() ,咱们要先搞清楚 submiitedCount 是什么

/** * The number of tasks submitted but not yet finished. This includes tasks * in the queue and tasks that have been handed to a worker thread but the * latter did not start executing the task yet. * This number is always greater or equal to {@link #getActiveCount()}. */
private final AtomicInteger submittedCount = new AtomicInteger(0);
复制代码

这个数是一个原子类的整数,用于记录提交到线程中,且尚未结束的任务数。包含了在阻塞队列中的任务数和正在被执行的任务数两部分之和 。

因此这行代码的策略是,若是已提交的线程数小于等于线程池中的线程数,代表这个时候还有空闲线程,直接加入阻塞队列中。为何会有这种状况发生?其实个人理解是,以前建立的临时线程尚未被回收,这个时候直接把线程加入到队列里面,天然就会被空闲的临时线程消费掉了。

咱们继续往下看:

//if we have less threads than maximum force creation of a new thread
if (parent.getPoolSize()<parent.getMaximumPoolSize()) {
    return false;
}
复制代码

因为上一个 if 条件的存在,走到这个 if 条件的时候,提交的线程数已经大于核心线程数了,且没有空闲线程,因此返回一个 false 标明,表示任务添加到阻塞队列失败。线程池就会认为阻塞队列已经没法继续添加任务到队列中了,根据默认线程池的工做逻辑,线程池就会建立新的线程直到最大线程数。

回忆一下 jdk 默认线程池的实现,若是阻塞队列是无界的,任务会无限的添加到无界的阻塞队列中,线程池就没法利用核心线程数和最大线程数之间的线程数了。

Tomcat 的实现就是为了,线程池即便核心线程数满了之后,且使用无界队列的时候,线程池依然有机会建立新的线程,直到达到线程池的最大线程数。

Tomcat 对线程池的优化并没结束,Tomcat 还重写了线程池的 execute 方法:

public void execute(Runnable command, long timeout, TimeUnit unit) {
    //提交任务数加一
    submittedCount.incrementAndGet();
    try {
        super.execute(command);
    } catch (RejectedExecutionException rx) {
        // 被拒绝之后尝试,再次向阻塞队列中提交任务
        if (super.getQueue() instanceof TaskQueue) {
            final TaskQueue queue = (TaskQueue)super.getQueue();
        try {
            if (!queue.force(command, timeout, unit)) {
                submittedCount.decrementAndGet();
                throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
            }
        } catch (InterruptedException x) {
            submittedCount.decrementAndGet();
            throw new RejectedExecutionException(x);
        }
        } else {
            submittedCount.decrementAndGet();
            throw rx;
        }
    }
}
复制代码

终于到整篇文章的萌点了,就是提交线程的时候,若是被线程池拒绝了,Tomcat 的线程池,还会厚着脸皮再次尝试,调用 force() 方法”强行”的尝试向阻塞队列中添加任务。

tomcat

在群里和朋友讲完 Tomcat 线程池的实现,帆哥给了一个特别厉害的例子。

总结一下:

Tomcat 线程池的逻辑:

  1. 若是当前运行的线程,少于corePoolSize,则建立一个新的线程来执行任务。
  2. 若是线程数大于 corePoolSize了,Tomcat 的线程不会直接把线程加入到无界的阻塞队列中,而是去判断,submittedCount(已经提交线程数)是否等于 maximumPoolSize。
  3. 若是等于,表示线程池已经满负荷运行,不能再建立线程了,直接把线程提交到队列.
  4. 若是不等于,则须要判断,是否有空闲线程能够消费。
  5. 若是有空闲线程则加入到阻塞队列中,等待空闲线程消费。
  6. 若是没有空闲线程,尝试建立新的线程。(这一步保证了使用无界队列,仍然能够利用线程的 maximumPoolSize)。
  7. 若是总线程数达到 maximumPoolSize,则继续尝试把线程加入 BlockingQueue 中。
  8. 若是 BlockingQueue 达到上限(假如设置了上限),被默认线程池启动拒绝策略,tomcat 线程池会 catch 住拒绝策略抛出的异常,再次把尝试任务加入中 BlockingQueue 中。
  9. 再次加入失败,启动拒绝策略。

如此努力的 Tomcat 线程池,有点萌啊。

相关文章
相关标签/搜索