首先咱们要知道为何要使用ThreadPoolExcuter,具体能够看看文档中的说明: 线程池能够解决两个不一样问题:因为减小了每一个任务的调用开销,在执行大量的异步任务时,它一般可以提供更好的性能,而且还能够提供绑定和管理资源(包括执行集合任务时使用的线程)的方法。每一个 ThreadPoolExecutor还维护着一些基本的统计数据,如完成的任务数。 线程池作的其实能够看得很简单,其实就是把你提交的任务(task)进行调度管理运行,但这个调度的过程以及其中的状态控制是比较复杂的。java
能够直接看最完整的ThreadPoolExcuter的初始化函数:缓存
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { ... }
逐个介绍以下: **corePoolSize:**核心线程数,在ThreadPoolExcutor中有一个与它相关的配置:allowCoreThreadTimeOut(默认为false),当allowCoreThreadTimeOut为false时,核心线程会一直存活,哪怕是一直空闲着。而当allowCoreThreadTimeOut为true时核心线程空闲时间超过keepAliveTime时会被回收。并发
**maximumPoolSize:**最大线程数,线程池能容纳的最大线程数,当线程池中的线程达到最大时,此时添加任务将会采用拒绝策略,默认的拒绝策略是抛出一个运行时错误(RejectedExecutionException)。值得一提的是,当初始化时用的工做队列为LinkedBlockingDeque时,这个值将无效。异步
**keepAliveTime:**存活时间,当非核心空闲超过这个时间将被回收,同时空闲核心线程是否回收受allowCoreThreadTimeOut影响。ide
**unit:**keepAliveTime的单位。函数
**workQueue:**任务队列,经常使用有三种队列,即SynchronousQueue,LinkedBlockingDeque(无界队列),ArrayBlockingQueue(有界队列)。oop
**threadFactory:**线程工厂,ThreadFactory是一个接口,用来建立worker。经过线程工厂能够对线程的一些属性进行定制。默认直接新建线程。性能
**RejectedExecutionHandler:**也是一个接口,只有一个方法,当线程池中的资源已经所有使用,添加新线程被拒绝时,会调用RejectedExecutionHandler的rejectedExecution法。 默认是抛出一个运行时异常。ui
这么多参数看起来好像很复杂,因此Java贴心得为咱们准备了便捷的API,便可以直接用Executors建立各类线程池。分别是:this
//建立一个可缓存线程池,若是线程池长度超过处理须要,可灵活回收空闲线程,若无可回收,则新建线程。 //经过设置corePoolSize为0,而maximumPoolSize为Integer.Max_VALUE(Int型数据最大值)实现。 ExecutorService cache = Executors.newCachedThreadPool(); //建立一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。 //经过将corePoolSize和maximumPoolSize的值设置为同样的值来实现。 ExecutorService fixed = Executors.newFixedThreadPool(num); //建立一个定长线程池,支持定时及周期性任务执行。 //经过将队列参数workQueue设置为DelayWorkQueue来实现。 ExecutorService schedule = Executors.newScheduledThreadPool(5); //建立一个单线程化的线程池,它只会用惟一的工做线程来执行任务,保证全部任务按照指定顺序(FIFO, LIFO, 优先级)执行。 //经过将corePoolSize和maximumPoolSize都设置为1来实现。 ExecutorService single = Executors.newSingleThreadExecutor();
这几个API会根据具体的状况而使用预设定好默认的初始化参数去建立一个ThreadPoolExecutor。
这里须要作一个额外说明,在ThreadPoolExcuter中,worker和task是有区别的,task是用户提交的任务,而worker则是用来执行task的线程。在初始化参数中,corePoolSize和maximumPoolSize都是针对worker的,而workQueue是用来存放task的。
前面有介绍了一下worker和task的区别,其中task是用户提交的线程任务,而worker则是ThreadPoolExecutor本身内部实现的一个类了。
具体源码以下:
/** * Woker主要维护着运行task的worker的中断控制信息,以及其余小记录。这个类拓展AbstractQueuedSynchronizer * 而来简化获取和释放每个任务执行中的锁。这能够防止中断那些打算唤醒正在等待其余线程任务的任务,而不是 * 中断正在运行的任务。咱们实现一个简单的不可重入锁而不是ReentrantLo,由于咱们不想当其调用setCorePoolSize * 这样的方法的时候能得到锁。 */ //worker主要是对进行中的任务进行中断控制,顺带着对其余行为进行记录 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ //正在跑的线程,若是是null标识factory失败 final Thread thread; /** Initial task to run. Possibly null. */ //初始化一个任务以运行 Runnable firstTask; /** Per-thread task counter */ //每一个线程计数 volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) * 用给定的first task和从threadFactory建立 */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ //主要调用了runWorker public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. //锁方法 // protected boolean isHeldExclusively() { return getState() != 0; } //尝试获取锁 protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } //尝试释放锁 protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
Worker其实能够看做高级一点的线程。其中继承AbstractQueuedSynchronizer主要是为了实现锁控制。ThreadPoolExecutor会持有并管理Worker,在Worker中firstTask其实就是存放task的,而thread则是存放当前Worker自己的线程。
其中比较重要的就是run方法了,但这个方法其实又是去调用ThreadPoolExecutor里面的runWorker()方法,具体能够看下一节的介绍。
首先须要介绍线程池有五种运行状态: RUNNING(状态值-1): 接收新任务并处理队列中的任务 SHUTDOWN(状态值0): 不接收新任务但会处理队列中的任务。 STOP(状态值1): 不接收新任务,不处理队列中的任务,并中断正在处理的任务 TIDYING(状态值2): 全部任务已终止,workerCount为0,处于TIDYING状态的线程将调用钩子方法terminated()。 TERMINATED(状态值3): terminated()方法完成。
而后咱们能够看看ThreadPoolExcuter中的ctl这个变量。 ctl是ThreadPoolExcuter中比较有意思的一个实现,它是一个AtomicInteger,这里不对AtomicInteger多作讨论,只要知道能够把它当作有原子性的Integer就够了,其实它具备原子性的原理是使用了CAS的技术,这是一种乐观锁的具体实现。 ThreadPoolExcuter是将两个内部值打包成一个值,即将workerCount和runState(运行状态)这两个值打包在一个ctl中,由于runState有5个值,须要3位,因此有3位表示 runState,而其余29位表示为workerCount。 而运行时要获取其余数据时,只须要对ctl进行拆包便可。具体这部分代码以下:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl //拆包ctl,分别获取runState和WorkerCount private static int runStateOf(int c) { return c & ~CAPACITY; } private static int workerCountOf(int c) { return c & CAPACITY; } //打包操做 private static int ctlOf(int rs, int wc) { return rs | wc; }
当执行器(Executor)处于终止状态,或者执行器在max threads和工做队列都是有界而且处于饱和的时候,新提交的任务会被拒绝。在任一状况下,执行的任务将调用RejectedExecutionHandler的方法rejectedExecution(Runnable, ThreadPoolExecutor)。有如下四种拒绝策略:
1.默认的是ThreadPoolExecutor.AbortPolicy,在这种策略下,处理器会在拒绝后抛出一个运行异常RejectedExecutionException。
2.在ThreadPoolExecutor.CallerRunsPolicy的策略下,线程会调用它直接的execute来运行这个任务。这种方式提供简单的反馈控制机制来减缓新任务提交的速度。
3.在ThreadPoolExecutor.DiscardPolicy策略下,没法执行的任务将被简单得删除掉。
4.在ThreadPoolExecutor.DiscardOldestPolicy策略下,若是executor没有处于终止状态,在工做队列头的任务将被删除,而后会从新执行(可能会再次失败,这会致使重复这个过程)。
总结:本篇初步介绍了ThreadPoolExcuter的基本原理,解决了什么问题。然后说明了ThreadPoolExcuter中的初始化参数,对其中的各个参数作初步介绍。再以后介绍ctl变量的做用,并初步介绍了任务提交失败后的拒绝策略。