在web开发中,服务器须要接受并处理请求,因此会为一个请求来分配一个线程来进行处理。若是每次请求都新建立一个线程的话实现起来很是简便,可是存在一个问题:java
若是并发的请求数量很是多,但每一个线程执行的时间很短,这样就会频繁的建立和销毁线程,如此一来会大大下降系统的效率。可能出现服务器在为每一个请求建立新线程和销毁线程上花费的时间和消耗的系统资源要比处理实际的用户请求的时间和资源更多。web
那么有没有一种办法使执行完一个任务,并不被销毁,而是能够继续执行其余的任务呢?安全
这就是线程池的目的了。线程池为线程生命周期的开销和资源不足问题提供了解决方案。经过对多个任务重用线程,线程建立的开销被分摊到了多个任务上。服务器
何时使用线程池?多线程
使用线程池的好处并发
引用自 http://ifeve.com/java-threadpool/ 的说明:框架
Java中的线程池是用ThreadPoolExecutor类来实现的. 本文就结合JDK 1.8对该类的源码来分析一下这个类内部对于线程的建立, 管理以及后台任务的调度等方面的执行原理。异步
先看一下线程池的类图:ide
Executor框架是一个根据一组执行策略调用,调度,执行和控制的异步任务的框架,目的是提供一种将”任务提交”与”任务如何运行”分离开来的机制。oop
J.U.C中有三个Executor接口:
public interface Executor { void execute(Runnable command); }
Executor接口只有一个execute方法,用来替代一般建立或启动线程的方法。例如,使用Thread来建立并启动线程的代码以下:
Thread t = new Thread(); t.start();
使用Executor来启动线程执行任务的代码以下:
Thread t = new Thread(); executor.execute(t);
对于不一样的Executor实现,execute()方法多是建立一个新线程并当即启动,也有多是使用已有的工做线程来运行传入的任务,也多是根据设置线程池的容量或者阻塞队列的容量来决定是否要将传入的线程放入阻塞队列中或者拒绝接收传入的线程。
ExecutorService接口继承自Executor接口,提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行情况而生成 Future 的方法。增长了shutDown(),shutDownNow(),invokeAll(),invokeAny()和submit()等方法。若是须要支持即时关闭,也就是shutDownNow()方法,则任务须要正确处理中断。
ScheduledExecutorService扩展ExecutorService接口并增长了schedule方法。调用schedule方法能够在指定的延时后执行一个Runnable或者Callable任务。ScheduledExecutorService接口还定义了按照指定时间间隔按期执行任务的scheduleAtFixedRate()方法和scheduleWithFixedDelay()方法。
ThreadPoolExecutor继承自AbstractExecutorService,也是实现了ExecutorService接口。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
ctl
是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它包含两部分的信息: 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),这里能够看到,使用了Integer类型来保存,高3位保存runState,低29位保存workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位减1(29个1),这个常量表示workerCount的上限值,大约是5亿。
下面再介绍下线程池的运行状态. 线程池一共有五种状态, 分别是:
下图为线程池的状态转换过程:
这里还有几个对ctl进行计算的方法:
private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; }
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
构造方法中的字段含义以下:
corePoolSize:核心线程数量,当有新任务在execute()方法提交时,会执行如下判断:
因此,任务提交时,判断的顺序为 corePoolSize –> workQueue –> maximumPoolSize。
execute()方法用来提交任务,代码以下:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * clt记录着runState和workerCount */ int c = ctl.get(); /* * workerCountOf方法取出低29位的值,表示当前活动的线程数; * 若是当前活动线程数小于corePoolSize,则新建一个线程放入线程池中; * 并把任务添加到该线程中。 */ if (workerCountOf(c) < corePoolSize) { /* * addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize来判断仍是maximumPoolSize来判断; * 若是为true,根据corePoolSize来判断; * 若是为false,则根据maximumPoolSize来判断 */ if (addWorker(command, true)) return; /* * 若是添加失败,则从新获取ctl值 */ c = ctl.get(); } /* * 若是当前线程池是运行状态而且任务添加到队列成功 */ if (isRunning(c) && workQueue.offer(command)) { // 从新获取ctl值 int recheck = ctl.get(); // 再次判断线程池的运行状态,若是不是运行状态,因为以前已经把command添加到workQueue中了, // 这时须要移除该command // 执行事后经过handler使用拒绝策略对该任务进行处理,整个方法返回 if (! isRunning(recheck) && remove(command)) reject(command); /* * 获取线程池中的有效线程数,若是数量是0,则执行addWorker方法 * 这里传入的参数表示: * 1. 第一个参数为null,表示在线程池中建立一个线程,但不去启动; * 2. 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize,添加线程时根据maximumPoolSize来判断; * 若是判断workerCount大于0,则直接返回,在workQueue中新增的command会在未来的某个时刻被执行。 */ else if (workerCountOf(recheck) == 0) addWorker(null, false); } /* * 若是执行到这里,有两种状况: * 1. 线程池已经不是RUNNING状态; * 2. 线程池是RUNNING状态,但workerCount >= corePoolSize而且workQueue已满。 * 这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize; * 若是失败则拒绝该任务 */ else if (!addWorker(command, false)) reject(command); }
简单来讲,在执行execute()方法时若是状态一直是RUNNING时,的执行过程以下:
workerCount < corePoolSize
,则建立并启动一个线程来执行新提交的任务;workerCount >= corePoolSize
,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中;workerCount >= corePoolSize && workerCount < maximumPoolSize
,且线程池内的阻塞队列已满,则建立并启动一个线程来执行新提交的任务;workerCount >= maximumPoolSize
,而且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。这里要注意一下addWorker(null, false);
,也就是建立一个线程,但并无传入任务,由于任务已经被添加到workQueue中了,因此worker在执行的时候,会直接从workQueue中获取任务。因此,在workerCountOf(recheck) == 0
时执行addWorker(null, false);
也是为了保证线程池在RUNNING状态下必需要有一个线程来执行任务。
execute方法执行流程以下:
addWorker方法的主要工做是在线程池中建立一个新的线程并执行,firstTask参数 用于指定新增的线程执行的第一个任务,core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前须要判断当前活动线程数是否少于maximumPoolSize,代码以下:
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); // 获取运行状态 int rs = runStateOf(c); /* * 这个if判断 * 若是rs >= SHUTDOWN,则表示此时再也不接收新任务; * 接着判断如下3个条件,只要有1个不知足,则返回false: * 1. rs == SHUTDOWN,这时表示关闭状态,再也不接受新提交的任务,但却能够继续处理阻塞队列中已保存的任务 * 2. firsTask为空 * 3. 阻塞队列不为空 * * 首先考虑rs == SHUTDOWN的状况 * 这种状况下不会接受新提交的任务,因此在firstTask不为空的时候会返回false; * 而后,若是firstTask为空,而且workQueue也为空,则返回false, * 由于队列中已经没有任务了,不须要再添加线程了 */ // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { // 获取线程数 int wc = workerCountOf(c); // 若是wc超过CAPACITY,也就是ctl的低29位的最大值(二进制是29个1),返回false; // 这里的core是addWorker方法的第二个参数,若是为true表示根据corePoolSize来比较, // 若是为false则根据maximumPoolSize来比较。 // if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 尝试增长workerCount,若是成功,则跳出第一个for循环 if (compareAndIncrementWorkerCount(c)) break retry; // 若是增长workerCount失败,则从新获取ctl的值 c = ctl.get(); // Re-read ctl // 若是当前的运行状态不等于rs,说明状态已被改变,返回第一个for循环继续执行 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 根据firstTask来建立Worker对象 w = new Worker(firstTask); // 每个Worker对象都会建立一个线程 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); // rs < SHUTDOWN表示是RUNNING状态; // 若是rs是RUNNING状态或者rs是SHUTDOWN状态而且firstTask为null,向线程池中添加线程。 // 由于在SHUTDOWN时不会在添加新的任务,但仍是会执行workQueue中的任务 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // workers是一个HashSet workers.add(w); int s = workers.size(); // largestPoolSize记录着线程池中出现过的最大线程数量 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 启动线程 t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
注意一下这里的t.start()
这个语句,启动时会调用Worker类中的run方法,Worker自己实现了Runnable接口,因此一个Worker类型的对象也是一个线程。
线程池中的每个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组Worker对象,看一下Worker的定义:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
Worker类继承了AQS,并实现了Runnable接口,注意其中的firstTask和thread属性:firstTask用它来保存传入的任务;thread是在调用构造方法时经过ThreadFactory来建立的线程,是用来处理任务的线程。
在调用构造方法时,须要把任务传入,这里经过getThreadFactory().newThread(this);
来新建一个线程,newThread方法传入的参数是this,由于Worker自己继承了Runnable接口,也就是一个线程,因此一个Worker对象在启动的时候会调用Worker类中的run方法。
Worker继承了AQS,使用AQS来实现独占锁的功能。为何不使用ReentrantLock来实现呢?能够看到tryAcquire方法,它是不容许重入的,而ReentrantLock是容许重入的:
因此,Worker继承自AQS,用于判断线程是否空闲以及是否能够被中断。
此外,在构造方法中执行了setState(-1);
,把state变量设置为-1,为何这么作呢?是由于AQS中默认的state是0,若是刚建立了一个Worker对象,尚未执行任务时,这时就不该该被中断,看一下tryAquire方法:
protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }
tryAcquire方法是根据state是不是0来判断的,因此,setState(-1);
将state设置为-1是为了禁止在执行任务前对线程进行中断。
正由于如此,在runWorker方法中会先调用Worker对象的unlock方法将state设置为0.
在Worker类中的run方法调用了runWorker方法来执行任务,runWorker方法的代码以下:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); // 获取第一个任务 Runnable task = w.firstTask; w.firstTask = null; // 容许中断 w.unlock(); // allow interrupts // 是否由于异常退出循环 boolean completedAbruptly = true; try { // 若是task为空,则经过getTask来获取任务 while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
这里说明一下第一个if判断,目的是:
这里要考虑在执行该if语句期间可能也执行了shutdownNow方法,shutdownNow方法会把状态设置为STOP,回顾一下STOP状态:
不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态。
STOP状态要中断线程池中的全部线程,而这里使用Thread.interrupted()
来判断是否中断是为了确保在RUNNING或者SHUTDOWN状态时线程是非中断状态的,由于Thread.interrupted()方法会复位中断的状态。
总结一下runWorker方法的执行过程:
task.run()
执行任务;这里的beforeExecute方法和afterExecute方法在ThreadPoolExecutor类中是空的,留给子类来实现。
completedAbruptly变量来表示在执行任务过程当中是否出现了异常,在processWorkerExit方法中会对该变量的值进行判断。
getTask方法用来从阻塞队列中取任务,代码以下:
private Runnable getTask() { // timeOut变量的值表示上次从阻塞队列中取任务时是否超时 boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. /* * 若是线程池状态rs >= SHUTDOWN,也就是非RUNNING状态,再进行如下判断: * 1. rs >= STOP,线程池是否正在stop; * 2. 阻塞队列是否为空。 * 若是以上条件知足,则将workerCount减1并返回null。 * 由于若是当前线程池状态的值是SHUTDOWN或以上时,不容许再向阻塞队列中添加任务。 */ if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? // timed变量用于判断是否须要进行超时控制。 // allowCoreThreadTimeOut默认是false,也就是核心线程不容许进行超时; // wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量; // 对于超过核心线程数量的这些线程,须要进行超时控制 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; /* * wc > maximumPoolSize的状况是由于可能在此方法执行阶段同时执行了setMaximumPoolSize方法; * timed && timedOut 若是为true,表示当前操做须要进行超时控制,而且上次从阻塞队列中获取任务发生了超时 * 接下来判断,若是有效线程数量大于1,或者阻塞队列是空的,那么尝试将workerCount减1; * 若是减1失败,则返回重试。 * 若是wc == 1时,也就说明当前线程是线程池中惟一的一个线程了。 */ if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { /* * 根据timed来判断,若是为true,则经过阻塞队列的poll方法进行超时控制,若是在keepAliveTime时间内没有获取到任务,则返回null; * 不然经过take方法,若是这时队列为空,则take方法会阻塞直到队列不为空。 * */ Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; // 若是 r == null,说明已经超时,timedOut设置为true timedOut = true; } catch (InterruptedException retry) { // 若是获取任务时当前线程发生了中断,则设置timedOut为false并返回循环重试 timedOut = false; } } }
这里重要的地方是第二个if判断,目的是控制线程池的有效线程数量。由上文中的分析能够知道,在执行execute方法时,若是当前线程池的线程数量超过了corePoolSize且小于maximumPoolSize,而且workQueue已满时,则能够增长工做线程,但这时若是超时没有获取到任务,也就是timedOut为true的状况,说明workQueue已经为空了,也就说明了当前线程池中不须要那么多线程来执行任务了,能够把多于corePoolSize数量的线程销毁掉,保持线程数量在corePoolSize便可。
何时会销毁?固然是runWorker方法执行完以后,也就是Worker中的run方法执行完,由JVM自动回收。
getTask方法返回null时,在runWorker方法中会跳出while循环,而后会执行processWorkerExit方法。
private void processWorkerExit(Worker w, boolean completedAbruptly) { // 若是completedAbruptly值为true,则说明线程执行时出现了异常,须要将workerCount减1; // 若是线程执行时没有出现异常,说明在getTask()方法中已经已经对workerCount进行了减1操做,这里就没必要再减了。 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //统计完成的任务数 completedTaskCount += w.completedTasks; // 从workers中移除,也就表示着从线程池中移除了一个工做线程 workers.remove(w); } finally { mainLock.unlock(); } // 根据线程池状态进行判断是否结束线程池 tryTerminate(); int c = ctl.get(); /* * 当线程池是RUNNING或SHUTDOWN状态时,若是worker是异常结束,那么会直接addWorker; * 若是allowCoreThreadTimeOut=true,而且等待队列有任务,至少保留一个worker; * 若是allowCoreThreadTimeOut=false,workerCount很多于corePoolSize。 */ if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
至此,processWorkerExit执行完以后,工做线程被销毁,以上就是整个工做线程的生命周期,从execute方法开始,Worker使用ThreadFactory建立新的工做线程,runWorker经过getTask获取任务,而后执行任务,若是getTask返回null,进入processWorkerExit方法,整个线程结束,如图所示:
tryTerminate方法根据线程池状态进行判断是否结束线程池,代码以下:
final void tryTerminate() { for (;;) { int c = ctl.get(); /* * 当前线程池的状态为如下几种状况时,直接返回: * 1. RUNNING,由于还在运行中,不能中止; * 2. TIDYING或TERMINATED,由于线程池中已经没有正在运行的线程了; * 3. SHUTDOWN而且等待队列非空,这时要执行完workQueue中的task; */ if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; // 若是线程数量不为0,则中断一个空闲的工做线程,并返回 if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 这里尝试设置状态为TIDYING,若是设置成功,则调用terminated方法 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { // terminated方法默认什么都不作,留给子类实现 terminated(); } finally { // 设置状态为TERMINATED ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
interruptIdleWorkers(ONLY_ONE);
的做用是由于在getTask方法中执行workQueue.take()
时,若是不执行中断会一直阻塞。在下面介绍的shutdown方法中,会中断全部空闲的工做线程,若是在执行shutdown时工做线程没有空闲,而后又去调用了getTask方法,这时若是workQueue中没有任务了,调用workQueue.take()
时就会一直阻塞。因此每次在工做线程结束时调用tryTerminate方法来尝试中断一个空闲工做线程,避免在队列为空时取任务一直阻塞的状况。
shutdown方法要将线程池切换到SHUTDOWN状态,并调用interruptIdleWorkers方法请求中断全部空闲的worker,最后调用tryTerminate尝试结束线程池。
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 安全策略判断 checkShutdownAccess(); // 切换状态为SHUTDOWN advanceRunState(SHUTDOWN); // 中断空闲线程 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } // 尝试结束线程池 tryTerminate(); }
这里思考一个问题:在runWorker方法中,执行任务时对Worker对象w进行了lock操做,为何要在执行任务的时候对每一个工做线程都加锁呢?
下面仔细分析一下:
workQueue.take()
进行阻塞;workQueue.take()
后会一直阻塞而不会被销毁,由于在SHUTDOWN状态下不容许再有新的任务添加到workQueue中,这样一来线程池永远都关闭不了了;workQueue.take()
时,若是发现当前线程在执行以前或者执行期间是中断状态,则会抛出InterruptedException,解除阻塞的状态;下面就来分析一下interruptIdleWorkers方法。
private void interruptIdleWorkers() { interruptIdleWorkers(false); } private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
interruptIdleWorkers遍历workers中全部的工做线程,若线程没有被中断tryLock成功,就中断该线程。
为何须要持有mainLock?由于workers是HashSet类型的,不能保证线程安全。
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); // 中断全部工做线程,不管是否空闲 interruptWorkers(); // 取出队列中没有被执行的任务 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
shutdownNow方法与shutdown方法相似,不一样的地方在于:
shutdownNow方法执行完以后调用tryTerminate方法,该方法在上文已经分析过了,目的就是使线程池的状态设置为TERMINATED。
经过线程池提供的参数进行监控。线程池里有一些属性在监控线程池的时候可使用
经过这些方法,能够对线程池进行监控,在ThreadPoolExecutor类中提供了几个空方法,如beforeExecute方法,afterExecute方法和terminated方法,能够扩展这些方法在执行前或执行后增长一些新的操做,例如统计线程池的执行任务的时间等,能够继承自ThreadPoolExecutor来进行扩展。
本文比较详细的分析了线程池的工做流程,整体来讲有以下几个内容:
在向线程池提交任务时,除了execute方法,还有一个submit方法,submit方法会返回一个Future对象用于获取返回值,有关Future和Callable请自行了解一下相关的文章,这里就不介绍了。