线程池源码分析

1、问什么要使用线程池?

 1.下降系统资源消耗 java

 2.提升线程可控性。缓存

 2、如何建立使用的线程池?

 jdk8提供了五种建立线程池的方法: 

 1.建立一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。安全

public static ExecutorService newFixedThreadPool(int nThreads) {      return new ThreadPoolExecutor(
         nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, 
         new LinkedBlockingQueue<Runnable>());  } 复制代码

 2.jdk8新增的,会根据所需的并发数来动态建立和关闭线程。能合理的使用CPU进行对任务进行并发操做,因此适合使用在很耗时的任务。bash

注意返回的是ForkJoinPool 并发

public static ExecutorService newWorkStealingPool(int parallelism) {
      return new ForkJoinPool (
          parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); 
} 复制代码

什么是ForkJoinPool?
 public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, 
                     UncaughtExceptionHandler handler, boolean asyncMode) { 
          this(checkParallelism(parallelism), checkFactory(factory), handler, 
        asyncMode ? FIFO_QUEUE : LIFO_QUEUE, "ForkJoinPool-" + nextPoolId() + "-worker-");             checkPermission(); } 复制代码


使用一个无限队列来保存所须要执行的任务,能够传入线程的数量;不传入,则默认使用当前计算机中可用的CPU数量;使用分治法来解决问题,使用 fork()和join()来进行调用。框架

 3.建立一个可缓存的线程池,可灵活回收空闲线程,若无可回收,则新建线程。 async

public static ExecutorService newCachedThreadPool() {       return new ThreadPoolExecutor(
       0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());  } 复制代码

 4.建立一个单线程的线程池。 oop

public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (
        new ThreadPoolExecutor(
        1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }复制代码

 5.建立一个定长线程池,支持定时及周期性任务执行。源码分析

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);  } 复制代码

3、源码分析

Executor结构:
ui


 Executor【接口】 一个运行新任务的简单接口 

public interface Executor{
     void execute(Runnable command); 
} 复制代码

 ExecutorService【接口】 扩展了Executor接口

添加了一些用来管理执行生命周期和任务生命周期的方法。


 AbstractExecutorService【抽象类】

对ExecutorService接口的抽象类实现。不是咱们分析的重点。 复制代码

 ThreadPoolExecutor【类】 

Java线程池的核心实现 复制代码

 4、ThreadPoolExecutor

属性解释:

// AtomicInteger是原子类  ctlOf()返回值为RUNNING;private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));// 高3位表示线程状态private static final int COUNT_BITS = Integer.SIZE - 3;// 低29位表示workerCount容量private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bits// 能接收任务且能处理阻塞队列中的任务private static final int RUNNING    = -1 << COUNT_BITS;// 不能接收新任务,但能够处理队列中的任务。private static final int SHUTDOWN   =  0 << COUNT_BITS;// 不接收新任务,不处理队列任务。private static final int STOP       =  1 << COUNT_BITS;// 全部任务都终止private static final int TIDYING    =  2 << COUNT_BITS;// 什么都不作private static final int TERMINATED =  3 << COUNT_BITS;// 存听任务的阻塞队列private final BlockingQueue<Runnable> workQueue;复制代码

值得注意的是状态值越大线程越不活跃。


线程池状态的转换模型:     

  

构造器:       

public ThreadPoolExecutor(int corePoolSize,//线程池初始启动时线程的数量                          int maximumPoolSize,//最大线程数量                          long keepAliveTime,//空闲线程多久关闭?                          TimeUnit unit,// 计时单位                          BlockingQueue<Runnable> workQueue,//听任务的阻塞队列                          ThreadFactory threadFactory,//线程工厂                          RejectedExecutionHandler handler// 拒绝策略) {    if (corePoolSize < 0 ||        maximumPoolSize <= 0 ||        maximumPoolSize < corePoolSize ||        keepAliveTime < 0)        throw new IllegalArgumentException();    if (workQueue == null || threadFactory == null || handler == null)        throw new NullPointerException();    this.acc = System.getSecurityManager() == null ?            null :            AccessController.getContext();    this.corePoolSize = corePoolSize;    this.maximumPoolSize = maximumPoolSize;    this.workQueue = workQueue;    this.keepAliveTime = unit.toNanos(keepAliveTime);    this.threadFactory = threadFactory;    this.handler = handler;}复制代码

在向线程池提交任务时,会经过两个方法:execute和submit。

【本次只讲execute方法,submit、Future、Callable一块儿讲】

execute方法:

public void execute(Runnable command) {    if (command == null)        throw new NullPointerException();    // clt记录着runState和workerCount    int c = ctl.get();    //workerCountOf方法取出低29位的值,表示当前活动的线程数    //而后拿线程数和 核心线程数作比较    if (workerCountOf(c) < corePoolSize) {        // 若是活动线程数<核心线程数        // 添加到        //addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize来判断仍是maximumPoolSize来判断        if (addWorker(command, true))            // 若是成功则返回            return;        // 若是失败则从新获取 runState和 workerCount        c = ctl.get();    }    // 若是当前线程池是运行状态而且任务添加到队列成功    if (isRunning(c) && workQueue.offer(command)) {        // 从新获取 runState和 workerCount        int recheck = ctl.get();        // 若是不是运行状态而且         if (! isRunning(recheck) && remove(command))            reject(command);        else if (workerCountOf(recheck) == 0)            //第一个参数为null,表示在线程池中建立一个线程,但不去启动            // 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize            addWorker(null, false);    }    //再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize    else if (!addWorker(command, false))        //若是失败则拒绝该任务        reject(command);}复制代码

总结一下它的工做流程:

  1. 当workerCount < corePoolSize,建立线程执行任务。
  2. 当workerCount >= corePoolSize && 阻塞队列workQueue未满,把新的任务放入阻塞队列。
  3. 当workQueue已满,而且workerCount >= corePoolSize,而且workerCount < maximumPoolSize,建立线程执行任务。
  4. 当workQueue已满,workerCount >= maximumPoolSize,采起拒绝策略,默认拒绝策略是直接抛异常。


经过上面的execute方法能够看出,最主要的逻辑仍是在addWorker方法中实现的。


addWorker方法:

主要工做就是在线程池中建立一个新的线程并执行。复制代码

参数定义:

  •  firstTask : the task the new thread should run first(or null if none).(指定新增线程执行的第一个任务或者不执行任务)
  • core :if true use corePoolSize as bound,else maximumPoolSize.(core若是为true则使用corePoolSize绑定,不然为maximumPoolSize。)此处使用布尔指示符而不是值,以确保在检查其余状态后读取新值。

private boolean addWorker(Runnable firstTask, boolean core) {    retry:    for (;;) {        int c = ctl.get();        //  获取运行状态        int rs = runStateOf(c);        // Check if queue empty only if necessary.        // 若是状态值 >= SHUTDOWN (不接新任务&不处理队列任务)        // 而且 若是 !(rs为SHUTDOWN 且 firsTask为空 且 阻塞队列不为空)        if (rs >= SHUTDOWN &&            ! (rs == SHUTDOWN &&               firstTask == null &&               ! workQueue.isEmpty()))            // 返回false            return false;        for (;;) {            //获取线程数wc            int wc = workerCountOf(c);            // 若是wc大与容量 || core若是为true表示根据corePoolSize来比较,不然为maximumPoolSize            if (wc >= CAPACITY ||                wc >= (core ? corePoolSize : maximumPoolSize))                return false;            // 增长workerCount(原子操做)            if (compareAndIncrementWorkerCount(c))                // 若是增长成功,则跳出                break retry;            // wc增长失败,则再次获取runState            c = ctl.get();  // Re-read ctl            // 若是当前的运行状态不等于rs,说明状态已被改变,返回从新执行            if (runStateOf(c) != rs)                continue retry;            // else CAS failed due to workerCount change; retry inner loop        }    }    boolean workerStarted = false;    boolean workerAdded = false;    Worker w = null;    try {        // 根据firstTask来建立Worker对象        w = new Worker(firstTask);        // 根据worker建立一个线程        final Thread t = w.thread;        if (t != null) {            // new一个锁            final ReentrantLock mainLock = this.mainLock;            // 加锁            mainLock.lock();            try {                // Recheck while holding lock.                // Back out on ThreadFactory failure or if                // shut down before lock acquired.                // 获取runState                int rs = runStateOf(ctl.get());                // 若是rs小于SHUTDOWN(处于运行)或者(rs=SHUTDOWN && firstTask == null)                // firstTask == null证实只新建线程而不执行任务                if (rs < SHUTDOWN ||                    (rs == SHUTDOWN && firstTask == null)) {                    // 若是t活着就抛异常                    if (t.isAlive()) // precheck that t is startable                        throw new IllegalThreadStateException();                    // 不然加入worker(HashSet)                    //workers包含池中的全部工做线程。仅在持有mainLock时访问。                    workers.add(w);                    // 获取工做线程数量                    int s = workers.size();                    //largestPoolSize记录着线程池中出现过的最大线程数量                    if (s > largestPoolSize)                        // 若是 s比它还要大,则将s赋值给它                        largestPoolSize = s;                    // worker的添加工做状态改成true                        workerAdded = true;                }            } finally {                mainLock.unlock();            }            // 若是worker的添加工做完成            if (workerAdded) {                // 启动线程                t.start();                // 修改线程启动状态                workerStarted = true;            }        }    } finally {        if (! workerStarted)            addWorkerFailed(w);    }    // 返回线启动状态    return workerStarted;复制代码

为何须要持有mainLock?

由于workers是HashSet类型的,不能保证线程安全。

w = new Worker(firstTask);如何理解呢

Work.java

private final class Worker    extends AbstractQueuedSynchronizer    implements Runnable复制代码

能够看到它集成AQS并发框架还发现了Runnable。证实它仍是一个线程任务类。那咱们调用t.start()事实上就是调用了该类重写的run方法。

Worker为何使用ReentrantLock来实现呢?

tryAcquire方法它是不容许重入的,而ReentrantLock是容许重入的。对于线程来讲,若是线程正在执行是不容许其余锁重入进来的。

线程只须要两个状态,一个是独占锁,代表是正在执行任务;一个是不加锁,代表是空闲状态。

public void run(){
    runnWorker(this);
}复制代码

run方法有调用了runnWorker方法:

final void runWorker(Worker w) {    // 拿到当前线程    Thread wt = Thread.currentThread();    // 拿到当前任务    Runnable task = w.firstTask;    // 将Worker.firstTask置空 而且释放锁    w.firstTask = null;    w.unlock(); // allow interrupts    boolean completedAbruptly = true;    try {        // 若是task或者getTask不为空,则一直循环        while (task != null || (task = getTask()) != null) {            // 加锁            w.lock();            // If pool is stopping, ensure thread is interrupted;            // if not, ensure thread is not interrupted.  This            // requires a recheck in second case to deal with            // shutdownNow race while clearing interrupt            //  return ctl.get() >= stop             // 若是线程池状态>=STOP 或者 (线程中断且线程池状态>=STOP)且当前线程没有中断            // 其实就是保证两点:            // 1. 线程池没有中止            // 2. 保证线程没有中断            if ((runStateAtLeast(ctl.get(), STOP) ||                 (Thread.interrupted() &&                  runStateAtLeast(ctl.get(), STOP))) &&                !wt.isInterrupted())                // 中断当前线程                wt.interrupt();            try {                // 空方法                beforeExecute(wt, task);                Throwable thrown = null;                try {                    // 执行run方法(Runable对象)                    task.run();                } catch (RuntimeException x) {                    thrown = x; throw x;                } catch (Error x) {                    thrown = x; throw x;                } catch (Throwable x) {                    thrown = x; throw new Error(x);                } finally {                    afterExecute(task, thrown);                }            } finally {                // 执行完后, 将task置空, 完成任务++, 释放锁                task = null;                w.completedTasks++;                w.unlock();            }        }        completedAbruptly = false;    } finally {        // 退出工做        processWorkerExit(w, completedAbruptly);    }复制代码

总结一下runWorker方法的执行流程:

  1. while循环中,不断的经过getTask方法从workerQueue中获取任务
  2. 若是线程池正在中止,则中断线程。不然调用3.
  3. 调用task.run()执行任务。
  4. 若是task为null则跳出循环,执行processWorkExit()方法,销毁线程workers.remove(w).

这个流程图很是经典:


除此以外,ThreadPoolExecutor还提供了tryAcquire、tryRelease、shutdown、shutdownNow、tryTerminate、等涉及的一系列线程状态更改的方法。


在runWorker方法中,为何要在执行任务的时候对每一个工做线程都加锁呢?

shutdown方法与getTask方法存在竞争条件。

相关文章
相关标签/搜索