最近抓紧时间看看了看 Tomcat 的源代码。发现了一些有趣的代码,这里和你们分享一下。java
Tomcat 做为一个老牌的 servlet 容器,处理多线程确定驾轻就熟,为了能保证多线程环境下的高效,必然使用了线程池。tomcat
可是,Tomcat 并无直接使用 j.u.c 里面的线程池,而是对线程池进行了扩展,首先咱们回忆一下,j.u.c 中的线程池的几个核心参数是怎么配合的:多线程
这个时候咱们来仔细看看 Tomcat 的代码:less
首先写了一个 TaskQueue 继承了非阻塞无界队列 LinkedBlockingQueue<Runnable>
并重写了的 offer 方法:ide
@Override
public boolean offer(Runnable o) {
//we can't do any checks
if (parent==null) return super.offer(o);
//we are maxed out on threads, simply queue the object
if (parent.getPoolSize() == parent.getMaximumPoolSize()){
return super.offer(o);
}
//we have idle threads, just add it to the queue
if (parent.getSubmittedCount()<=(parent.getPoolSize())) {
return super.offer(o);
}
//if we have less threads than maximum force creation of a new thread
if (parent.getPoolSize()<parent.getMaximumPoolSize()) {
return false;
}
//if we reached here, we need to add it to the queue
return super.offer(o);
}
复制代码
在提交任务的时候,增长了几个分支判断。优化
首先咱们看看 parent 是什么:this
private transient volatile ThreadPoolExecutor parent = null;
复制代码
这里须要特别注意这里的 ThreadPoolExecutor 并非 jdk里面的 java.util.concurrent.ThreadPoolExecutor 而是 tomcat 本身实现的。spa
咱们分别来看 offer 中的几个 if 分支。线程
首先咱们须要明确一下,当一个线程池须要调用阻塞队列的 offer 的时候,说明线程池的核心线程数已经被占满了。(记住这个前提很是重要)code
要理解下面的代码,首先须要复习一下线程池的 getPoolSize() 获取的是什么?咱们看源码:
/** * Returns the current number of threads in the pool. * * @return the number of threads */
public int getPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Remove rare and surprising possibility of
// isTerminated() && getPoolSize() > 0
return runStateAtLeast(ctl.get(), TIDYING) ? 0
: workers.size();
} finally {
mainLock.unlock();
}
}
复制代码
须要注意的是,workers.size() 包含了 coreSize 的核心线程和临时建立的小于 maxSize 的临时线程。
先看第一个 if
// 若是线程池的工做线程数等于 线程池的最大线程数,这个时候没有工做线程了,就尝试加入到阻塞队列中
if (parent.getPoolSize() == parent.getMaximumPoolSize()){
return super.offer(o);
}
复制代码
通过第一个 if 以后,线程数必然在核心线程数和最大线程数之间。
if (parent.getSubmittedCount()<=(parent.getPoolSize())) {
return super.offer(o);
}
复制代码
对于 parent.getSubiitedCount() ,咱们要先搞清楚 submiitedCount 是什么
/** * The number of tasks submitted but not yet finished. This includes tasks * in the queue and tasks that have been handed to a worker thread but the * latter did not start executing the task yet. * This number is always greater or equal to {@link #getActiveCount()}. */
private final AtomicInteger submittedCount = new AtomicInteger(0);
复制代码
这个数是一个原子类的整数,用于记录提交到线程中,且尚未结束的任务数。包含了在阻塞队列中的任务数和正在被执行的任务数两部分之和 。
因此这行代码的策略是,若是已提交的线程数小于等于线程池中的线程数,代表这个时候还有空闲线程,直接加入阻塞队列中。为何会有这种状况发生?其实个人理解是,以前建立的临时线程尚未被回收,这个时候直接把线程加入到队列里面,天然就会被空闲的临时线程消费掉了。
咱们继续往下看:
//if we have less threads than maximum force creation of a new thread
if (parent.getPoolSize()<parent.getMaximumPoolSize()) {
return false;
}
复制代码
因为上一个 if 条件的存在,走到这个 if 条件的时候,提交的线程数已经大于核心线程数了,且没有空闲线程,因此返回一个 false 标明,表示任务添加到阻塞队列失败。线程池就会认为阻塞队列已经没法继续添加任务到队列中了,根据默认线程池的工做逻辑,线程池就会建立新的线程直到最大线程数。
回忆一下 jdk 默认线程池的实现,若是阻塞队列是无界的,任务会无限的添加到无界的阻塞队列中,线程池就没法利用核心线程数和最大线程数之间的线程数了。
Tomcat 的实现就是为了,线程池即便核心线程数满了之后,且使用无界队列的时候,线程池依然有机会建立新的线程,直到达到线程池的最大线程数。
Tomcat 对线程池的优化并没结束,Tomcat 还重写了线程池的 execute 方法:
public void execute(Runnable command, long timeout, TimeUnit unit) {
//提交任务数加一
submittedCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
// 被拒绝之后尝试,再次向阻塞队列中提交任务
if (super.getQueue() instanceof TaskQueue) {
final TaskQueue queue = (TaskQueue)super.getQueue();
try {
if (!queue.force(command, timeout, unit)) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
}
} catch (InterruptedException x) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException(x);
}
} else {
submittedCount.decrementAndGet();
throw rx;
}
}
}
复制代码
终于到整篇文章的萌点了,就是提交线程的时候,若是被线程池拒绝了,Tomcat 的线程池,还会厚着脸皮再次尝试,调用 force() 方法”强行”的尝试向阻塞队列中添加任务。
在群里和朋友讲完 Tomcat 线程池的实现,帆哥给了一个特别厉害的例子。
总结一下:
Tomcat 线程池的逻辑:
如此努力的 Tomcat 线程池,有点萌啊。