1.下降系统资源消耗 java
2.提升线程可控性。缓存
1.建立一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。安全
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(
nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()); } 复制代码
2.jdk8新增的,会根据所需的并发数来动态建立和关闭线程。能合理的使用CPU进行对任务进行并发操做,因此适合使用在很耗时的任务。bash
注意返回的是ForkJoinPool 并发
public static ExecutorService newWorkStealingPool(int parallelism) {
return new ForkJoinPool (
parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);
} 复制代码
什么是ForkJoinPool?
public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler, boolean asyncMode) {
this(checkParallelism(parallelism), checkFactory(factory), handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE, "ForkJoinPool-" + nextPoolId() + "-worker-"); checkPermission(); } 复制代码
使用一个无限队列来保存所须要执行的任务,能够传入线程的数量;不传入,则默认使用当前计算机中可用的CPU数量;使用分治法来解决问题,使用 fork()和join()来进行调用。框架
3.建立一个可缓存的线程池,可灵活回收空闲线程,若无可回收,则新建线程。 async
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(
0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } 复制代码
4.建立一个单线程的线程池。 oop
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (
new ThreadPoolExecutor(
1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }复制代码
5.建立一个定长线程池,支持定时及周期性任务执行。源码分析
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize); } 复制代码
Executor结构:
ui
Executor【接口】 一个运行新任务的简单接口
public interface Executor{
void execute(Runnable command);
} 复制代码
ExecutorService【接口】 扩展了Executor接口。
添加了一些用来管理执行生命周期和任务生命周期的方法。
AbstractExecutorService【抽象类】
对ExecutorService接口的抽象类实现。不是咱们分析的重点。 复制代码
ThreadPoolExecutor【类】
Java线程池的核心实现 复制代码
属性解释:
// AtomicInteger是原子类 ctlOf()返回值为RUNNING;private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));// 高3位表示线程状态private static final int COUNT_BITS = Integer.SIZE - 3;// 低29位表示workerCount容量private static final int CAPACITY = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bits// 能接收任务且能处理阻塞队列中的任务private static final int RUNNING = -1 << COUNT_BITS;// 不能接收新任务,但能够处理队列中的任务。private static final int SHUTDOWN = 0 << COUNT_BITS;// 不接收新任务,不处理队列任务。private static final int STOP = 1 << COUNT_BITS;// 全部任务都终止private static final int TIDYING = 2 << COUNT_BITS;// 什么都不作private static final int TERMINATED = 3 << COUNT_BITS;// 存听任务的阻塞队列private final BlockingQueue<Runnable> workQueue;复制代码
值得注意的是状态值越大线程越不活跃。
线程池状态的转换模型:
构造器:
public ThreadPoolExecutor(int corePoolSize,//线程池初始启动时线程的数量 int maximumPoolSize,//最大线程数量 long keepAliveTime,//空闲线程多久关闭? TimeUnit unit,// 计时单位 BlockingQueue<Runnable> workQueue,//听任务的阻塞队列 ThreadFactory threadFactory,//线程工厂 RejectedExecutionHandler handler// 拒绝策略) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler;}复制代码
在向线程池提交任务时,会经过两个方法:execute和submit。
【本次只讲execute方法,submit、Future、Callable一块儿讲】
execute方法:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // clt记录着runState和workerCount int c = ctl.get(); //workerCountOf方法取出低29位的值,表示当前活动的线程数 //而后拿线程数和 核心线程数作比较 if (workerCountOf(c) < corePoolSize) { // 若是活动线程数<核心线程数 // 添加到 //addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize来判断仍是maximumPoolSize来判断 if (addWorker(command, true)) // 若是成功则返回 return; // 若是失败则从新获取 runState和 workerCount c = ctl.get(); } // 若是当前线程池是运行状态而且任务添加到队列成功 if (isRunning(c) && workQueue.offer(command)) { // 从新获取 runState和 workerCount int recheck = ctl.get(); // 若是不是运行状态而且 if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) //第一个参数为null,表示在线程池中建立一个线程,但不去启动 // 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize addWorker(null, false); } //再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize else if (!addWorker(command, false)) //若是失败则拒绝该任务 reject(command);}复制代码
总结一下它的工做流程:
经过上面的execute方法能够看出,最主要的逻辑仍是在addWorker方法中实现的。
addWorker方法:
主要工做就是在线程池中建立一个新的线程并执行。复制代码
参数定义:
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); // 获取运行状态 int rs = runStateOf(c); // Check if queue empty only if necessary. // 若是状态值 >= SHUTDOWN (不接新任务&不处理队列任务) // 而且 若是 !(rs为SHUTDOWN 且 firsTask为空 且 阻塞队列不为空) if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) // 返回false return false; for (;;) { //获取线程数wc int wc = workerCountOf(c); // 若是wc大与容量 || core若是为true表示根据corePoolSize来比较,不然为maximumPoolSize if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 增长workerCount(原子操做) if (compareAndIncrementWorkerCount(c)) // 若是增长成功,则跳出 break retry; // wc增长失败,则再次获取runState c = ctl.get(); // Re-read ctl // 若是当前的运行状态不等于rs,说明状态已被改变,返回从新执行 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 根据firstTask来建立Worker对象 w = new Worker(firstTask); // 根据worker建立一个线程 final Thread t = w.thread; if (t != null) { // new一个锁 final ReentrantLock mainLock = this.mainLock; // 加锁 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. // 获取runState int rs = runStateOf(ctl.get()); // 若是rs小于SHUTDOWN(处于运行)或者(rs=SHUTDOWN && firstTask == null) // firstTask == null证实只新建线程而不执行任务 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 若是t活着就抛异常 if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 不然加入worker(HashSet) //workers包含池中的全部工做线程。仅在持有mainLock时访问。 workers.add(w); // 获取工做线程数量 int s = workers.size(); //largestPoolSize记录着线程池中出现过的最大线程数量 if (s > largestPoolSize) // 若是 s比它还要大,则将s赋值给它 largestPoolSize = s; // worker的添加工做状态改成true workerAdded = true; } } finally { mainLock.unlock(); } // 若是worker的添加工做完成 if (workerAdded) { // 启动线程 t.start(); // 修改线程启动状态 workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } // 返回线启动状态 return workerStarted;复制代码
为何须要持有mainLock?
由于workers是HashSet类型的,不能保证线程安全。
那w = new Worker(firstTask);如何理解呢
Work.java
private final class Worker extends AbstractQueuedSynchronizer implements Runnable复制代码
能够看到它集成AQS并发框架还发现了Runnable。证实它仍是一个线程任务类。那咱们调用t.start()事实上就是调用了该类重写的run方法。
Worker为何使用ReentrantLock来实现呢?
tryAcquire方法它是不容许重入的,而ReentrantLock是容许重入的。对于线程来讲,若是线程正在执行是不容许其余锁重入进来的。
线程只须要两个状态,一个是独占锁,代表是正在执行任务;一个是不加锁,代表是空闲状态。
public void run(){
runnWorker(this);
}复制代码
run方法有调用了runnWorker方法:
final void runWorker(Worker w) { // 拿到当前线程 Thread wt = Thread.currentThread(); // 拿到当前任务 Runnable task = w.firstTask; // 将Worker.firstTask置空 而且释放锁 w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 若是task或者getTask不为空,则一直循环 while (task != null || (task = getTask()) != null) { // 加锁 w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt // return ctl.get() >= stop // 若是线程池状态>=STOP 或者 (线程中断且线程池状态>=STOP)且当前线程没有中断 // 其实就是保证两点: // 1. 线程池没有中止 // 2. 保证线程没有中断 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) // 中断当前线程 wt.interrupt(); try { // 空方法 beforeExecute(wt, task); Throwable thrown = null; try { // 执行run方法(Runable对象) task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { // 执行完后, 将task置空, 完成任务++, 释放锁 task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // 退出工做 processWorkerExit(w, completedAbruptly); }复制代码
总结一下runWorker方法的执行流程:
这个流程图很是经典:
除此以外,ThreadPoolExecutor还提供了tryAcquire、tryRelease、shutdown、shutdownNow、tryTerminate、等涉及的一系列线程状态更改的方法。
在runWorker方法中,为何要在执行任务的时候对每一个工做线程都加锁呢?
shutdown方法与getTask方法存在竞争条件。