由于已经看到了ThreadPoolExecutor的源码,因此很容易就看到了ThreadPoolExecutor线程池的数据结构。图1描述了这种数据结构。
图1 ThreadPoolExecutor 数据结构
其实,即使没有上述图形描述ThreadPoolExecutor的数据结构,我们根据线程池的要求也很能够猜测出其数据结构出来。
对于ThreadPoolExecutor而言,一个线程就是一个Worker对象,它与一个线程绑定,当Worker执行完毕就是线程执行完毕,这个在后面详细讨论线程池中线程的运行方式。
既然是线程池,那么就首先研究下线程的构造方法。
-
public
interface ThreadFactory {
-
Thread newThread(Runnable r);
-
}
ThreadPoolExecutor使用一个线程工厂来构造线程。线程池都是提交一个任务Runnable,然后在某一个线程Thread中执行,ThreadFactory 负责如何创建一个新线程。
在J.U.C中有一个通用的线程工厂java.util.concurrent.Executors.DefaultThreadFactory,它的构造方式如下:
-
static
class DefaultThreadFactory implements ThreadFactory {
-
static
final AtomicInteger poolNumber =
new AtomicInteger(
1);
-
final ThreadGroup group;
-
final AtomicInteger threadNumber =
new AtomicInteger(
1);
-
final String namePrefix;
-
DefaultThreadFactory() {
-
SecurityManager s = System.getSecurityManager();
-
group = (s !=
null)? s.getThreadGroup() :
-
Thread.currentThread().getThreadGroup();
-
namePrefix =
"pool-" +
-
poolNumber.getAndIncrement() +
-
"-thread-";
-
}
-
public Thread newThread(Runnable r) {
-
Thread t =
new Thread(group, r,
-
namePrefix + threadNumber.getAndIncrement(),
-
0);
-
if (t.isDaemon())
-
t.setDaemon(
false);
-
if (t.getPriority() != Thread.NORM_PRIORITY)
-
t.setPriority(Thread.NORM_PRIORITY);
-
return t;
-
}
-
}
在这个线程工厂中,同一个线程池的所有线程属于同一个线程组,也就是创建线程池的那个线程组,同时线程池的名称都是“pool-<poolNum>-thread-<threadNum>”,其中poolNum是线程池的数量序号,threadNum是此线程池中的线程数量序号。这样如果使用jstack的话很容易就看到了系统中线程池的数量和线程池中线程的数量。另外对于线程池中的所有线程默认都转换为非后台线程,这样主线程退出时不会直接退出JVM,而是等待线程池结束。还有一点就是默认将线程池中的所有线程都调为同一个级别,这样在操作系统角度来看所有系统都是公平的,不会导致竞争堆积。
一个线程Worker被构造出来以后就开始处于运行状态。以下是一个线程执行的简版逻辑。
-
private
final
class Worker implements Runnable {
-
private
final ReentrantLock runLock =
new ReentrantLock();
-
private Runnable firstTask;
-
Thread thread;
-
Worker(Runnable firstTask) {
-
this.firstTask = firstTask;
-
}
-
private void runTask(Runnable task) {
-
final ReentrantLock runLock =
this.runLock;
-
runLock.lock();
-
try {
-
task.run();
-
}
finally {
-
runLock.unlock();
-
}
-
}
-
public void run() {
-
try {
-
Runnable task = firstTask;
-
firstTask =
null;
-
while (task !=
null || (task = getTask()) !=
null) {
-
runTask(task);
-
task =
null;
-
}
-
}
finally {
-
workerDone(
this);
-
}
-
}
-
}
当提交一个任务时,如果需要创建一个线程(何时需要在下一节中探讨)时,就调用线程工厂创建一个线程,同时将线程绑定到Worker工作队列中。需要说明的是,Worker队列构造的时候带着一个任务Runnable,因此Worker创建时总是绑定着一个待执行任务。换句话说,创建线程的前提是有必要创建线程(任务数已经超出了线程或者强制创建新的线程,至于为何强制创建新的线程后面章节会具体分析),不会无缘无故创建一堆空闲线程等着任务。这是节省资源的一种方式。
一旦线程池启动线程后(调用线程run())方法,那么线程工作队列Worker就从第1个任务开始执行(这时候发现构造Worker时传递一个任务的好处了),一旦第1个任务执行完毕,就从线程池的任务队列中取出下一个任务进行执行。循环如此,直到线程池被关闭或者任务抛出了一个RuntimeException。
由此可见,线程池的基本原理其实也很简单,无非预先启动一些线程,线程进入死循环状态,每次从任务队列中获取一个任务进行执行,直到线程池被关闭。如果某个线程因为执行某个任务发生异常而终止,那么重新创建一个新的线程而已。如此反复。
其实,线程池原理看起来简单,但是复杂的是各种策略,例如何时该启动一个线程,何时该终止、挂起、唤醒一个线程,任务队列的阻塞与超时,线程池的生命周期以及任务拒绝策略等等。
我们从一个API开始接触Executor是如何处理任务队列的。
java.util.concurrent.Executor.execute(Runnable)
Executes the given task sometime in the future. The task may execute in a new thread or in an existing pooled thread. If the task cannot be submitted for execution, either because this executor has been shutdown or because its capacity has been reached, the task is handled by the current RejectedExecutionHandler.
线程池中所有任务执行都依赖于此接口。这段话有以下几个意思:
回答上面两个“可能“。任务可能被执行,那不可能的情况就是上面说的情况3;可能不是立即执行,是因为任务可能还在队列中排队,因此还在等待分配线程执行。了解完了字面上的问题,我们再来看具体的实现。
-
public void execute(Runnable command) {
-
if (command ==
null)
-
throw
new NullPointerException();
-
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
-
if (runState == RUNNING && workQueue.offer(command)) {
-
if (runState != RUNNING || poolSize ==
0)
-
ensureQueuedTaskHandled(command);
-
}
-
else
if (!addIfUnderMaximumPoolSize(command))
-
reject(command);
// is shutdown or saturated
-
}
-
}
这一段代码看起来挺简单的,其实这就是线程池最重要的一部分,如果能够完全理解这一块,线程池还是挺容易的。整个执行流程是这样的:
文字描述步骤不够简单?下面图形详细表述了此过程。
老实说这个图比上面步骤更难以理解,那么从何入手呢。
流程的入口很简单,我们就是要执行一个任务(Runnable command),那么它的结束点在哪或者有哪几个?
根据左边这个图我们知道可能有以下几种出口:
(1)图中的P1、P7,我们根据这条路径可以看到,仅仅是将任务加入任务队列(offer(command))了;
(2)图中的P3,这条路径不将任务加入任务队列,但是启动了一个新工作线程(Worker)进行扫尾操作,用户处理为空的任务队列;
(3)图中的P4,这条路径没有将任务加入任务队列,但是启动了一个新工作线程(Worker),并且工作现场的第一个任务就是当前任务;
(4)图中的P5、P6,这条路径没有将任务加入任务队列,也没有启动工作线程,仅仅是抛给了任务拒绝策略。P2是任务加入了任务队列却因为线程池已经关闭于是又从任务队列中删除,并且抛给了拒绝策略。
如果上面的解释还不清楚,可以去研究下面两段代码:
java.util.concurrent.ThreadPoolExecutor.addIfUnderCorePoolSize(Runnable) java.util.concurrent.ThreadPoolExecutor.addIfUnderMaximumPoolSize(Runnable) java.util.concurrent.ThreadPoolExecutor.ensureQueuedTaskHandled(Runnable)
那么什么时候一个任务被立即执行呢?
在线程池运行状态下,如果线程池大小 小于 核心线程池大小或者线程池已满(任务队列已满)并且线程池大小 小于 最大线程池大小(此时线程池大小 大于 核心线程池大小的),用程序描述为:
runState == RUNNING && ( poolSize < corePoolSize || poolSize < maxnumPoolSize && workQueue.isFull())
上面的条件就是一个任务能够被立即执行的条件。
有了execute的基础,我们看看ExecutorService中的几个submit方法的实现。
-
public Future<?> submit(Runnable task) {
-
if (task ==
null)
throw
new NullPointerException();
-
RunnableFuture<Object> ftask = newTaskFor(task,
null);
-
execute(ftask);
-
return ftask;
-
}
-
-
public <T>
Future<T> submit(Runnable task, T result) {
-
if (task ==
null)
throw
new NullPointerException();
-
RunnableFuture<T> ftask = newTaskFor(task, result);
-
execute(ftask);
-
return ftask;
-
}
-
-
public <T>
Future<T> submit(Callable<T> task) {
-
if (task ==
null)
throw
new NullPointerException();
-
RunnableFuture<T> ftask = newTaskFor(task);
-
execute(ftask);
-
return ftask;
-
}
很简单,不是么?对于一个线程池来说复杂的地方也就在execute方法的执行流程。在下一节中我们来讨论下如何获取任务的执行结果,也就是Future类的使用和原理。
这一节来探讨下线程池中任务执行的结果以及如何阻塞线程、取消任务等等。
-
package info.imxylz.study.concurrency.future;
-
-
public
class SleepForResultDemo implements Runnable {
-
-
static
boolean result =
false;
-
-
static void sleepWhile(long ms) {
-
try {
-
Thread.sleep(ms);
-
}
catch (Exception e) {}
-
}
-
-
@Override
-
public void run() {
-
//do work
-
System.out.println(
"Hello, sleep a while.");
-
sleepWhile(
2000L);
-
result =
true;
-
}
-
-
public static void main(String[] args) {
-
SleepForResultDemo demo =
new SleepForResultDemo();
-
Thread t =
new Thread(demo);
-
t.start();
-
sleepWhile(
3000L);
-
System.out.println(result);
-
}
-
-
}
在没有线程池的时代里面,使用Thread.sleep(long)去获取线程执行完毕的场景很多。显然这种方式很笨拙,他需要你事先知道任务可能的执行时间,并且还会阻塞主线程,不管任务有没有执行完毕。
-
package info.imxylz.study.concurrency.future;
-
-
public
class SleepLoopForResultDemo implements Runnable {
-
-
boolean result =
false;
-
-
volatile
boolean finished =
false;
-
-
static void sleepWhile(long ms) {
-
try {
-
Thread.sleep(ms);
-
}
catch (Exception e) {}
-
}
-
-
@Override
-
public void run() {
-
//do work
-
try {
-
System.out.println(
"Hello, sleep a while.");
-
sleepWhile(
2000L);
-
result =
true;
-
}
finally {
-
finished =
true;
-
}
-
}
-
-
public static void main(String[] args) {
-
SleepLoopForResultDemo demo =
new SleepLoopForResultDemo();
-
Thread t =
new Thread(demo);
-
t.start();
-
while (!demo.finished) {
-
sleepWhile(
10L);
-
}
-
System.out.println(demo.result);
-
}
-
-
}
使用volatile与while死循环的好处就是等待的时间可以稍微小一点,但是依然有CPU负载高并且阻塞主线程的问题。最简单的降低CPU负载的方式就是使用Thread.join().
-
SleepLoopForResultDemo demo =
new SleepLoopForResultDemo();
-
Thread t =
new Thread(demo);
-
t.start();
-
t.join();
-
System.out.println(demo.result);
显然这也是一种不错的方式,另外还有自己写锁使用wait/notify的方式。其实join()从本质上讲就是利用while和wait来实现的。
上面的方式中都存在一个问题,那就是会阻塞主线程并且任务不能被取消。为了解决这个问题,线程池中提供了一个Future接口。
在Future接口中提供了5个方法。
API看起来容易,来研究下异常吧。get()请求获取一个结果会阻塞当前进程,并且可能抛出以下三种异常:
对于get(long timeout, TimeUnit unit)而言,除了get()方法的异常外,由于有超时机制,因此还可能得到一个TimeoutException。
boolean cancel(boolean mayInterruptIfRunning)方法比较复杂,各种情况比较多:
来看看Future接口的实现类java.util.concurrent.FutureTask<V>具体是如何操作的。
在FutureTask中使用了一个AQS数据结构来完成各种状态以及加锁、阻塞的实现。
在此AQS类java.util.concurrent.FutureTask.Sync中一个任务用4中状态:
初始情况下任务状态state=0,任务执行(innerRun)后状态变为运行状态RUNNING(state=1),执行完毕后变成运行结束状态RAN(state=2)。任务在初始状态或者执行状态被取消后就变为状态CANCELLED(state=4)。AQS最擅长无锁情况下处理几种简单的状态变更的。
-
void innerRun() {
-
if (!compareAndSetState(
0, RUNNING))
-
return;
-
try {
-
runner = Thread.currentThread();
-
if (getState() == RUNNING)
// recheck after setting thread
-
innerSet(callable.call());
-
else
-
releaseShared(
0);
// cancel
-
}
catch (Throwable ex) {
-
innerSetException(ex);
-
}
-
}
执行一个任务有四步:设置运行状态、设置当前线程(AQS需要)、执行任务(Runnable#run或者Callable#call)、设置执行结果。这里也可以看到,一个任务只能执行一次,因为执行完毕后它的状态不在为初始值0,要么为CANCELLED,要么为RAN。
取消一个任务(cancel)又是怎样进行的呢?对比下前面取消任务的描述是不是很简单,这里无非利用AQS的状态来改变任务的执行状态,最终达到放弃未启动或者正在执行的任务的目的。
-
boolean innerCancel(boolean mayInterruptIfRunning) {
-
for (;;) {
-
int s = getState();
-
if (ranOrCancelled(s))
-
return
false;
-
if (compareAndSetState(s, CANCELLED))
-
break;
-
}
-
if (mayInterruptIfRunning) {
-
Thread r = runner;
-
if (r !=
null)
-
r.interrupt();
-
}
-
releaseShared(
0);
-
done();
-
return
true;
-
}
到目前为止我们依然没有说明到底是如何阻塞获取一个结果的。下面四段代码描述了这个过程。
-
V innerGet() throws InterruptedException, ExecutionException {
-
acquireSharedInterruptibly(
0);
-
if (getState() == CANCELLED)
-
throw
new CancellationException();
-
if (exception !=
null)
-
throw
new ExecutionException(exception);
-
return result;
-
}
-
//AQS#acquireSharedInterruptibly
-
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
-
if (Thread.interrupted())
-
throw
new InterruptedException();
-
if (tryAcquireShared(arg) <
0)
-
doAcquireSharedInterruptibly(arg);
//park current Thread for result
-
}
-
protected int tryAcquireShared(int ignore) {
-
return innerIsDone()?
1 : -
1;
-
}
-
-
boolean innerIsDone() {
-
return ranOrCancelled(getState()) && runner ==
null;
-
}
当调用Future#get()的时候尝试去获取一个共享变量。这就涉及到AQS的使用方式了。这里获取一个共享变量的状态是任务是否结束(innerIsDone()),也就是任务是否执行完毕或者被取消。如果不满足条件,那么在AQS中就会doAcquireSharedInterruptibly(arg)挂起当前线程,直到满足条件。AQS前面讲过,挂起线程使用的是LockSupport的park方式,因此性能消耗是很低的。
至于将Runnable接口转换成Callable接口,java.util.concurrent.Executors.callable(Runnable, T)也提供了一个简单实现。
-
static
final
class RunnableAdapter<T> implements Callable<T> {
-
final Runnable task;
-
final T result;
-
RunnableAdapter(Runnable task, T result) {
-
this.task = task;
-
this.result = result;
-
}
-
public T call() {
-
task.run();
-
return result;
-
}
-
}
java.util.concurrent.ScheduledThreadPoolExecutor是默认的延迟、周期性任务调度的实现。
有了整个线程池的实现,再回头来看延迟、周期性任务调度的实现应该就很简单了,因为所谓的延迟、周期性任务调度,无非添加一系列有序的任务队列,然后按照执行顺序的先后来处理整个任务队列。如果是周期性任务,那么在执行完毕的时候加入下一个时间点的任务即可。
由此可见,ScheduledThreadPoolExecutor和ThreadPoolExecutor的唯一区别在于任务是有序(按照执行时间顺序)的,并且需要到达时间点(临界点)才能执行,并不是任务队列中有任务就需要执行的。也就是说唯一不同的就是任务队列BlockingQueue<Runnable> workQueue不一样。ScheduledThreadPoolExecutor的任务队列是java.util.concurrent.ScheduledThreadPoolExecutor.DelayedWorkQueue,它是基于java.util.concurrent.DelayQueue<RunnableScheduledFuture>队列的实现。
DelayQueue是基于有序队列PriorityQueue实现的。PriorityQueue 也叫优先级队列,按照自然顺序对元素进行排序,类似于TreeMap/Collections.sort一样。
同样是有序队列,DelayQueue和PriorityQueue区别在什么地方?
由于DelayQueue在获取元素时需要检测元素是否“可用”,也就是任务是否达到“临界点”(指定时间点),因此加入元素和移除元素会有一些额外的操作。
典型的,移除元素需要检测元素是否达到“临界点”,增加元素的时候如果有一个元素比“头元素”更早达到临界点,那么就需要通知任务队列。因此这需要一个条件变量final Condition available 。
移除元素(出队列)的过程是这样的:
-
public E take() throws InterruptedException {
-
final ReentrantLock lock =
this.lock;
-
lock.lockInterruptibly();
-
try {
-
for (;;) {
-
E first = q.peek();
-
if (first ==
null) {
-
available.await();
-
}
else {
-
long delay = first.getDelay(TimeUnit.NANOSECONDS);
-
if (delay >
0) {
-
long tl = available.awaitNanos(delay);
-
}
else {
-
E x = q.poll();
-
assert x !=
null;
-
if (q.size() !=
0)
-
available.signalAll();
// wake up other takers
-
return x;
-
-
}
-
}
-
}
-
}
finally {
-
lock.unlock();
-
}
-
}
同样加入元素也会有相应的条件变量操作。当前仅当队列为空或者要加入的元素比队列中的头元素还小的时候才需要唤醒“等待线程”去检测元素。因为头元素都没有唤醒那么比头元素更延迟的元素就更加不会唤醒。
同样加入元素也会有相应的条件变量操作。当前仅当队列为空或者要加入的元素比队列中的头元素还小的时候才需要唤醒“等待线程”去检测元素。因为头元素都没有唤醒那么比头元素更延迟的元素就更加不会唤醒。
同样加入元素也会有相应的条件变量操作。当前仅当队列为空或者要加入的元素比队列中的头元素还小的时候才需要唤醒“等待线程”去检测元素。因为头元素都没有唤醒那么比头元素更延迟的元素就更加不会唤醒。
-
public E take() throws InterruptedException {
-
final ReentrantLock lock =
this.lock;
-
lock.lockInterruptibly();
-
try {
-
for (;;) {
-
E first = q.peek();
-
if (first ==
null) {
-
available.await();
-
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- for (;;) {
- E first&