在J.U.C之executors框架:executors框架设计理念的章节中,咱们已经简要介绍过ThreadPoolExecutor
了,经过Executors工厂,用户能够建立本身须要的执行器对象。ThreadPoolExecutor,它是J.U.C在JDK1.5时提供的一种实现了ExecutorService接口的执行器,或者说线程池。算法
public class ThreadPoolExecutor extends AbstractExecutorService {
ThreadPoolExecutor并无本身直接实现ExecutorService接口,由于它只是其中一种Executor的实现而已,因此Doug Lea把一些通用部分封装成一个抽象父类——AbstractExecutorService,供J.U.C中的其它执行器继承。若是读者须要本身实现一个Executor,也能够继承该抽象类。数据库
AbstractExecutorService提供了 ExecutorService 接口的默认实现——主要实现了 submit、invokeAny 、invokeAll这三类方法;编程
若是读者看过上一篇综述文章,就应该知道,ExecutorService的这三类方法几乎都是返回一个Future对象。而Future是一个接口,AbstractExecutorService既然实现了这些方法,必然要实现该Future接口,咱们来看下AbstractExecutorService实现的submit方法:segmentfault
public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; }
能够看到,上述方法首先对Runnable和返回值value进行了封装,经过newTaskFor
方法,封装成了一个FutureTask对象,而后经过execute方法执行任务,最后返回异步任务对象。设计模式
这里实际上是 模板方法模式的运用,execute是抽象方法,须要由继承AbstractExecutorService的子类来实现。
上述须要注意的是newTaskFor方法,该方法建立了一个Future对象:缓存
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); }
FutureTask其实就是Future接口的实现类:
安全
咱们以前讲过,J.U.C中的Future接口是“Future模式”的 多线程设计模式的实现,可让调用方以 异步方式获取任务的执行结果。而FutureTask即是这样一类支持异步返回结果的任务,既然是任务就须要实现Runnable接口,同时又要支持异步功能,因此又须要实现Future接口。J.U.C为了方便,新定义了一个接口—— RunnableFuture,该接口同时继承Runnable和Future,表明支持异步处理的任务,而FutureTask即是它的默认实现。
本节不会在Futrure模式上花费太多笔墨,之后咱们会专门讲解J.U.C对Future模式的支持。多线程
回到ThreadPoolExecutor,从该类的命名也能够看出,这是一种线程池执行器。线程池你们应该并不陌生,应用开发中常常须要用到数据库链接池,数据库链接池里维护着一些数据库链接,当应用须要链接数据库时,并非本身建立链接,而是从链接池中获取可用链接;当关闭数据库链接时,只是将该链接还给链接池,以供复用。并发
而线程池也是相似的概念,当有任务须要执行时,线程池会给该任务分配线程,若是当前没有可用线程,通常会将任务放进一个队列中,当有线程可用时,再从队列中取出任务并执行,以下图:框架
线程池的引入,主要解决如下问题:
- 减小系统由于频繁建立和销毁线程所带来的开销;
- 自动管理线程,对使用方透明,使其能够专一于任务的构建。
了解了线程池和ThreadPoolExecutor的继承体系,接下来,咱们来看下J.U.C是如何实现一个普通线程池的。
咱们先来看下ThreadPoolExecutor的构造器,其实以前在讲Executors时已经接触过了,Executors工厂方法建立的三种线程池:newFixedThreadPool、newSingleThreadExecutor、newCachedThreadPool,内部都是经过ThreadPoolExecutor的下面这个构造器实例化了ThreadPoolExecutor对象:
/** * 使用给定的参数建立ThreadPoolExecutor. * * @param corePoolSize 核心线程池中的最大线程数 * @param maximumPoolSize 总线程池中的最大线程数 * @param keepAliveTime 空闲线程的存活时间 * @param unit keepAliveTime的单位 * @param workQueue 任务队列, 保存已经提交但还没有被执行的线程 * @param threadFactory 线程工厂(用于指定若是建立一个线程) * @param handler 拒绝策略 (当任务太多致使工做队列满时的处理策略) */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); // 使用纳秒保存存活时间 this.threadFactory = threadFactory; this.handler = handler; }
为了用户使用方便,ThreadPoolExecutor一共提供了4种构造器,但其它三种内部其实都调用了上面的构造器。
正是经过上述参数的组合变换,使得Executors工厂能够建立不一样类型的线程池。这里先简要讲一下corePoolSize
和maximumPoolSize
这两个参数:
ThreadPoolExecutor在逻辑上将自身管理的线程池划分为两部分:核心线程池(大小对应为corePoolSize)、非核心线程池(大小对应为maximumPoolSize-corePoolSize)。
当咱们向线程池提交一个任务时,将建立一个工做线程——咱们称之为Worker,Worker在逻辑上从属于下图中的【核心线程池】或【非核心线程池】,具体属于哪种,要根据corePoolSize、maximumPoolSize、Worker总数进行判断:
注意:咱们上面一直在提【工做线程】、【核心线程池】、【非核心线程池】,读者可能都看晕了,包括我本身第一次学习ThreadPoolExecutor时也被网上和垃圾国产技术书籍的错误描述给误导了。我这里先提一下,后面咱们分析线程池的任务调度流程时会再详细说明:
- ThreadPoolExecutor中只有一种类型的线程,名叫Worker,它是ThreadPoolExecutor定义的内部类,同时封装着Runnable任务和执行该任务的Thread对象,咱们称它为【工做线程】,它也是ThreadPoolExecutor惟一须要进行维护的线程;
- 【核心线程池】【非核心线程池】都是逻辑上的概念,ThreadPoolExecutor在任务调度过程当中会根据
corePoolSize
和maximumPoolSize
的大小,判断应该如何调度任务.
到这里,读者可能会思考一个问题:既然是线程池,那么必然有线程池状态,同时也涉及对其中的工做线程(Worker)的管理,ThreadPoolExecutor是如何作的呢?
ThreadPoolExecutor内部定义了一个AtomicInteger变量——ctl,经过按位划分的方式,在一个变量中记录线程池状态和工做线程数——低29位保存线程数,高3位保存线程池状态:
/** * 保存线程池状态和工做线程数: * 低29位: 工做线程数 * 高3位 : 线程池状态 */ private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; // 最大线程数: 2^29-1 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 00011111 11111111 11111111 11111111 // 线程池状态 private static final int RUNNING = -1 << COUNT_BITS; // 11100000 00000000 00000000 00000000 private static final int SHUTDOWN = 0 << COUNT_BITS; // 00000000 00000000 00000000 00000000 private static final int STOP = 1 << COUNT_BITS; // 00100000 00000000 00000000 00000000 private static final int TIDYING = 2 << COUNT_BITS; // 01000000 00000000 00000000 00000000 private static final int TERMINATED = 3 << COUNT_BITS; // 01100000 00000000 00000000 00000000
能够看到,ThreadPoolExecutor一共定义了5种线程池状态:
- RUNNING : 接受新任务, 且处理已经进入阻塞队列的任务
- SHUTDOWN : 不接受新任务, 但处理已经进入阻塞队列的任务
- STOP : 不接受新任务, 且不处理已经进入阻塞队列的任务, 同时中断正在运行的任务
- TIDYING : 全部任务都已终止, 工做线程数为0, 线程转化为TIDYING状态并准备调用terminated方法
- TERMINATED : terminated方法已经执行完成
各个状态之间的流转图:
另外,咱们刚才也提到工做线程(Worker),Worker被定义为ThreadPoolExecutor的内部类,实现了AQS框架,ThreadPoolExecutor经过一个HashSet来保存工做线程:
/** * 工做线程集合. */ private final HashSet<Worker> workers = new HashSet<Worker>();
工做线程的定义以下:
/** * Worker表示线程池中的一个工做线程, 能够与任务相关联. * 因为实现了AQS框架, 其同步状态值的定义以下: * -1: 初始状态 * 0: 无锁状态 * 1: 加锁状态 */ private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * 与该Worker关联的线程. */ final Thread thread; /** * Initial task to run. Possibly null. */ Runnable firstTask; /** * Per-thread task counter */ volatile long completedTasks; Worker(Runnable firstTask) { setState(-1); // 初始的同步状态值 this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** * 执行任务 */ public void run() { runWorker(this); } /** * 是否加锁 */ protected boolean isHeldExclusively() { return getState() != 0; } /** * 尝试获取锁 */ protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } /** * 尝试释放锁 */ protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } /** * 中断线程(仅任务非初始状态) */ void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
经过Worker的定义能够看到,每一个Worker对象都有一个Thread线程对象与它相对应,当任务须要执行的时候,实际是调用内部Thread对象的start方法,而Thread对象是在Worker的构造器中经过getThreadFactory().newThread(this)
方法建立的,建立的Thread将Worker自身做为任务,因此当调用Thread的start
方法时,最终实际是调用了Worker.run()
方法,该方法内部委托给runWorker
方法执行任务,这个方法咱们后面会详细介绍。
ThreadFactory用来建立单个线程,当线程池须要建立一个线程时,就要调用该类的newThread(Runnable r)
方法建立线程(ThreadPoolExecutor中实际建立线程的时刻是在将任务包装成工做线程Worker时)。
ThreadPoolExecutor在构造时若是用户不指定ThreadFactory,则默认使用Executors.defaultThreadFactory()
建立一个ThreadFactory,即Executors.DefaultThreadFactory:
public static ThreadFactory defaultThreadFactory() { return new DefaultThreadFactory(); } /** * 默认的线程工厂. */ static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1);//用于统计线程工厂个数 private final ThreadGroup group; //用来统计每一个线程工厂建立了多少线程 private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
这里的关键是要明白为何须要用ThreadFactory来建立线程,而不是直接经过new Thread()的方式。这个问题在executors框架概述中已经谈过了,这样作的好处是: 一来解耦对象的建立与使用,二来能够 批量配置线程信息(优先级、线程名称、是否守护线程等),以自由设置池子中全部线程的状态。
ExecutorService的核心方法是submit方法——用于提交一个待执行的任务,若是读者阅读ThreadPoolExecutor的源码,会发现它并无覆写submit方法,而是沿用了父类AbstractExecutorService的模板,而后本身实现了execute
方法:
public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; }
ThreadPoolExecutor的execute方法定义以下:
public void execute(Runnable command) { //【1】若是任务为null,则抛出NPE异常 if (command == null) throw new NullPointerException(); //【2】获取当前线程池的状态+线程个数变量的组合值 int c = ctl.get(); 【3】当前线程池中线程个数是否小于corePoolSize,小于则开启新线程运行 if (workerCountOf(c) < corePoolSize) { // CASE1: 工做线程数 < 核心线程池上限 if (addWorker(command, true)) // 添加工做线程并执行 return; c = ctl.get(); } //【4】若是线程池处于RUNNING状态,则添加任务到阻塞队列 if (isRunning(c) && workQueue.offer(command)) { // CASE2: 插入任务至队列 // 【4.1】二次检查 int recheck = ctl.get(); //【4.2】若是当前线程池状态不是RUNNING则从队列中删除任务,并执行拒绝策略 if (!isRunning(recheck) && remove(command)) reject(command); //【4.3】不然若是当前线程池为空,则添加一个线程 else if (workerCountOf(recheck) == 0) addWorker(null, false); //【5】若是队列满了,则新增线程,新增失败则执行拒绝策略 } else if (!addWorker(command, false)) // CASE3: 插入队列失败, 判断工做线程数 < 总线程池上限 reject(command); // 执行拒绝策略 }
说明:ThreadPoolExecutor的实现其实是一个生产消费模型,当用户添加任务到线程池时,至关于生产者生产元素,workers线程工做集中的线程直接执行任务或者从任务队列里面获取任务时,则至关于消费者消费元素。
上述execute的执行流程能够用下图描述:
这里须要特别注意的是 CASE2中的addWorker(null, false)
,当将任务成功添加到队列后,若是此时的工做线程数为0,就会执行这段代码。
通常来说每一个工做线程(Worker)都有一个Runnable任务和一个对应的执行线程Thread,当咱们调用addWorker方法时,若是不传入相应的任务,那么就只是新建了一个没有任务的工做线程(Worker),该Worker就会从工做队列中取任务来执行(由于本身没有绑定任务)。若是传入了任务,新建的工做线程就会执行该任务。
因此execute方法的CASE2中,将任务添加到队列后,须要判断工做线程数是否为0,若是是0那么就必须新建一个空任务的工做线程,未来在某一时刻它会去队列取任务执行,不然没有工做线程的话,该队列中的任务永远不会被执行。
另外,这里又要回到【工做线程】、【核心线程池】、【非核心线程池】、【总线程池】的概念上了。
再强调一遍,maximumPoolSize限定了整个线程池的大小,corePoolSize限定了核心线程池的大小,corePoolSize≤maximumPoolSize(当相等时表示为固定线程池);maximumPoolSize-corePoolSize表示非核心线程池。
execute的整个执行流程关键是下面两点:
CorePoolSize ≤ 工做线程数 < maximumPoolSize
)新建一个工做线程当即执行任务,不然执行拒绝策略。了解了ThreadPoolExecutor的整个执行流程,咱们来看下它是如何添加工做线程并执行任务的,execute方法内部调用了addWorker方法来添加工做线程并执行任务:
/** * 添加工做线程并执行任务 * * @param firstTask 若是指定了该参数, 表示将当即建立一个新工做线程执行该firstTask任务; 不然复用已有的工做线程,从工做队列中获取任务并执行 * @param core 执行任务的工做线程归属于哪一个线程池: true-核心线程池 false-非核心线程池 */ private boolean addWorker(Runnable firstTask, boolean core) { retry: for (; ; ) { int c = ctl.get(); int rs = runStateOf(c); // 获取线程池状态 /** * 这个if主要是判断哪些状况下, 线程池再也不接受新任务执行, 而是直接返回.总结下, 有如下几种状况: * 1. 线程池状态为 STOP 或 TIDYING 或 TERMINATED: 线程池状态为上述任一一种时, 都不会再接受任务,因此直接返回 * 2. 线程池状态≥ SHUTDOWN 且 firstTask != null: 由于当线程池状态≥ SHUTDOWN时, 再也不接受新任务的提交,因此直接返回 * 3. 线程池状态≥ SHUTDOWN 且 队列为空: 队列中已经没有任务了, 因此也就不须要执行任何任务了,能够直接返回 */ if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) return false; //循环CAS增长线程数 for (; ; ) { int wc = workerCountOf(c); // 获取工做线程数 /** * 这个if主要是判断工做线程数是否超限, 如下任一状况属于属于超限, 直接返回: * 1. 工做线程数超过最大工做线程数(2^29-1) * 2. 工做线程数超过核心线程池上限(入参core为true, 表示归属核心线程池) * 3. 工做线程数超过总线程池上限(入参core为false, 表示归属非核心线程池) */ if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) // 工做线程数加1 break retry; // 跳出最外层循环 c = ctl.get(); if (runStateOf(c) != rs) // 线程池状态发生变化, 从新自旋判断 continue retry; } } //到这里说明CAS自旋成功了 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); // 将任务包装成工做线程 final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); //加入独占锁,为了实现workers同步,由于可能多个线程调用了线程池的execute方法 try { // 从新检查线程池状态,以免在获取锁前调用了shutdown接口 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) throw new IllegalThreadStateException(); workers.add(w); // 加入工做线程集合 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } //添加成功后,则启动任务 if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (!workerStarted) // 建立/启动工做线程失败, 须要执行回滚操做 addWorkerFailed(w); } return workerStarted; }
整个addWorker的逻辑并不复杂,分为两部分:
第一部分双重循环的目的是经过CAS操做增长线程数;第二部分主要把并发安全的任务添加到workers里面,并启动任务执行。
首先将Runnable任务包装成一个Worker对象,而后加入到一个工做线程集合中(名为workers的HashSet),最后调用工做线程中的Thread对象的start方法执行任务,其实最终是委托到Worker的下面方法执行:
/** * 执行任务 */ public void run() { runWorker(this); }
runWoker用于执行任务,总体流程以下:
getTask()
方法从队列中获取任务(若是工做线程自身携带着任务,则执行携带的任务);task.run()
执行任务;final void runWorker(Worker w) { Thread wt = Thread.currentThread(); // 执行任务的线程 Runnable task = w.firstTask; // 任务, 若是是null则从队列取任务 w.firstTask = null; w.unlock(); // 容许执行线程被中断 boolean completedAbruptly = true; // 表示是否由于中断而致使退出 try { while (task != null || (task = getTask()) != null) { // 当task==null时会经过getTask从队列取任务 w.lock(); /** * 下面这个if判断的做用以下: * 1.保证当线程池状态为STOP/TIDYING/TERMINATED时,当前执行任务的线程wt是中断状态(由于线程池处于上述任一状态时,均不能再执行新任务) * 2.保证当线程池状态为RUNNING/SHUTDOWN时,当前执行任务的线程wt不是中断状态 */ if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); // 钩子方法,由子类自定义实现 Throwable thrown = null; try { task.run(); // 执行任务 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); // 钩子方法,由子类自定义实现 } } finally { task = null; w.completedTasks++; // 完成任务数+1 w.unlock(); } } // 执行到此处, 说明该工做线程自身既没有携带任务, 也没从任务队列中获取到任务 completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); // 处理工做线程的退出工做 } }
这里要特别注意第一个IF方法,该方法的核心做用,用一句话归纳就是:
确保正在中止的线程池(STOP/TIDYING/TERMINATED)再也不接受新任务,若是有新任务那么该任务的工做线程必定是中断状态;确保正常状态的线程池(RUNNING/SHUTDOWN),其所执行的任务都是不能被中断的。
另外,getTask方法用于从任务队列中获取一个任务,若是获取不到任务,会跳出while循环,最终会经过processWorkerExit方法清理工做线程。注意这里的completedAbruptly
字段,它表示该工做线程是不是由于中断而退出,while循环的退出有如下几种可能:
processWorkerExit(worker,false);
processWorkerExit(worker,true);
经过上面的讨论,咱们知道工做线程是在processWorkerExit中被清理的,来看下定义:
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // 工做线程因异常状况而退出 decrementWorkerCount(); // 工做线程数减1(若是工做线程执行时没有出现异常, 在getTask()方法中已经对线程数减1了) //统计整个线程池完成的任务个数,并从工做集中删除当前worker final ReentrantLock mainLock = this.mainLock; //全局锁 mainLock.lock(); try { completedTaskCount += w.completedTasks; // completedTaskCount记录线程池完成的总任务数 workers.remove(w); // 从工做线程集合中移除(该工做线程会自动被GC回收) } finally { mainLock.unlock(); } tryTerminate(); // 根据线程池状态, 判断是否须要终止线程池 int c = ctl.get(); if (runStateLessThan(c, STOP)) { // 若是线程池状态为RUNNING/SHUTDOWN if (!completedAbruptly) { // 工做线程为正常退出 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && !workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); // 新建一个工做线程 } }
processWorkerExit的做用就是将该退出的工做线程清理掉,而后看下线程池是否须要终止。
processWorkerExit执行完以后,整个工做线程的生命周期也结束了,咱们能够经过下图来回顾下它的整个生命周期:
最后,咱们来看下任务的获取,也就是runWorker中使用的getTask
方法:
private Runnable getTask() { boolean timedOut = false; // 表示上次从阻塞队列中取任务时是否超时 for (; ; ) { int c = ctl.get(); int rs = runStateOf(c); // 获取线程池状态 /** * 如下IF用于判断哪些状况下不容许再从队列获取任务: * 1. 线程池进入中止状态(STOP/TIDYING/TERMINATED), 此时即便队列中还有任务未执行, 也再也不执行 * 2. 线程池非RUNNING状态, 且队列为空 */ if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); // 工做线程数减1 return null; } int wc = workerCountOf(c); // 获取工做线程数 /** * timed变量用于判断是否须要进行超时控制: * 对于核心线程池中的工做线程, 除非设置了allowCoreThreadTimeOut==true, 不然不会超时回收; * 对于非核心线程池中的工做线程, 都须要超时控制 */ boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 这里主要是当外部经过setMaximumPoolSize方法从新设置了最大线程数时,须要回收多出的工做线程 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; // 超时仍未获取到任务 } catch (InterruptedException retry) { timedOut = false; } } }
getTask方法的主要做用就是:经过自旋,不断地尝试从阻塞队列中获取一个任务,若是获取失败则返回null。
阻塞队列就是在咱们构建ThreadPoolExecutor对象时,在构造器中指定的。因为队列是外部指定的,因此根据阻塞队列的特性不一样,getTask方法的执行状况也不一样。
队列特性 | 有界队列 | 近似无界队列 | 无界队列 | 特殊队列 |
---|---|---|---|---|
有锁算法 | ArrayBlockingQueue | LinkedBlockingQueue、LinkedBlockingDeque | / | PriorityBlockingQueue、DelayQueue |
无锁算法 | / | / | LinkedTransferQueue | SynchronousQueue |
咱们能够根据业务需求、任务特色等选择上表中的某一种阻塞队列,根据Oracle官方文档的提示,任务在阻塞队列中排队一共有三种状况:
1.直接提交
即直接将任务提交给等待的工做线程,这时能够选择SynchronousQueue。由于SynchronousQueue是没有容量的,并且采用了无锁算法,因此性能较好,可是每一个入队操做都要等待一个出队操做,反之亦然。
使用SynchronousQueue时,当核心线程池满了之后,若是不存在空闲的工做线程,则试图把任务加入队列将当即失败(execute方法中使用了队列的offer方法进行入队操做,而SynchronousQueue在调用offer时若是没有另外一个线程等待出队操做,则会当即返回false),所以会构造一个新的工做线程(未超出最大线程池容量时)。
因为,核心线程池是很容易满的,因此当使用SynchronousQueue时,通常须要将maximumPoolSizes
设置得比较大,不然入队很容易失败,最终致使执行拒绝策略,这也是为何Executors工做默认提供的缓存线程池使用SynchronousQueue做为任务队列的缘由。
2.无界任务队列
无界任务队列咱们的选择主要有LinkedTransferQueue、LinkedBlockingQueue(近似无界,构造时不指定容量便可),从性能角度来讲LinkedTransferQueue采用了无锁算法,高并发环境下性能相对更好,但若是只是作任务队列使用相差并不大。
使用无界队列须要特别注意系统资源的消耗状况,由于当核心线程池满了之后,会首先尝试将任务放入队列,因为是无界队列因此几乎必定会成功,那么系统瓶颈其实就是硬件了。若是任务的建立速度远快于工做线程处理任务的速度,那么最终会致使系统资源耗尽。Executors工厂中建立固定线程池的方法内部就是用了LinkedBlockingQueue。
3.有界任务队列
有界任务队列,好比ArrayBlockingQueue ,能够防止资源耗尽的状况。当核心线程池满了之后,若是队列也满了,则会建立归属于非核心线程池的工做线程,若是非核心线程池也满了 ,才会执行拒绝策略。
ThreadPoolExecutor在如下两种状况下会执行拒绝策略:
所谓拒绝策略,就是在构造ThreadPoolExecutor时,传入的RejectedExecutionHandler对象:
public interface RejectedExecutionHandler { void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }
ThreadPoolExecutor一共提供了4种拒绝策略:
1.AbortPolicy(默认)
AbortPolicy策略其实就是抛出一个RejectedExecutionException异常:
public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
2.DiscardPolicy
DiscardPolicy策略其实就是无为而治,什么都不作,等任务本身被回收:
public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
3.DiscardOldestPolicy
DiscardOldestPolicy策略是丢弃任务队列中的最近一个任务,并执行当前任务:
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { // 线程池未关闭(RUNNING) e.getQueue().poll(); // 丢弃任务队列中的最近任务 e.execute(r); // 执行当前任务 } } }
4.CallerRunsPolicy
CallerRunsPolicy策略至关于以自身线程来执行任务,这样能够减缓新任务提交的速度。
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { // 线程池未关闭(RUNNING) r.run(); // 执行当前任务 } } }
ExecutorService接口提供两种方法来关闭线程池,这两种方法的区别主要在因而否会继续处理已经添加到任务队列中的任务。
shutdown方法将线程池切换到SHUTDOWN状态(若是已经中止,则不用切换),并调用interruptIdleWorkers方法中断全部空闲的工做线程,最后调用tryTerminate尝试结束线程池:
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); // 若是线程池为RUNNING状态, 则切换为SHUTDOWN状态 interruptIdleWorkers(); // 中断全部空闲线程 onShutdown(); // 钩子方法, 由子类实现 } finally { mainLock.unlock(); } tryTerminate(); }
这里要注意,若是执行Runnable任务的线程自己不响应中断,那么也就没有办法终止任务。
shutdownNow方法的主要不一样之处就是,它会将线程池的状态至少置为STOP,同时中断全部工做线程(不管该线程是空闲仍是运行中),同时返回任务队列中的全部任务。
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); // 若是线程池为RUNNING或SHUTDOWN状态, 则切换为STOP状态 interruptWorkers(); // 中断全部工做线程 tasks = drainQueue(); // 抽空任务队列中的全部任务 } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
最后,咱们来回顾下ThreadPoolExecutor的总体结构,ThreadPoolExecutor的核心方法是execute,控制着工做线程的建立和任务的执行,以下图:
同时,ThreadPoolExecutor中有几个比较重要的组件:阻塞队列、核心线程池、拒绝策略,它们的关系以下图,图中的序号表示execute的执行顺序,能够配合上面的流程图来理解:
关于ThreadPoolExecutor这个线程池,最重要的是根据系统实际状况,合理进行线程池参数的设置以及阻塞队列的选择。现实状况下,通常会本身经过ThreadPoolExecutor的构造器去构建线程池,而非直接使用Executors工厂建立,由于这样更利于对参数的控制和调优。
另外,根据任务的特色,要有选择的配置核心线程池的大小:
ThreadPoolExecutor到此就介绍完了,下一节咱们将介绍一种可控制任务执行周期的线程池——ScheduledThreadPoolExecutor,其实咱们以前讲ScheduledExecutorService接口的时候已经接触过了,下一节会深刻它的实现原理。
线程池巧妙地使用一个Integer类型地原子变量来记录线程池状态和线程池中地线程个数。经过线程池状态来控制任务地执行,每一个worker线程能够处理多个任务。线程池经过线程地复用减小了线程地建立和销毁地开销。
参考书籍
Java并发编程之美
本文参考