目录java
在Web开发中,若是要密集处理多个任务时,相对于每次都一个建立线程去执行任务,新建线程来执行任务相对来讲是个更好的选择,体如今如下三点:源码分析
下面从最经常使用的线程池ThreadPoolExecutor的源码分析如何实现线程池。ui
Executor是最基础的执行接口,只提供了一个execute(Runnable command)提交任务方法;ExecutorService接口继承了Executor,在其上作了一些shutdown()、submit()的扩展,能够说是真正的线程池接口AbstractExecutorService抽象类实现了ExecutorService接口中的大部分方法;TheadPoolExecutor继承了AbstractExecutorService,是线程池的具体实现。
this
public class ThreadPoolExecutor extends AbstractExecutorService { // 线程池的控制状态(用来表示线程池的运行状态(整形的高3位)和运行的worker数量(低29位)) private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 偏移量 private static final int COUNT_BITS = Integer.SIZE - 3; // 最大工做线程数量(2^29 - 1) private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits // 线程运行状态,总共有5个状态,须要3位来表示(因此偏移量的29 = 32 - 3) private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // 阻塞队列,存放提交给线程池的任务 private final BlockingQueue<Runnable> workQueue; // 可重入锁 private final ReentrantLock mainLock = new ReentrantLock(); // 存放工做线程集合 private final HashSet<Worker> workers = new HashSet<Worker>(); // 终止条件 private final Condition termination = mainLock.newCondition(); // 最大线程池容量 private int largestPoolSize; // 已完成任务数量 private long completedTaskCount; // 线程工厂 private volatile ThreadFactory threadFactory; // 拒绝执行处理器 private volatile RejectedExecutionHandler handler; // 线程等待运行时间 private volatile long keepAliveTime; // 是否运行核心线程超时 private volatile boolean allowCoreThreadTimeOut; // 核心池的大小 private volatile int corePoolSize; // 最大线程池大小 private volatile int maximumPoolSize; // 默认拒绝执行处理器 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); }
线程池自己有两个很重要的状态信息:线程池的运行状态和工做线程数,这两个状态信息都包含在变量ctl(int型,32位)中:ctl的高3位表示线程状态runState,低29位表示工做线程worker的数量workCount。线程状态信息以下:线程
核心参数含义以下:code
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
execute方法是向线程池提交任务的,此时线程池的状态为RUNNING(其余状态不接收新提交的任务),主要判断:对象
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); //ctl记录线程池状态信息和线程池线程数 int c = ctl.get(); //比较当前线程数是否小于corePoolSize,若是小于则新建一个线程放入线程池中 if (workerCountOf(c) < corePoolSize) { //成功加入则返回 if (addWorker(command, true)) return; //加入失败,从新获取ctl c = ctl.get(); } //若是当前线程数大于等于corePoolSize,判断线程池是否仍在运行,是的话加入阻塞队列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); //再次检查线程池是否仍在运行 if (! isRunning(recheck) && remove(command)) reject(command); /** 线程池在运行可是工做线程数为0,此时可能阻塞队列有任务但线程池没有工做线程池, * 若是配置了参数allowCoreThreadTimeOut(默认是false)为true可能由于核心线程执行 * 完任务且阻塞队列也没有线程等待获取任务,此时属于空闲线程,因为超时会回收核心线程 **/ else if (workerCountOf(recheck) == 0) /** 传false将会在addWorker方法中判断线程池的工做线程数量和最大线程数量作比较 * 传一个空的任务,开启一个工做线程,但这个工做线程会发现当前的任务是空,而后会去队列中取任务 * 这样就避免了线程池的状态是running,并且队列中还有任务,但线程池却不执行队列中的任务 **/ addWorker(null, false); } /** * 若是执行到这里,有两种状况: * 1. 线程池已经不是RUNNING状态; * 2. 线程池是RUNNING状态,但workerCount >= corePoolSize而且workQueue已满。 * 这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为 * maximumPoolSize;若是失败则拒绝该任务 **/ else if (!addWorker(command, false)) reject(command); }
addWorker方法用与建立工做线程,firstTask表示第一个任务,core为true那么线程数受corePoolSize制约,为false则受maximumPoolSize制约。执行流程:blog
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); //运行状态 int rs = runStateOf(c); /** * 若是rs >= SHUTDOWN,则表示此时再也不接收新任务 * 知足rs >= SHUTDOWN条件后接着判断如下3个条件,只要有1个不知足,则返回false: * 1. rs == SHUTDOWN,这时表示关闭状态,再也不接受新提交的任务,但却能够继续处理阻塞队列中已保 * 存的任务 2. firsTask为空 3. 阻塞队列不为空 **/ if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { //当前工做线程数 int wc = workerCountOf(c); //检查线程数量是否超出 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 尝试CAS增长workerCount,若是成功,则跳出第一个for循环 if (compareAndIncrementWorkerCount(c)) break retry; //CAS失败,从新获取ctl的值 c = ctl.get(); // Re-read ctl // 若是当前的运行状态不等于rs,说明状态已被改变,返回第一个for循环继续执行 if (runStateOf(c) != rs) continue retry; } } //CAS增长workCount成功,退出循环进入到这里 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 根据firstTask来建立Worker对象 w = new Worker(firstTask); // 每个Worker对象都会建立一个线程 final Thread t = w.thread; if (t != null) { //上锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); // rs < SHUTDOWN表示是RUNNING状态; // 若是rs是RUNNING状态或者rs是SHUTDOWN状态而且firstTask为null,向线程池中添加线程。 // 由于在SHUTDOWN时不会在添加新的任务,但仍是会执行workQueue中的任务 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) throw new IllegalThreadStateException(); //将工做线程work加入到HashSet对象workers workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } //启动线程 if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
线程池的工做线程是经过包装成Worker对象,Worker类自己既实现了Runnable接口,又继承了同步器AQS,实现了一个简易的不可重入的互斥锁,经过同步状态state控制中断:继承
private final class Worker extends AbstractQueuedSynchronizerimplements Runnable{ private static final long serialVersionUID = 6138294804551838833L; //工做线程 final Thread thread; //新建Worker传入的任务command,可能为null Runnable firstTask; //执行完的任务数量 volatile long completedTasks; //同步状态state为0表明为锁定,state为1表明锁定,state为-1表明初始状态 Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; //建立线程 this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); } protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } }
runWork是工做线程执行任务的方法,执行过程以下:接口
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; //同步状态state设置为0,容许中断 w.unlock(); // allow interrupts //用于标识是否工做线程因为异常忽然终止,在执行任务抛出异常或线程被中断两种状况为true boolean completedAbruptly = true; try { //循环取任务执行 while (task != null || (task = getTask()) != null) { //上锁,表示正在工做线程正在执行任务,不能响应中断 w.lock(); /** * 确保在线程池状态在STOP及以上时,才会被设置中断标示,不然清除中断标示,判断如下两个条件: * 一、若是线程池状态>=stop,且当前线程没有设置中断状态,wt.interrupt() * 二、若是一开始判断线程池状态<stop,但Thread.interrupted()为true,即线程已经被中断,又 * 清除了中断标示,再次判断线程池状态是否>=stop(可能调用了shutdownNow关闭线程池) **/ if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { //执行任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
当工做线程数达到corePoolSize,后续提交的任务就会放到阻塞队列workQueue中,工做线程经过getTask方法从阻塞队列取出任务,执行如下步骤:
private Runnable getTask() { // timeOut变量的值表示上次从阻塞队列中取任务时是否超时 boolean timedOut = false; for (;;) { int c = ctl.get(); int rs = runStateOf(c); /** * 1.rs>SHUTDOWN 因此rs至少等于STOP,这时再也不处理队列中的任务,无论workQueue是否为空都返回null * 2.rs = SHUTDOWN 因此rs>=STOP确定不成立,这时还须要处理队列中的任务除非workQueue为空 * 若是以上条件知足,则将workerCount减1并返回null。由于若是当前线程池状态的值是SHUTDOWN * 或以上时,不容许再向阻塞队列中添加任务。 */ if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); /** * timed表示工做线程是否须要剔除,为true * allowCoreThreadTimeOut默认为false,表示核心线程不作超时控制 * wc > corePoolSize 超过核心线程数 * timed为true下面的if条件经过返回null,从而剔除掉超过corePoolSize数目的线程,使线程数 * 回复corePoolSize **/ boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; /** * 条件1: * wc > maximumPoolSize 检查是否超出maximumPoolSize,线程池可能重置了maximumPoolSize * timed && timedOut 当前线程须要超时控制且上次取任务超时为true * 条件2:若是线程数量大于1,或者阻塞队列是空的 * 两个条件都为true把workCount减一,返回null **/ if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; //CAS失败从新循环 continue; } try { /** * 根据timed来判断,若是为true,则经过阻塞队列的poll方法进行超时控制,若是在 * keepAliveTime时间内没有获取到任务,则返回null; * 不然经过take方法,若是这时队列为空,则take方法会阻塞直到队列不为空。 **/ Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; //若是r==null,说明是超时了 timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
当getTask返回null,会跳出runWork的while循环,此时工做线程的run方法执行完毕,线程会终止,同时会执行processWorkerExit方法,步骤以下:
private void processWorkerExit(Worker w, boolean completedAbruptly) { //若是是忽然终止,从新调整workCount if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //统计完成的任务数 completedTaskCount += w.completedTasks; //从集合中移出Worker对象 workers.remove(w); } finally { mainLock.unlock(); } // 根据线程池状态进行判断是否结束线程池 tryTerminate(); int c = ctl.get(); //线程状态小于STOP,即线程池处于RUNNING或SHUTDOWN状态 if (runStateLessThan(c, STOP)) { //检查是否异常终止 if (!completedAbruptly) { //若是allowCoreThreadTimeOut=true,而且等待队列有任务,至少保留一个worker; //若是allowCoreThreadTimeOut=false,workerCount很多于corePoolSize。 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } //忽然终止,添加一个Worker addWorker(null, false); } }
关闭线程池,线程池状态由RUNNING变为SHUTDOWN,只处理已有任务再也不接收新提交的任务,中断空闲线程。
为何要中断空闲线程:当线程池状态为RUNNING可是阻塞队列为空,allowCoreThreadTimeOut为默认值false(既不支持核心线程超时回收),那么工做线程必然堵塞在workQueue.take()方法上,而调用了shutdown()方法后线程池状态变为SHUTDOWN不接收新提交的任务,那么阻塞队列永远为空,因此须要经过中断让线程由阻塞状态返回null。
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //检查是否有关闭线程池权限 checkShutdownAccess(); //把线程池运行状态切换为SHUTDOWN advanceRunState(SHUTDOWN); //中断空闲线程 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }
中断空闲线程。
private void interruptIdleWorkers() { //false代表中断全部空闲线程 interruptIdleWorkers(false); } private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; // t.isInterrupted()检查线程是否已经中断过 // w.tryLock() runWork在执行任务会上锁,执行完解锁去阻塞队列得到任务,若是tryLock成功 //说明没有执行任务,是空闲线程。 if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
根据线程池状态尝试关闭线程池。这里解释一下interruptIdleWorkers(ONLY_ONE):
当到达workerCountOf(c) != 0这个判断时,说明线程池处于SHUTDOWN状态,且阻塞队列已经为空,这是若判断成立,那么还有工做线程等待在线程池上,会中断一个空闲线程,这个被中断的空闲线程的Worker返回null又会调用tryTerminate,从而把线程池关闭的消息传给每一个线程,回收空闲线程。
final void tryTerminate() { for (;;) { int c = ctl.get(); /* * 当前线程池的状态为如下几种状况时,直接返回: * 1. RUNNING,由于还在运行中,不能中止; * 2. TIDYING或TERMINATED,由于线程池中已经没有正在运行的线程了; * 3. SHUTDOWN而且等待队列非空,这时要执行完workQueue中的task; */ if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; //工做线程数不为0 if (workerCountOf(c) != 0) { // Eligible to terminate //中断一个空闲线程(等待在阻塞队列上获取任务的线程) //中断的线程在回收Worker时还会调用tryTerminate方法,从而回收空闲线程 interruptIdleWorkers(ONLY_ONE); return; } //到这里说明工做线程数workCount为0,线程池状态置为TIDYING final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
关闭线程池,运行状态修改成 STOP, 中断全部线程; 并返回未处理的任务
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); //更改线程池状态 advanceRunState(STOP); // 中断全部工做线程,不管是否空闲 interruptWorkers(); //取出阻塞队列中没有被执行的任务 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
不论线程是否空闲,中断全部线程。
private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) w.interruptIfStarted(); } finally { mainLock.unlock(); } } void interruptIfStarted() { Thread t; /** * getState() >= 0 同步状态state=-1线程还没启动,大于等于0说明线程以及启动,处于 * 执行任务或空闲状态。 * (t = thread) != null 线程不为null * !t.isInterrupted() 检查线程是否被中断过。 **/ if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } }
本文分析了线程池ThreadPoolExecutor的实现,主要从向线程池提交任务和关闭线程池这两个方法分析的,了解了线程池复用线程资源减小线程建立和切换的开销背后的秘密。