前面咱们在java线程池ThreadPoolExecutor类使用详解中对ThreadPoolExector线程池类的使用进行了详细阐述,这篇文章咱们对其具体的源码进行一下分析和总结;
html
首先咱们看下ThreadPoolExecutor用来表示线程池状态的核心变量java
//用来标记线程池状态(高3位),线程个数(低29位) //默认是RUNNING状态,线程个数为0 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //线程个数掩码位数 private static final int COUNT_BITS = Integer.SIZE - 3; //线程最大个数(低29位)00011111111111111111111111111111 private static final int CAPACITY = (1 << COUNT_BITS) - 1; //(高3位):11100000000000000000000000000000 //接受新任务而且处理阻塞队列里的任务 private static final int RUNNING = -1 << COUNT_BITS; //(高3位):00000000000000000000000000000000 //拒绝新任务可是处理阻塞队列里的任务 private static final int SHUTDOWN = 0 << COUNT_BITS; //(高3位):00100000000000000000000000000000 //拒绝新任务而且抛弃阻塞队列里的任务同时会中断正在处理的任务 private static final int STOP = 1 << COUNT_BITS; //(高3位):01000000000000000000000000000000 //全部任务都执行完(包含阻塞队列里面任务)当前线程池活动线程为0,将要调用terminated方法 private static final int TIDYING = 2 << COUNT_BITS; //(高3位):01100000000000000000000000000000 //终止状态。terminated方法调用完成之后的状态 private static final int TERMINATED = 3 << COUNT_BITS;// // 获取高三位 运行状态 private static int runStateOf(int c) { return c & ~CAPACITY; } //获取低29位 线程个数 private static int workerCountOf(int c) { return c & CAPACITY; } //计算ctl新值,线程状态 与 线程个数 private static int ctlOf(int rs, int wc) { return rs | wc; }
ThreadPoolExecutor核心函数less
//提交任务函数 void execute(Runnable command) //执行拒绝策略的函数 void reject(Runnable command) //新增WOKER线程函数 boolean addWorker(Runnable firstTask, boolean core) //WOKER线程执行函数 void runWorker(Worker w) //获取执行任务函数 Runnable getTask() //线程池中止函数 void shutdown() //线程池当即中止函数 void shutdownNow()
接下来咱们围绕这几个核心函数对ThreadPoolExector线程池类的执行流程和源码进行分享ide
execute 作为ThreadPoolExector的提交任务的函数,注解上已经说明了其实现的主要三步操做:函数
一、若是运行的线程小于corePoolSize,则尝试使用用户定义的Runnalbe对象建立一个新的线程,调用addWorker函数会原子性的检查runState和workCount,经过返回false来防止在不该该添加线程时添加了线程。
2. 若是一个任务可以成功入队列,在添加一个线城时仍须要进行双重检查(由于在前一次检查后该线程死亡了),或者当进入到此方法时,线程池已经shutdown了,因此须要再次检查状态,如有必要,当中止时还须要回滚入队列操做,或者当线程池没有线程时须要建立一个新线程。
3. 若是没法入队列,那么须要增长一个新线程,若是此操做失败,那么就意味着线程池已经shutdown或者已经饱和了,因此拒绝任务
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. 若是运行的线程小于corePoolSize,则尝试使用用户定义的Runnalbe对象建立一个新的线程, 调用addWorker函数会原子性的检查runState和workCount, 经过返回false来防止在不该该添加线程时添加了线程。 * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. 若是一个任务可以成功入队列,在添加一个线城时仍须要进行双重检查(由于在前一次检查后该线程死亡了), 或者当进入到此方法时,线程池已经shutdown了,因此须要再次检查状态, 如有必要,当中止时还须要回滚入队列操做, 或者当线程池没有线程时须要建立一个新线程。 * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. 若是没法入队列,那么须要增长一个新线程, 若是此操做失败,那么就意味着线程池已经shutdown或者已经饱和了,因此拒绝任务 */ //获取线程池的运行状态 int c = ctl.get(); //检查核心线程数量 if (workerCountOf(c) < corePoolSize) { //若是小于corePoolSize 则执行addWorker函数 if (addWorker(command, true)) return; c = ctl.get(); } //判断当前线程是否处于Running状态其任务是否其任务是否能够继续加入workQueue队列(有界仍是无界任务队列) if (isRunning(c) && workQueue.offer(command)) { //若是知足条件,则再次进行状态检查 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) //若是线程池已经不是Running状态,在队列中移除任务,切执行绝交策略 reject(command); else if (workerCountOf(recheck) == 0) //若是worker数量是0,则添加worker addWorker(null, false); } else if (!addWorker(command, false)) //添加worker失败,执行拒绝策略 reject(command); }
经过上面的代码咱们能够看到execute函数自己并不执行提交的Runnable任务 主要做用是根据固然线程池的状态,选择执行addWorker函数仍是执行reject拒绝策略
final void reject(Runnable command) { handler.rejectedExecution(command, this); }
handler为RejectedExecutionHandler接口的具体实现,执行相应的拒绝策略
下面代码为各拒绝策略的具体实现
//若是线程池的线程数量达到上限,该策略会把任务队列中的任务放在调用者线程当中运行; public static class CallerRunsPolicy implements RejectedExecutionHandler { /** * Creates a {@code CallerRunsPolicy}. */ public CallerRunsPolicy() { } /** * Executes task r in the caller's thread, unless the executor * has been shut down, in which case the task is discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { //若是线程池仍在运行状态,执行Runnable的run方法 r.run(); } } } //该策略会直接抛出异常,阻止系统正常工做; public static class AbortPolicy implements RejectedExecutionHandler { /** * Creates an {@code AbortPolicy}. */ public AbortPolicy() { } /** * 直接抛出RejectedExecutionException异常 */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } } //静默策略,不予任何处理。 public static class DiscardPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardPolicy}. */ public DiscardPolicy() { } /** * Does nothing, which has the effect of discarding task r. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } } //该策略会丢弃任务队列中最老的一个任务,也就是当前任务队列中最早被添加进去,立刻要被执行的那个任务,并尝试再次提交; public static class DiscardOldestPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardOldestPolicy} for the given executor. */ public DiscardOldestPolicy() { } /** * Obtains and ignores the next task that the executor * would otherwise execute, if one is immediately available, * and then retries execution of task r, unless the executor * is shut down, in which case task r is instead discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { //直接移除任务队列中第一个任务 e.getQueue().poll(); //再次提交 e.execute(r); } } }
addworker函数主要完成了两部分功能:oop
一、经过cas的方式检查线程池状态与当前线程数量,若是符合条件 增长线程个数;post
二、若是上一部分cas检查成功,线程数已经加一,那么建立Worker对象并绑定经过线程工厂分配与启动线程;ui
咱们看下具体的代码this
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { //获取线程池状态 int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && // 线程池状态大于等于SHUTDOWN,初始的ctl为RUNNING,小于SHUTDOWN ! (rs == SHUTDOWN && //线程池状态等于SHUTDOWN firstTask == null && //传入的任务为空 ! workQueue.isEmpty())) //workQueue队列不为空 return false; for (;;) { int wc = workerCountOf(c); //获取当前的worker线程数量 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) //若是当前worker数量大于最大容量或大于设置的最大线程数,返回false return false; if (compareAndIncrementWorkerCount(c))//cas的方式增长woker线程数量 //cas增长线程数量成功,跳出循序 break retry; //cas失败了,则看线程池状态是否变化了 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) //若是发生变化则跳到外层循环从新获取线程池状态,不然内层循环从新cas。 continue retry; // else CAS failed due to workerCount change; retry inner loop } } //到这里表明CAS成功了,也就是说线程个数加一了,可是如今任务还没开始执行 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //建立一个worker对象,并经过线程工厂建立线程 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock;//加锁,保证添加worker动做的同步,由于同一时间可能会有多个线程操做 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. //从新检查,保证线程状态正常没有关闭 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable //若是线程池状态异常,抛出异常 throw new IllegalThreadStateException(); //添加worker workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //添加worker成功,启动线程 t.start(); workerStarted = true; } } } finally { if (!workerStarted) //若是添加失败,作失败处理 addWorkerFailed(w); } return workerStarted; }
经过上面的代码能够看到,线程池新增的线程最终会封装为一个Worker对象,这个对象会经过轮询的方式不断从任务队列中获取任务,并经过其绑定的线程执行,咱们看下Worker类的具体内容atom
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ //构造函数,绑定任务并经过线程工厂建立线程 Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ //run方法执行runWorker函数 public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } // protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { //线程中断 t.interrupt(); } catch (SecurityException ignore) { } } } }
在Woker类的构造函数中经过线程工厂建立了线程,当线程start时就开始执行runWorker函数。
runWork函数主要实现的功能就是while轮询方式经过getTask函数获取执行任务,咱们来看下具体的代码分析
final void runWorker(Worker w) { //获取当前线程 Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //轮询,经过getTask()获取执行的任务 while (task != null || (task = getTask()) != null) { w.lock(); //(若是线程池至少是stop状态 或 (线程池至少是stop状态且线程是否处于中断))且wt线程是否处于中断 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run();//执行任务 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
若是没有获取到task任务的话,执行processWorkerExit函数
private void processWorkerExit(Worker w, boolean completedAbruptly) { // 若是completedAbruptly值为true,则说明线程执行时出现了异常,须要将workerCount减1; // 若是线程执行时没有出现异常,说明在getTask()方法中已经已经对workerCount进行了减1操做,这里就没必要再减了。 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //统计完成的任务数 completedTaskCount += w.completedTasks; // 从workers中移除,也就表示着从线程池中移除了一个工做线程 workers.remove(w); } finally { mainLock.unlock(); } // 根据线程池状态进行判断是否结束线程池 tryTerminate(); int c = ctl.get(); /* * 当线程池是RUNNING或SHUTDOWN状态时,若是worker是异常结束,那么会直接addWorker; * 若是allowCoreThreadTimeOut=true,而且等待队列有任务,至少保留一个worker; * 若是allowCoreThreadTimeOut=false,workerCount很多于corePoolSize。 */ if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
processWorkerExit执行完以后,工做线程被销毁,以上就是整个工做线程的生命周期,从execute方法开始,Worker使用ThreadFactory建立新的工做线程,runWorker经过getTask获取任务,而后执行任务,若是getTask返回null,进入processWorkerExit方法 而其中tryTerminate()函数的主要做用就是判断是否有空闲线程,并设置中断;
final void tryTerminate() { for (;;) { int c = ctl.get(); /* * 当前线程池的状态为如下几种状况时,直接返回: * 1. RUNNING,由于还在运行中,不能中止; * 2. TIDYING或TERMINATED,由于线程池中已经没有正在运行的线程了; * 3. SHUTDOWN而且等待队列非空,这时要执行完workQueue中的task; */ if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; // 若是线程数量不为0,则中断一个空闲的工做线程,并返回 if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 这里尝试设置状态为TIDYING,若是设置成功,则调用terminated方法 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { // terminated方法默认什么都不作,留给子类实现 terminated(); } finally { // 设置状态为TERMINATED ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
getTask函数经过workQueue.take()获取任务
时,若是不执行中断会一直阻塞。在下面介绍的shutdown方法中,会中断全部空闲的工做线程,若是在执行shutdown时工做线程没有空闲,而后又去调用了getTask方法,这时若是workQueue中没有任务了,调用workQueue.take()
时就会一直阻塞。因此每次在工做线程结束时调用tryTerminate方法来尝试中断一个空闲工做线程,避免在队列为空时取任务一直阻塞的状况。
getTask函数主要完成了三块功能:
一、检查线程池及队列任务状态;
二、根据maximumPoolSize、超时时间和队列任务,控制线程数量;
三、知足条件从workQueue队列中获取任务;
private Runnable getTask() { //超时标志 boolean timedOut = false; // Did the last poll() time out? for (;;) { //获取线程池状态 int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { //若是线程池状态异常或任务队列为空,返回NULL decrementWorkerCount(); return null; } //获取当前线程数量 int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; /* * wc > maximumPoolSize的状况是由于可能在此方法执行阶段同时执行了setMaximumPoolSize方法; * timed && timedOut 若是为true,表示当前操做须要进行超时控制,而且上次从阻塞队列中获取任务发生了超时 * 接下来判断,若是有效线程数量大于1,或者阻塞队列是空的,那么尝试将workerCount减1; * 若是减1失败,则返回重试。 * 若是wc == 1时,也就说明当前线程是线程池中惟一的一个线程了。 */ if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { //取出任务 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
shutdown函数是用来中止线程池的,调用shutdown后,线程池就不会在接受新的任务了,可是工做队列里面的任务仍是要执行的,可是该方法马上返回的,并不等待队列任务完成在返回。
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock();//加锁 try { //检查是否容许Shutdown checkShutdownAccess(); //设置线程池状态为SHUTDOWN advanceRunState(SHUTDOWN); //中断当前空闲的woker线程 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }
调用shutdownNow函数后,会当即中止线程池,并丢弃和返回任务队列中的任务
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //检查是否容许Shutdown checkShutdownAccess(); //设置线程池状态为STOP advanceRunState(STOP); //中断当前空闲的woker线程 interruptWorkers(); //获取当前队列中的任务 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
下面仔细分析一下:
workQueue.take()
进行阻塞;workQueue.take()
后会一直阻塞而不会被销毁,由于在SHUTDOWN状态下不容许再有新的任务添加到workQueue中,这样一来线程池永远都关闭不了了;workQueue.take()
时,若是发现当前线程在执行以前或者执行期间是中断状态,则会抛出InterruptedException,解除阻塞的状态;下面就来分析一下interruptIdleWorkers方法。
private void interruptIdleWorkers() { interruptIdleWorkers(false); } private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
interruptIdleWorkers遍历workers中全部的工做线程,若线程没有被中断tryLock成功,就中断该线程。
本文对线程池的源码进行了基本的分析与总结,大致归纳为如下几点:一、线程池经过CAS的方式记录自己的运行状态和线程池线程个数二、每一个线程都会被封装为一个WOKER线程对象,每一个worker线程能够处理多个任务;三、线程池执行流程中要经过自身的状态来判断应该结束工做线程仍是阻塞线程等待新的任务,也解释了为何关闭线程池时要中断工做线程以及为何每个worker都须要lock。