在平常的开发调试中,咱们常常会直接new一个Thread对象来执行某个任务。这种方式在任务数较少的状况下比较简单实用,可是在并发量较大的场景中却有着致命的缺陷。例如在访问量巨大的网站中,若是每一个请求都开启一个线程来处理的话,即便是再强大的服务器也支撑不住。一台电脑的CPU资源是有限的,在CPU较为空闲的状况下,新增线程能够提升CPU的利用率,达到提高性能的效果。可是在CPU满载运行的状况下,再继续增长线程不只不能提高性能,反而由于线程的竞争加大而致使性能降低,甚至致使服务器宕机。所以,在这种状况下咱们能够利用线程池来使线程数保持在合理的范围内,使得CPU资源被充分的利用,且避免因过载而致使宕机的危险。在Executors中为咱们提供了多种静态工厂方法来建立各类特性的线程池,其中大多数是返回ThreadPoolExecutor对象。所以本篇咱们从ThreadPoolExecutor类着手,深刻探究线程池的实现机制。缓存
1. 线程池状态和线程数的表示服务器
1 //高3位表示线程池状态, 后29位表示线程个数 2 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 3 private static final int COUNT_BITS = Integer.SIZE - 3; 4 private static final int CAPACITY = (1 << COUNT_BITS) - 1; 5 6 //运行状态 例:11100000000000000000000000000000 7 private static final int RUNNING = -1 << COUNT_BITS; 8 9 //关闭状态 例:00000000000000000000000000000000 10 private static final int SHUTDOWN = 0 << COUNT_BITS; 11 12 //中止状态 例:00100000000000000000000000000000 13 private static final int STOP = 1 << COUNT_BITS; 14 15 //整理状态 例:01000000000000000000000000000000 16 private static final int TIDYING = 2 << COUNT_BITS; 17 18 //终止状态 例:01100000000000000000000000000000 19 private static final int TERMINATED = 3 << COUNT_BITS; 20 21 private static int runStateOf(int c) { return c & ~CAPACITY; } 22 private static int workerCountOf(int c) { return c & CAPACITY; } 23 private static int ctlOf(int rs, int wc) { return rs | wc; }
在继续接下来的探究以前,咱们先来搞清楚ThreadPoolExecutor是怎样存放状态信息和线程数信息的。ThreadPoolExecutor利用原子变量ctl来同时存储运行状态和线程数的信息,其中高3位表示线程池的运行状态(runState),后面的29位表示线程池中的线程数(workerCount)。上面代码中,runStateOf方法是从ctl取出状态信息,workerCountOf方法是从ctl取出线程数信息,ctlOf方法是将状态信息和线程数信息糅合进ctl中。具体的计算过程以下图所示。并发
2. 线程池各个状态的具体含义性能
就像人的生老病死同样,线程池也有本身的生命周期,从建立到终止,线程池在每一个阶段所作的事情是不同的。新建一个线程池时它的状态为Running,这时它不断的从外部接收并处理任务,当处理不过来时它会把任务放到任务队列中;以后咱们可能会调用shutdown()来终止线程池,这时线程池的状态从Running转为Shutdown,它开始拒绝接收从外部传过来的任务,可是会继续处理完任务队列中的任务;咱们也可能调用shutdownNow()来马上中止线程池,这时线程池的状态从Running转为Stop,而后它会快速排空任务队列中的任务并转到Tidying状态,处于该状态的线程池须要执行terminated()来作相关的扫尾工做,执行完terminated()以后线程池就转为Terminated状态,表示线程池已终止。这些状态的转换图以下所示。网站
3. 关键成员变量的介绍ui
1 //任务队列 2 private final BlockingQueue<Runnable> workQueue; 3 4 //工做者集合 5 private final HashSet<Worker> workers = new HashSet<Worker>(); 6 7 //线程达到的最大值 8 private int largestPoolSize; 9 10 //已完成任务总数 11 private long completedTaskCount; 12 13 //线程工厂 14 private volatile ThreadFactory threadFactory; 15 16 //拒绝策略 17 private volatile RejectedExecutionHandler handler; 18 19 //闲置线程存活时间 20 private volatile long keepAliveTime; 21 22 //是否容许核心线程超时 23 private volatile boolean allowCoreThreadTimeOut; 24 25 //核心线程数量 26 private volatile int corePoolSize; 27 28 //最大线程数量 29 private volatile int maximumPoolSize; 30 31 //默认拒绝策略 32 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
在深刻探究线程池的实现机制以前,咱们有必要了解一下各个成员变量的做用。上面列出了主要的成员变量,除了一些用于统计的变量,例如largestPoolSize和completedTaskCount,其中大部分变量的值都是能够在构造时进行设置的。下面咱们就看一下它的核心构造器。this
1 //核心构造器 2 public ThreadPoolExecutor(int corePoolSize, 3 int maximumPoolSize, 4 long keepAliveTime, 5 TimeUnit unit, 6 BlockingQueue<Runnable> workQueue, 7 ThreadFactory threadFactory, 8 RejectedExecutionHandler handler) { 9 if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) { 10 throw new IllegalArgumentException(); 11 } 12 if (workQueue == null || threadFactory == null || handler == null) { 13 throw new NullPointerException(); 14 } 15 this.corePoolSize = corePoolSize; //设置核心线程数量 16 this.maximumPoolSize = maximumPoolSize; //设置最大线程数量 17 this.workQueue = workQueue; //设置任务队列 18 this.keepAliveTime = unit.toNanos(keepAliveTime); //设置存活时间 19 this.threadFactory = threadFactory; //设置线程工厂 20 this.handler = handler; //设置拒绝策略 21 }
ThreadPoolExecutor有多个构造器,全部的构造器都会调用上面的核心构造器。经过核心构造器咱们能够为线程池设置不一样的参数,由此线程池也能表现出不一样的特性。所以完全搞懂这几个参数的含义能使咱们更好的使用线程池,下面咱们就来详细看一下这几个参数的含义。
corePoolSize:
核心线程数最大值,默认状况下新建线程池时并不建立线程,后续每接收一个任务就新建一个核心线程来处理,直到核心线程数达到corePoolSize。这时后面到来的任务都会被放到任务队列中等待。
maximumPoolSize:
总线程数最大值,当任务队列被放满了以后,将会新建非核心线程来处理后面到来的任务。当总的线程数达到maximumPoolSize后,将再也不继续建立线程,而是对后面的任务执行拒绝策略。
workQueue:
任务队列,当核心线程数达到corePoolSize后,后面到来的任务都会被放到任务队列中,该任务队列是阻塞队列,工做线程能够经过定时或者阻塞方式从任务队列中获取任务。
keepAliveTime:
闲置线程存活时间,该参数默认状况下只在线程数大于corePoolSize时起做用,闲置线程在任务队列上等待keepAliveTime时间后将会被终止,直到线程数减至corePoolSize。也能够经过设置allowCoreThreadTimeOut变量为true来使得keepAliveTime在任什么时候候都起做用,这时线程数最后会减至0。spa
4. execute方法的执行过程线程
1 //核心执行方法 2 public void execute(Runnable command) { 3 if (command == null) throw new NullPointerException(); 4 int c = ctl.get(); 5 //线程数若小于corePoolSize则新建核心工做者 6 if (workerCountOf(c) < corePoolSize) { 7 if (addWorker(command, true)) return; 8 c = ctl.get(); 9 } 10 //不然将任务放到任务队列 11 if (isRunning(c) && workQueue.offer(command)) { 12 int recheck = ctl.get(); 13 //若不是running状态则将该任务从队列中移除 14 if (!isRunning(recheck) && remove(command)) { 15 //成功移除后再执行拒绝策略 16 reject(command); 17 //若线程数为0则新增一个非核心线程 18 }else if (workerCountOf(recheck) == 0) { 19 addWorker(null, false); 20 } 21 //若队列已满则新增非核心工做者 22 }else if (!addWorker(command, false)) { 23 //若新建非核心线程失败则执行拒绝策略 24 reject(command); 25 } 26 }
execute方法是线程池接收任务的入口方法,当建立好一个线程池以后,咱们会调用execute方法并传入一个Runnable交给线程池去执行。从上面代码中能够看到execute方法首先会去判断当前线程数是否小于corePoolSize,若是小于则调用addWorker方法新建一个核心线程去处理该任务,不然调用workQueue的offer方法将该任务放入到任务队列中。经过offer方法添加并不会阻塞线程,若是添加成功会返回true,若队列已满则返回false。在成功将任务放入到任务队列后,还会再次检查线程池是不是Running状态,若是不是则将刚刚添加的任务从队列中移除,而后再执行拒绝策略。若是从队列中移除任务失败,则再检查一下线程数是否为0(有可能恰好所有线程都被终止了),是的话就新建一个非核心线程去处理。若是任务队列已经满了,此时offer方法会返回false,接下来会再次调用addWorker方法新增一个非核心线程来处理该任务。若是期间建立线程失败,则最后会执行拒绝策略。调试
5. 工做线程的实现
1 //工做者类 2 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { 3 //关联线程 4 final Thread thread; 5 //初始任务 6 Runnable firstTask; 7 //完成任务数 8 volatile long completedTasks; 9 10 //构造器 11 Worker(Runnable firstTask) { 12 //抑制中断直到runWorker 13 setState(-1); 14 //设置初始任务 15 this.firstTask = firstTask; 16 //设置关联线程 17 this.thread = getThreadFactory().newThread(this); 18 } 19 20 public void run() { 21 runWorker(this); 22 } 23 24 //判断是否占有锁, 0表明未占用, 1表明已占用 25 protected boolean isHeldExclusively() { 26 return getState() != 0; 27 } 28 29 //尝试获取锁 30 protected boolean tryAcquire(int unused) { 31 if (compareAndSetState(0, 1)) { 32 setExclusiveOwnerThread(Thread.currentThread()); 33 return true; 34 } 35 return false; 36 } 37 38 //尝试释放锁 39 protected boolean tryRelease(int unused) { 40 setExclusiveOwnerThread(null); 41 setState(0); 42 return true; 43 } 44 45 public void lock() { acquire(1); } 46 public boolean tryLock() { return tryAcquire(1); } 47 public void unlock() { release(1); } 48 public boolean isLocked() { return isHeldExclusively(); } 49 50 //中断关联线程 51 void interruptIfStarted() { 52 Thread t; 53 //将活动线程和闲置线程都中断 54 if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { 55 try { 56 t.interrupt(); 57 } catch (SecurityException ignore) { 58 //ignore 59 } 60 } 61 } 62 }
ThreadPoolExecutor内部实现了一个Worker类,用它来表示工做线程。每一个Worker对象都持有一个关联线程和分配给它的初始任务。Worker类继承自AQS并实现了本身的加锁解锁方法,说明每一个Worker对象也是一个锁对象。同时Worker类还实现了Runnable接口,所以每一个Worker对象都是能够运行的。Worker类有一个惟一的构造器,须要传入一个初始任务给它,在构造器里面首先将同步状态设置为-1,这个操做主要是抑制中断直到runWorker方法运行,为啥要这样作?咱们继续看下去,能够看到在设置完初始任务以后,立刻就开始设置关联线程,关联线程是经过线程工厂的newThread方法来生成的,这时将Worker对象自己看成任务传给关联线程。所以在启动关联线程时(调用start方法),会运行Worker对象自身的run方法。而run方法里面紧接着调用runWorker方法,也就是说只有在runWorker方法运行时才代表关联线程已启动,这时去中断关联线程才有意义,所以前面要经过设置同步状态为-1来抑制中断。那么为啥将同步状态设置为-1就能够抑制中断?每一个Worker对象都是经过调用interruptIfStarted方法来中断关联线程的,在interruptIfStarted方法内部会判断只有同步状态>=0时才会中断关联线程。所以将同步状态设置为-1能起到抑制中断的做用。
6. 工做线程的建立
1 //添加工做线程 2 private boolean addWorker(Runnable firstTask, boolean core) { 3 retry: 4 for (;;) { 5 int c = ctl.get(); 6 int rs = runStateOf(c); 7 //只有如下两种状况会继续添加线程 8 //1.状态为running 9 //2.状态为shutdown,首要任务为空,但任务队列中还有任务 10 if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) { 11 return false; 12 } 13 for (;;) { 14 int wc = workerCountOf(c); 15 //如下三种状况不继续添加线程: 16 //1.线程数大于线程池总容量 17 //2.当前线程为核心线程,且核心线程数达到corePoolSize 18 //3.当前线程非核心线程,且总线程达到maximumPoolSize 19 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) { 20 return false; 21 } 22 //不然继续添加线程, 先将线程数加一 23 if (compareAndIncrementWorkerCount(c)) { 24 //执行成功则跳过外循环 25 break retry; 26 } 27 //CAS操做失败再次检查线程池状态 28 c = ctl.get(); 29 //若是线程池状态改变则继续执行外循环 30 if (runStateOf(c) != rs) { 31 continue retry; 32 } 33 //不然代表CAS操做失败是workerCount改变, 继续执行内循环 34 } 35 } 36 boolean workerStarted = false; 37 boolean workerAdded = false; 38 Worker w = null; 39 try { 40 final ReentrantLock mainLock = this.mainLock; 41 w = new Worker(firstTask); 42 final Thread t = w.thread; 43 if (t != null) { 44 mainLock.lock(); 45 try { 46 int c = ctl.get(); 47 int rs = runStateOf(c); 48 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { 49 //若是线程已经开启则抛出异常 50 if (t.isAlive()) throw new IllegalThreadStateException(); 51 //将工做者添加到集合中 52 workers.add(w); 53 int s = workers.size(); 54 //记录线程达到的最大值 55 if (s > largestPoolSize) { 56 largestPoolSize = s; 57 } 58 workerAdded = true; 59 } 60 } finally { 61 mainLock.unlock(); 62 } 63 //将工做者添加到集合后则启动线程 64 if (workerAdded) { 65 t.start(); 66 workerStarted = true; 67 } 68 } 69 } finally { 70 //若是线程启动失败则回滚操做 71 if (!workerStarted) { 72 addWorkerFailed(w); 73 } 74 } 75 return workerStarted; 76 }
上面咱们知道在execute方法里面会调用addWorker方法来添加工做线程。经过代码能够看到进入addWorker方法里面会有两层自旋循环,在外层循环中获取线程池当前的状态,若是线程池状态不符合就直接return,在内层循环中获取线程数,若是线程数超过限定值也直接return。只有通过这两重判断以后才会使用CAS方式来将线程数加1。成功将线程数加1以后就跳出外层循环去执行后面的逻辑,不然就根据不一样条件来进行自旋,若是是线程池状态改变就执行外层循环,若是是线程数改变就执行内层循环。当线程数成功加1以后,后面就是去新建一个Worker对象,并在构造时传入初始任务给它。而后将这个Worker对象添加到工做者集合当中,添加成功后就调用start方法来启动关联线程。
7. 工做线程的执行
1 //运行工做者 2 final void runWorker(Worker w) { 3 //获取当前工做线程 4 Thread wt = Thread.currentThread(); 5 //获取工做者的初始任务 6 Runnable task = w.firstTask; 7 //将工做者的初始任务置空 8 w.firstTask = null; 9 //将同步状态从-1设为0 10 w.unlock(); 11 boolean completedAbruptly = true; 12 try { 13 //初始任务不为空则执行初始任务, 不然从队列获取任务 14 while (task != null || (task = getTask()) != null) { 15 //确保获取到任务后才加锁 16 w.lock(); 17 //若状态大于等于stop, 保证当前线程被中断 18 //若状态小于stop, 保证当前线程未被中断 19 //在清理中断状态时可能有其余线程在修改, 因此会再检查一次 20 if ((runStateAtLeast(ctl.get(), STOP) || 21 (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) { 22 wt.interrupt(); 23 } 24 try { 25 //任务执行前作些事情 26 beforeExecute(wt, task); 27 Throwable thrown = null; 28 try { 29 //执行当前任务 30 task.run(); 31 } catch (RuntimeException x) { 32 thrown = x; throw x; 33 } catch (Error x) { 34 thrown = x; throw x; 35 } catch (Throwable x) { 36 thrown = x; throw new Error(x); 37 } finally { 38 //任务执行后作一些事情 39 afterExecute(task, thrown); 40 } 41 } finally { 42 //将执行完的任务置空 43 task = null; 44 //将完成的任务数加一 45 w.completedTasks++; 46 w.unlock(); 47 } 48 } 49 //设置该线程为正常完成任务 50 completedAbruptly = false; 51 } finally { 52 //执行完全部任务后将线程删除 53 processWorkerExit(w, completedAbruptly); 54 } 55 }
上面咱们知道,将Worker对象添加到workers集合以后就会去调用关联线程的start方法,因为传给关联线程的Runnable就是Worker对象自己,所以会调用Worker对象实现的run方法,最后会调用到runWorker方法。咱们看到上面代码,进入到runWorker方法里面首先获取了Worker对象的初始任务,而后调用unlock方法将同步状态加1,因为在构造Worker对象时将同步状态置为-1了,因此这里同步状态变回0,所以在这以后才能够调用interruptIfStarted方法来中断关联线程。若是初始任务不为空就先去执行初始任务,不然就调用getTask方法去任务队列中获取任务,能够看到这里是一个while循环,也就是说工做线程在执行完本身的任务以后会不断的从任务队列中获取任务,直到getTask方法返回null,而后工做线程退出while循环最后执行processWorkerExit方法来移除本身。若是须要在全部任务执行以前或以后作些处理,能够分别实现beforeExecute方法和afterExecute方法。
8. 任务的获取
1 //从任务队列中获取任务 2 private Runnable getTask() { 3 //上一次获取任务是否超时 4 boolean timedOut = false; 5 retry: 6 //在for循环里自旋 7 for (;;) { 8 int c = ctl.get(); 9 int rs = runStateOf(c); 10 //如下两种状况会将工做者数减为0并返回null,并直接使该线程终止: 11 //1.状态为shutdown而且任务队列为空 12 //2.状态为stop, tidying或terminated 13 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { 14 decrementWorkerCount(); 15 return null; 16 } 17 18 boolean timed; 19 //判断是否要剔除当前线程 20 for (;;) { 21 int wc = workerCountOf(c); 22 //如下两种状况会在限定时间获取任务: 23 //1.容许核心线程超时 24 //2.线程数大于corePoolSize 25 timed = allowCoreThreadTimeOut || wc > corePoolSize; 26 //如下两种状况不执行剔除操做: 27 //1.上次任务获取未超时 28 //2.上次任务获取超时, 但没要求在限定时间获取 29 if (wc <= maximumPoolSize && !(timedOut && timed)) { 30 break; 31 } 32 //若上次任务获取超时, 且规定在限定时间获取, 则将线程数减一 33 if (compareAndDecrementWorkerCount(c)) { 34 //CAS操做成功后直接返回null 35 return null; 36 } 37 //CAS操做失败后再次检查状态 38 c = ctl.get(); 39 //若状态改变就从外层循环重试 40 if (runStateOf(c) != rs) { 41 continue retry; 42 } 43 //不然代表是workerCount改变, 继续在内循环重试 44 } 45 46 try { 47 //若timed为true, 则在规定时间内返回 48 //若timed为false, 则阻塞直到获取成功 49 //注意:闲置线程会一直在这阻塞 50 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); 51 //获取任务不为空则返回该任务 52 if (r != null) { 53 return r; 54 } 55 //不然将超时标志设为true 56 timedOut = true; 57 } catch (InterruptedException retry) { 58 timedOut = false; 59 } 60 } 61 }
工做线程在while循环里不断的经过getTask方法来从任务队列中获取任务,咱们看一下getTask方法是怎样获取任务的。进入第一个for循环以后有一个if判断,从这里咱们能够看到,若是线程池状态为shutdown,会继续消费任务队列里面的任务;若是线程池状态为stop,则中止消费任务队列里剩余的任务。进入第二个for循环后会给timed变量赋值,因为allowCoreThreadTimeOut变量默认是false,因此timed的值取决于线程数是否大于corePoolSize,小于为false,大于则为true。从任务队列里面获取任务的操做在try块里面,若是timed为true,则调用poll方法进行定时获取;若是timed为flase,则调用take方法进行阻塞获取。也就是说默认状况下,若是线程数小于corePoolSize,则调用take方法进行阻塞获取,即便任务队列为空,工做线程也会一直等待;若是线程数大于corePoolSize,则调用poll方法进行定时获取,在keepAliveTime时间内获取不到任务则会返回null,对应的工做线程也会被移除,但线程数会保持在corePoolSize上。固然若是设置allowCoreThreadTimeOut为true,则会一直经过调用poll方法来从任务队列中获取任务,若是任务队列长时间为空,则工做线程会减小至0。
9. 工做线程的退出
1 //删除工做线程 2 private void processWorkerExit(Worker w, boolean completedAbruptly) { 3 //若非正常完成则将线程数减为0 4 if (completedAbruptly) { 5 decrementWorkerCount(); 6 } 7 final ReentrantLock mainLock = this.mainLock; 8 mainLock.lock(); 9 try { 10 //统计完成的任务总数 11 completedTaskCount += w.completedTasks; 12 //在这将工做线程移除 13 workers.remove(w); 14 } finally { 15 mainLock.unlock(); 16 } 17 //尝试终止线程池 18 tryTerminate(); 19 //再次检查线程池状态 20 int c = ctl.get(); 21 //若状态为running或shutdown, 则将线程数恢复到最小值 22 if (runStateLessThan(c, STOP)) { 23 //线程正常完成任务被移除 24 if (!completedAbruptly) { 25 //容许核心线程超时最小值为0, 不然最小值为核心线程数 26 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; 27 //若是任务队列还有任务, 则保证至少有一个线程 28 if (min == 0 && !workQueue.isEmpty()) { 29 min = 1; 30 } 31 //若线程数大于最小值则不新增了 32 if (workerCountOf(c) >= min) { 33 return; 34 } 35 } 36 //新增工做线程 37 addWorker(null, false); 38 } 39 }
工做线程若是从getTask方法中得到null,则会退出while循环并随后执行processWorkerExit方法,该方法会在这个工做线程终止以前执行一些操做,咱们看到它会去统计该工做者完成的任务数,而后将其从workers集合中删除,每删除一个工做者以后都会去调用tryTerminate方法尝试终止线程池,但并不必定会真的终止线程池。从tryTerminate方法返回后再次去检查一遍线程池的状态,若是线程池状态为running或者shutdown,而且线程数小于最小值,则恢复一个工做者。这个最小值是怎样计算出来的呢?咱们来看看。若是allowCoreThreadTimeOut为true则最小值为0,不然最小值为corePoolSize。但还有一个例外状况,就是虽然容许核心线程超时了,可是若是任务队列不为空的话,那么必须保证有一个线程存在,所以这时最小值设为1。后面就是判断若是工做线程数大于最小值就不新增线程了,不然就新增一个非核心线程。从这个方法能够看到,每一个线程退出时都会去判断要不要再恢复一个线程,所以线程池中的线程总数也是动态增减的。
10. 线程池的终止
1 //平缓关闭线程池 2 public void shutdown() { 3 final ReentrantLock mainLock = this.mainLock; 4 mainLock.lock(); 5 try { 6 //检查是否有关闭的权限 7 checkShutdownAccess(); 8 //将线程池状态设为shutdown 9 advanceRunState(SHUTDOWN); 10 //中断闲置的线程 11 interruptIdleWorkers(); 12 //对外提供的钩子 13 onShutdown(); 14 } finally { 15 mainLock.unlock(); 16 } 17 //尝试终止线程池 18 tryTerminate(); 19 } 20 21 //马上关闭线程池 22 public List<Runnable> shutdownNow() { 23 List<Runnable> tasks; 24 final ReentrantLock mainLock = this.mainLock; 25 mainLock.lock(); 26 try { 27 //检查是否有关闭的权限 28 checkShutdownAccess(); 29 //将线程池状态设为stop 30 advanceRunState(STOP); 31 //中断全部工做线程 32 interruptWorkers(); 33 //排干任务队列 34 tasks = drainQueue(); 35 } finally { 36 mainLock.unlock(); 37 } 38 //尝试终止线程池 39 tryTerminate(); 40 return tasks; 41 }
能够经过两个方法来终止线程池,经过调用shutdown方法能够平缓的终止线程池,经过调用shutdownNow方法能够当即终止线程池。调用shutdown()方法后首先会将线程池状态设置为shutdown,这时线程池会拒绝接收外部传过来的任务,而后调用interruptIdleWorkers()方法中断闲置线程,剩余的线程会继续消费完任务队列里的任务以后才会终止。调用shutdownNow()方法会将线程池状态设置为stop,这是线程池也再也不接收外界的任务,而且立刻调用interruptWorkers()方法将全部工做线程都中断了,而后排干任务队列里面没有被处理的任务,最后返回未被处理的任务集合。调用shutdown()和shutdownNow()方法后还未真正终止线程池,这两个方法最后都会调用tryTerminate()方法来终止线程池。咱们看看该方法的代码。
1 //尝试终止线程池 2 final void tryTerminate() { 3 for (;;) { 4 int c = ctl.get(); 5 //如下两种状况终止线程池,其余状况直接返回: 6 //1.状态为stop 7 //2.状态为shutdown且任务队列为空 8 if (isRunning(c) || runStateAtLeast(c, TIDYING) || 9 (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty())) { 10 return; 11 } 12 //若线程不为空则中断一个闲置线程后直接返回 13 if (workerCountOf(c) != 0) { 14 interruptIdleWorkers(ONLY_ONE); 15 return; 16 } 17 final ReentrantLock mainLock = this.mainLock; 18 mainLock.lock(); 19 try { 20 //将状态设置为tidying 21 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { 22 try { 23 //线程池终止后作的事情 24 terminated(); 25 } finally { 26 //将状态设置为终止状态(TERMINATED) 27 ctl.set(ctlOf(TERMINATED, 0)); 28 //唤醒条件队列全部线程 29 termination.signalAll(); 30 } 31 return; 32 } 33 } finally { 34 mainLock.unlock(); 35 } 36 //若状态更改失败则再重试 37 } 38 }
tryTerminate()方法在其余不少地方也被调用过,好比processWorkerExit()和addWorkerFailed()。调用该方法来尝试终止线程池,在进入for循环后第一个if判断过滤了不符合条件的终止操做,只有状态为stop,或者状态为shutdown且任务队列为空这两种状况才能继续执行。第二个if语句判断工做者数量是否为0,不为0的话也直接返回。通过这两重判断以后才符合终止线程池的条件,因而先经过CAS操做将线程池状态设置为tidying状态,在tidying状态会调用用户本身实现的terminated()方法来作一些处理。到了这一步,无论terminated()方法是否成功执行最后都会将线程池状态设置为terminated,也就标志着线程池真正意义上的终止了。最后会唤醒全部等待线程池终止的线程,让它们继续执行。
11. 经常使用线程池参数配置
Executors中有许多静态工厂方法来建立线程池,在平时使用中咱们都是经过Executors的静态工厂方法来建立线程池的。这其中有几个使用线程池的典型例子咱们来看一下。
1 //固定线程数的线程池 2 //注:该线程池将corePoolSize和maximumPoolSize都设置为同一数值,线程池刚建立时线程数为0, 3 //以后每接收一个任务建立一个线程,直到线程数达到nThreads,此后线程数再也不增加。若是其中有某个 4 //线程由于发生异常而终止,线程池将补充一个新的线程。 5 public static ExecutorService newFixedThreadPool(int nThreads) { 6 return new ThreadPoolExecutor(nThreads, nThreads, 7 0L, TimeUnit.MILLISECONDS, 8 new LinkedBlockingQueue<Runnable>()); 9 } 10 11 //单个线程的线程池 12 //注:该线程池将corePoolSize和maximumPoolSize都设置为1,所以线程池中永远只有一个线程, 13 //若是该线程由于不可预知的异常而被终止,线程池将会补充一个新的线程。 14 public static ExecutorService newSingleThreadExecutor() { 15 return new FinalizableDelegatedExecutorService 16 (new ThreadPoolExecutor(1, 1, 17 0L, TimeUnit.MILLISECONDS, 18 new LinkedBlockingQueue<Runnable>())); 19 } 20 21 //可缓存的线程池 22 //注:该线程池将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE, 23 //空闲线程存活时间设置为60S。也就是说该线程池一开始线程数为0,随着任务数的增长线程数也相应 24 //增长,线程数的上限为Integer.MAX_VALUE。当任务数减小时线程数也随之减小,最后会减小至0。 25 public static ExecutorService newCachedThreadPool() { 26 return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 27 60L, TimeUnit.SECONDS, 28 new SynchronousQueue<Runnable>()); 29 }