### 写在前面的话
最近一直都在研究Java的线程池ThreadPoolExecutor,可是虽然它那么好,可是在实际的用途中怎么去用,对于我来讲就不知道如何下手了,还好有开源社区咱们能够了解不少项目中所运用到的线程池,好比最熟悉的就是Apache Tomcat了,相信都对它不默生,一个Apache软件基金下的一个开源Web容器,因此今天就来聊一下Tomcat的线程池实现。java
### 准备工做
首先去Apache Tomcat的官网下载Tomcat的源代码,这里给出<a href="http://mirrors.tuna.tsinghua.edu.cn/apache/tomcat/tomcat-7/v7.0.72/src/apache-tomcat-7.0.72-src.zip">Tomcat源码连接</a>,下载下来以后,它是一个zip文件,须要把它进行解压到相应的文件夹下面,以便咱们能方便的查看其源代码。分析源码最行之有效的方法就是知道这个类有哪些方法,哪些字段,继承了哪些类,实现了哪些接口,因此咱们这里推荐一款UML工具, *astah*-*professional*,可自行下载安装,这是一个收费软件,可是它有50天的试用期,因此咱们能够以使用的身份使用该软件。准备工做作好以后就能够进行下一步的操做了。apache
### 初探Tomcat线程池
Tomcat的线程池的类文件在../apache-tomcat-7.0.72-src\java\org\apache\catalina\core包下面,定位到这个文件夹下面能够看到StandardThreadExecutor.java就是咱们找寻的类了,用文本工具打开就能够查看其源码了。这里源码以下:
StandardThreadExecutor.java
``` java
public class StandardThreadExecutor extends LifecycleMBeanBase
implements Executor, ResizableExecutor {
//默认线程的优先级
protected int threadPriority = Thread.NORM_PRIORITY;
//守护线程
protected boolean daemon = true;
//线程名称的前缀
protected String namePrefix = "tomcat-exec-";
//最大线程数默认200个
protected int maxThreads = 200;
//最小空闲线程25个
protected int minSpareThreads = 25;
//超时时间为6000
protected int maxIdleTime = 60000;
//线程池容器
protected ThreadPoolExecutor executor = null;
//线程池的名称
protected String name;
//是否提早启动线程
protected boolean prestartminSpareThreads = false;
//队列最大大小
protected int maxQueueSize = Integer.MAX_VALUE;
//为了不在上下文中止以后,全部的线程在同一时间段被更新,因此进行线程的延迟操做
protected long threadRenewalDelay = 1000L;
//任务队列
private TaskQueue taskqueue = null;tomcat
//容器启动时进行,具体可参考org.apache.catalina.util.LifecycleBase#startInternal()
@Override
protected void startInternal() throws LifecycleException {
//实例化任务队列
taskqueue = new TaskQueue(maxQueueSize);
//自定义的线程工厂类,实现了JDK的ThreadFactory接口
TaskThreadFactory tf = new TaskThreadFactory(namePrefix,daemon,getThreadPriority());
//这里的ThreadPoolExecutor是tomcat自定义的,不是JDK的ThreadPoolExecutor
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf);
executor.setThreadRenewalDelay(threadRenewalDelay);
//是否提早启动线程,若是为true,则提早初始化minSpareThreads个的线程,放入线程池内
if (prestartminSpareThreads) {
executor.prestartAllCoreThreads();
}
//设置任务容器的父级线程池对象
taskqueue.setParent(executor);
//设置容器启动状态
setState(LifecycleState.STARTING);
}
//容器中止时的生命周期方法,进行关闭线程池和资源清理
@Override
protected void stopInternal() throws LifecycleException {多线程
setState(LifecycleState.STOPPING);
if ( executor != null ) executor.shutdownNow();
executor = null;
taskqueue = null;
}
//这个执行线程方法有超时的操做,参考org.apache.catalina.Executor接口
@Override
public void execute(Runnable command, long timeout, TimeUnit unit) {
if ( executor != null ) {
executor.execute(command,timeout,unit);
} else {
throw new IllegalStateException("StandardThreadExecutor not started.");
}
}app
//JDK默认操做线程的方法,参考java.util.concurrent.Executor接口
@Override
public void execute(Runnable command) {
if ( executor != null ) {
try {
executor.execute(command);
} catch (RejectedExecutionException rx) {
//there could have been contention around the queue
if ( !( (TaskQueue) executor.getQueue()).force(command) ) throw new RejectedExecutionException("Work queue full.");
}
} else throw new IllegalStateException("StandardThreadPool not started.");
}less
//因为继承了org.apache.tomcat.util.threads.ResizableExecutor接口,因此能够从新定义线程池的大小
@Override
public boolean resizePool(int corePoolSize, int maximumPoolSize) {
if (executor == null)
return false;eclipse
executor.setCorePoolSize(corePoolSize);
executor.setMaximumPoolSize(maximumPoolSize);
return true;
}
}
```
看完了上面的源码以后,不知此刻的你是一面茫然仍是认为小菜一碟呢,无论怎样,咱们先来看下UML类图吧,了解一下具体的继承关系,你就明白了,废话很少说,能用图片解决的东西尽可能少用文字。ide
工具
接下来,咱们来看一下ResizableExecutor这个接口:
``` java
import java.util.concurrent.Executor;优化
public interface ResizableExecutor extends Executor {
/**
* Returns the current number of threads in the pool.
*
* @return the number of threads
*/
public int getPoolSize();
public int getMaxThreads();
/**
* Returns the approximate number of threads that are actively executing
* tasks.
*
* @return the number of threads
*/
public int getActiveCount();
public boolean resizePool(int corePoolSize, int maximumPoolSize);
public boolean resizeQueue(int capacity);
}
```
实现这个接口以后,就能动态改变线程池的大小和任务队列的大小了,它是继承自JDK的Executor接口的,其它的接口再也不多说,可自行查看源码。
### Tomcat线程池的实现
Tomcat的线程池的名字也叫做ThreadPoolExecutor,刚开始看源代码的时候还觉得是使用了JDK的ThreadPoolExecutor了呢,后面仔细查看才知道是Tomcat本身实现的一个ThreadPoolExecutor,不过基本上都差很少,都是在JDK之上封装了一些本身的东西,上源码:
``` java
public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {
protected static final StringManager sm = StringManager
.getManager("org.apache.tomcat.util.threads.res");
/**
* 已经提交但还没有完成的任务数量。
* 这包括已经在队列中的任务和已经交给工做线程的任务但还未开始执行的任务
* 这个数字老是大于getActiveCount()的
**/
private final AtomicInteger submittedCount = new AtomicInteger(0);
private final AtomicLong lastContextStoppedTime = new AtomicLong(0L);
/**
* 最近的时间在ms时,一个线程决定杀死本身来避免
* 潜在的内存泄漏。 用于调节线程的更新速率。
*/
private final AtomicLong lastTimeThreadKilledItself = new AtomicLong(0L);
//延迟2个线程之间的延迟。 若是为负,不更新线程。
private long threadRenewalDelay = 1000L;
//4个构造方法 ... 省略
public long getThreadRenewalDelay() {
return threadRenewalDelay;
}
public void setThreadRenewalDelay(long threadRenewalDelay) {
this.threadRenewalDelay = threadRenewalDelay;
}
/**
* 方法在完成给定Runnable的执行时调用。
* 此方法由执行任务的线程调用。 若是
* 非null,Throwable是未捕获的{@code RuntimeException}
* 或{@code Error},致使执行忽然终止。...
* @param r 已完成的任务
* @param t 引发终止的异常,若是执行正常完成则为null
**/
@Override
protected void afterExecute(Runnable r, Throwable t) {
submittedCount.decrementAndGet();
if (t == null) {
stopCurrentThreadIfNeeded();
}
}
//若是当前线程在上一次上下文中止以前启动,则抛出异常,以便中止当前线程。
protected void stopCurrentThreadIfNeeded() {
if (currentThreadShouldBeStopped()) {
long lastTime = lastTimeThreadKilledItself.longValue();
if (lastTime + threadRenewalDelay < System.currentTimeMillis()) {
if (lastTimeThreadKilledItself.compareAndSet(lastTime,
System.currentTimeMillis() + 1)) {
// OK, it's really time to dispose of this thread
final String msg = sm.getString(
"threadPoolExecutor.threadStoppedToAvoidPotentialLeak",
Thread.currentThread().getName());
throw new StopPooledThreadException(msg);
}
}
}
}
//当前线程是否须要被终止
protected boolean currentThreadShouldBeStopped() {
if (threadRenewalDelay >= 0
&& Thread.currentThread() instanceof TaskThread) {
TaskThread currentTaskThread = (TaskThread) Thread.currentThread();
//线程建立的时间<上下文中止的时间,则能够中止该线程
if (currentTaskThread.getCreationTime() <
this.lastContextStoppedTime.longValue()) {
return true;
}
}
return false;
}
public int getSubmittedCount() {
return submittedCount.get();
}
@Override
public void execute(Runnable command) {
execute(command,0,TimeUnit.MILLISECONDS);
}
public void execute(Runnable command, long timeout, TimeUnit unit) {
submittedCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
if (super.getQueue() instanceof TaskQueue) {
final TaskQueue queue = (TaskQueue)super.getQueue();
try {
if (!queue.force(command, timeout, unit)) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException("Queue capacity is full.");
}
} catch (InterruptedException x) {
submittedCount.decrementAndGet();
Thread.interrupted();
throw new RejectedExecutionException(x);
}
} else {
submittedCount.decrementAndGet();
throw rx;
}
}
}
public void contextStopping() {
this.lastContextStoppedTime.set(System.currentTimeMillis());
int savedCorePoolSize = this.getCorePoolSize();
TaskQueue taskQueue =
getQueue() instanceof TaskQueue ? (TaskQueue) getQueue() : null;
if (taskQueue != null) {
taskQueue.setForcedRemainingCapacity(Integer.valueOf(0));
}
// setCorePoolSize(0) wakes idle threads
this.setCorePoolSize(0);
if (taskQueue != null) {
// ok, restore the state of the queue and pool
taskQueue.setForcedRemainingCapacity(null);
}
this.setCorePoolSize(savedCorePoolSize);
}
}
```
Tomcat的线程池根据文档来讲:和java.util.concurrent同样,可是它实现了一个高效的方法getSubmittedCount()方法用来处理工做队列。具体能够查看上面的注释和源码就知道了。把UML图献上。

### Tomcat线程工厂
想要自定义线程工厂类,只须要实现JDK的ThreadFactory接口就能够了,咱们来看看Tomcat是如何实现的吧:
``` java
public class TaskThreadFactory implements ThreadFactory {
//线程组
private final ThreadGroup group;
//线程增加因子
private final AtomicInteger threadNumber = new AtomicInteger(1);
//名称前缀
private final String namePrefix;
//是不是守护线程
private final boolean daemon;
//线程优先级
private final int threadPriority;
public TaskThreadFactory(String namePrefix, boolean daemon, int priority) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
this.namePrefix = namePrefix;
this.daemon = daemon;
this.threadPriority = priority;
}
@Override
public Thread newThread(Runnable r) {
TaskThread t = new TaskThread(group, r, namePrefix + threadNumber.getAndIncrement());
t.setDaemon(daemon);
t.setPriority(threadPriority);
return t;
}
}
```
Tomcat的线程工厂类和JDK实现的线程工厂类相差无几,具体能够参考一下JDK线程工厂Executors.DefaultThreadFactory工厂类的实现。
### Tomcat的线程类
Tomcat本身定义了TaskThread用于线程的执行,里面增长了creationTime字段用于定义线程建立的开始时间,以便后面线程池获取这个时间来进行优化。
``` java
/**
* 一个实现建立时间纪录的线程类
*/
public class TaskThread extends Thread {
private static final Log log = LogFactory.getLog(TaskThread.class);
private final long creationTime;
public TaskThread(ThreadGroup group, Runnable target, String name) {
super(group, new WrappingRunnable(target), name);
this.creationTime = System.currentTimeMillis();
}
public TaskThread(ThreadGroup group, Runnable target, String name,
long stackSize) {
super(group, new WrappingRunnable(target), name, stackSize);
this.creationTime = System.currentTimeMillis();
}
public final long getCreationTime() {
return creationTime;
}
/**
* 封装{@link Runnable}以接受任何{@link StopPooledThreadException},而不是让它走,并可能在调试器中触发中断。
*/
private static class WrappingRunnable implements Runnable {
private Runnable wrappedRunnable;
WrappingRunnable(Runnable wrappedRunnable) {
this.wrappedRunnable = wrappedRunnable;
}
@Override
public void run() {
try {
wrappedRunnable.run();
} catch(StopPooledThreadException exc) {
//expected : we just swallow the exception to avoid disturbing
//debuggers like eclipse's
log.debug("Thread exiting on purpose", exc);
}
}
}
}
```
按照Tomcat的注解可知,它就是一个普通的线程类而后增长一个纪录线程建立的时间纪录而已,后面还使用动态内部类封装了一个Runnable,用于调试出发中断。
### Tomcat任务队列
Tomcat的线程队列由org.apache.tomcat.util.threads.TaskQueue来处理,它集成自LinkedBlockingQueue(一个阻塞的链表队列),来看下源代码吧。
``` java
public class TaskQueue extends LinkedBlockingQueue<Runnable> {
private static final long serialVersionUID = 1L;
private ThreadPoolExecutor parent = null;
// no need to be volatile, the one times when we change and read it occur in
// a single thread (the one that did stop a context and fired listeners)
private Integer forcedRemainingCapacity = null;
public TaskQueue() {
super();
}
public TaskQueue(int capacity) {
super(capacity);
}
public TaskQueue(Collection<? extends Runnable> c) {
super(c);
}
public void setParent(ThreadPoolExecutor tp) {
parent = tp;
}
public boolean force(Runnable o) {
if ( parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");
return super.offer(o); //forces the item onto the queue, to be used if the task is rejected
}
public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
if ( parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");
return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected
}
@Override
public boolean offer(Runnable o) {
//we can't do any checks
if (parent==null) return super.offer(o);
//we are maxed out on threads, simply queue the object
if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
//we have idle threads, just add it to the queue
if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o);
//if we have less threads than maximum force creation of a new thread
if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
//if we reached here, we need to add it to the queue
return super.offer(o);
}
@Override
public Runnable poll(long timeout, TimeUnit unit)
throws InterruptedException {
Runnable runnable = super.poll(timeout, unit);
if (runnable == null && parent != null) {
// the poll timed out, it gives an opportunity to stop the current
// thread if needed to avoid memory leaks.
parent.stopCurrentThreadIfNeeded();
}
return runnable;
}
@Override
public Runnable take() throws InterruptedException {
if (parent != null && parent.currentThreadShouldBeStopped()) {
return poll(parent.getKeepAliveTime(TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS);
// yes, this may return null (in case of timeout) which normally
// does not occur with take()
// but the ThreadPoolExecutor implementation allows this
}
return super.take();
}
@Override
public int remainingCapacity() {
if (forcedRemainingCapacity != null) {
return forcedRemainingCapacity.intValue();
}
return super.remainingCapacity();
}
public void setForcedRemainingCapacity(Integer forcedRemainingCapacity) {
this.forcedRemainingCapacity = forcedRemainingCapacity;
}
}
```
TaskQueue这个任务队列是专门为线程池而设计的。优化任务队列以适当地利用线程池执行器内的线程。
若是你使用一个普通的队列,执行器将产生线程,当有空闲线程,你不能强制项目到队列自己。
### 总结 从0到1分析一下Apache Tomcat的线程池,感受心好累啊,不过有收获,至少多线程池这一块又增强了,首先是定位到了StandardThreadExecutor这个类,而后由此展开,ResizableExecutor(动态大小的线程池接口) 、ThreadPoolExecutor (Tomcat线程池具体实现对象)、TaskThreadFactory(Tomcat线程工厂)、TaskThread(Tomcat线程类-一个纪录建立时间的线程类)、TaskQueue(Tomcat的任务队列-一个专门为线程池而设计优化的任务队列),喝口水,压压惊。