【初识】-JUC·ThreadPoolExecutor 线程池

ThreadPoolExecutor算是JUC中最经常使用的类之一了。ThreadPoolExecutor,顾名思义,thread-pool-executor,硬翻译就是“线程-池-执行者”;java中,经过ThreadPoolExecutor能够很容易的建立一个线程池。可是咱们为何要使用线程池?呢?它可以带来什么样的优点呢?它又是怎么实现的呢?OK,带着这几个问题,咱们来学习一下JAVA中的线程池技术。java

为何要使用线程池?

关于这个问题其实有点鸡肋,我以为再问这个问题以前更应该问为何要有线程池。那为何呢?数组


this is a 例子:bash

快递行业最近两年发展的灰常火热,据说工资也很是的高,搞得我一每天的都没有心思去好好写代码了...微信

以前的小快递公司都是没有固定的快递员的,就是说,每次去送一件快递,站点负责人就须要去找一我的来帮忙送,送完以后就没有而后了(固然,钱仍是要给的)。框架

可是后来随着货愈来愈多,找人给钱成本太大,并且农忙时还须要花很长时间去找人,因此就雇用了5我的,签了合同,长期为站点配送。ide

之前都是随时用随时找,如今不是,如今是成立了一个物流公司,开了一个配送部,配送部门规定正式配送员最多只能有五我的。函数

以前配送的缺点是什么:oop

  • 每次有货,我都会去临时找一我的,而后签定临时合同,送完以后解除合同。很麻烦。 这也是不用线程池的缺点,就是任务来了,咱们须要频繁的去建立新的线程,用完以后还须要释放线程资源,对于系统的消耗是很大的。
  • 由于配送的货车只有那么几个,若是临时签定的人多了,车子不够用,其余人只能等着车子送完以后才能用。

成立配送部以后解决的问题post

  • 成立配送部以后呢,由于签定的是劳务合同,咱们能够重复的让配送员配送不一样的货物。达到线程资源的复用。
  • 由于限定了最多招聘的人数,能够很好的避免招过多无用的人。

OK,咱们以上述例子来对应理解线程池的基本原理学习

先来看下,JAVA对ThreadPoolExecutor的类申明:

public class ThreadPoolExecutor extends AbstractExecutorService 复制代码

【初识】-JUC·Executor框架中给出了Executor的继承体系。ThreadPoolExecutor就是具有线程池功能的集成者。

构造方法

//构造方法一
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
                          
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
         
 }
 //构造方法二
 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         threadFactory, defaultHandler);
}
//构造方法三
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}
//构造方法四
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
复制代码

从上面的代码能够看出,构造方法(1、2、三)都是经过调用(四)来作具体属性初始化的。那么咱们直接来看构造方法四;在构造方法四中总共须要7个参数,先来看下每一个参数的具体含义:

  • corePoolSize

    核心线程数大小。那么什么是核心线程数呢,咱们能够类比于上面例子中的配送部中签定劳动合同的人的个数。

  • maximumPoolSize

    最大线程数。加入说如今是双十一期间,快递异常的多,配送部的5我的彻底忙不过来,并且仓库也满了,怎么办呢?这个时候就须要再招聘一些临时配送员,假设maximumPoolSize为10,那么也就是说,临时招聘能够招5我的,配送部签定正式劳动合同的人和签定临时合同的人加一块不能超过配送部规定的最大人数(10人)。因此说,maximumPoolSize就是线程池可以容许的存在的最大线程的数量。

  • keepAliveTime

    存活时间。为何要有这个呢?想一下,双十一过去了,货物已经配送的差很少了。临时合同写的是若是临时配送员2天没有配送了,那配送部就有权利终止临时合同,如今已经达到2天这个点了,须要开除这些临时配送专员了。对于线程池来讲,keepAliveTime就是用来表示,当除核心线程池以外的线程超过keepAliveTime时间以后,就须要被系统回收了。

  • unit

    keepAliveTime的时间单位。

  • workQueue

    工做队列。这个就至关于一个仓库,如今配送部5我的都在配送,可是还不断的有新的快递达到,这个时候就须要一个仓库来存放这些快递。对于线程池来讲,当核心线程都有本身的任务处理,而且还有任务进来的时候,就会将任务添加到工做队列中去。

  • threadFactory

    线程工厂。就是用来建立线程的。能够类比成招聘组,会给每一个线程分配名字或者编号这样。

  • handler

    RejectedExecutionHandler 用来描述拒绝策略的。假设如今个人仓库也知足,而且配送部已经达到10我的了。怎么办呢,那么只能采用一些策略来拒绝任务了。

线程池的状态

// runState is stored in the high-order bits
//RUNNING;该状态的线程池接收新任务,而且处理阻塞队列中的任务
private static final int RUNNING    = -1 << COUNT_BITS;
//SHUTDOWN;该状态的线程池不接收新任务,但会处理阻塞队列中的任务;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
//STOP;不接收新任务,也不处理阻塞队列中的任务,而且会中断正在运行的任务;
private static final int STOP       =  1 << COUNT_BITS;
//全部的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态
private static final int TIDYING    =  2 << COUNT_BITS;
//线程池完全终止,就变成TERMINATED状态。 
private static final int TERMINATED =  3 << COUNT_BITS;
复制代码

下面是在网上发现的一位大牛的图;感受能够较为直观的描述状态的变动

工做原理

线程池执行原理

有几个点须要注意。

一、如何提交一个任务到线程池?

public void execute(Runnable command) {
    //任务为null,直接抛出空指针异常
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    //若是线程数大于等于基本线程数,将任务加入队列
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
复制代码
  • 若是少于corePoolSize线程正在运行,请尝试使用给定命令启动一个新线程做为其第一个任务。 对addWorker的调用会自动检查runState和workerCount,从而防止错误报警,在不该该的时候经过返回false来添加线程。
  • 若是一个任务可以成功排队,那么咱们仍然须要再次检查是否应该添加一个线程(由于现有的线程自上次检查以来已经死掉)或者自从进入这个方法以来,池关闭了。因此咱们从新检查状态,若是当前command已经stop了,那么就退出工做队列,若是没有的话就开始一个新的线程。
  • 若是队列满了,会想尝试去建立一个新的线程去执行,若是建立不了,那就执行拒绝策略。

二、如何建立一个线程去处理任务?

经过实现这个接口去建立一个新的线程

public interface ThreadFactory {
    Thread newThread(Runnable r);
}
复制代码

三、如何将任务添加到队列?

经过addWorker方法来添加,其实在excute中只是做为一个提交任务的入口,实际的处理逻辑都是在addWorker这个方法里来完成的。addWorker有两个参数:

  • firstTask 当前任务
  • core 用来标注当前须要建立的线程是不是核心线程,若是core为true,则代表建立的是核心线程,也就是说当前尚未达到最大核心线程数。

先来看下这个方法的前半部分:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    //自旋方式
    for (;;) {
        //获取当前线程池的状态
        int c = ctl.get();
        int rs = runStateOf(c);
    
        //若是状态是STOP,TIDYING,TERMINATED状态的话,则会返回false
        //若是状态是SHUTDOWN,可是firstTask不为空或者workQueue为空的话,那么直接返回false。
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        //经过自旋的方式,判断要添加的worker是否为corePool范畴以内的
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
复制代码

//若是超过CAPACITY限制了则直接返回false

wc >= CAPACITY
复制代码

//判断当前的workerCount是否大于corePoolsize,不然则判断是否大于maximumPoolSize //具体的比较取决于入参core是true仍是false。

wc >= (core ? corePoolSize : maximumPoolSize)
复制代码

若是上面两个有一个知足了,则直接返回false。

下面是判断WorkerCount经过CAS操做增长1是否成功,成功的话就到此结束

if (compareAndIncrementWorkerCount(c))
    break retry;
复制代码

若是不成功,则再次判断当前线程池的状态,若是如今获取到的状态与进入自旋的状态不一致的话,那么则经过continue retry从新进行状态的判断。

c = ctl.get();  // Re-read ctl
if (runStateOf(c) != rs)
    continue retry;
复制代码

再来看下这个方法的后面半个部分:
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
    final ReentrantLock mainLock = this.mainLock;
    //建立一个新的Worker对象
    w = new Worker(firstTask);
    final Thread t = w.thread;
    //
    if (t != null) {
    //加锁
        mainLock.lock();
        try {
            // 在锁定的状况下从新检查。
            // 在一下状况退出:ThreadFactory 建立失败或者在获取锁以前shut down了
            int c = ctl.get();
            int rs = runStateOf(c);
           //状态校验
            if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive()) // 预先检查t是能够启动的
                    throw new IllegalThreadStateException();
                //添加至workers中
                workers.add(w);
                int s = workers.size();
                //若是超过了历史最大线程数,则将当前池数量设置为历史最大线程记录数
                if (s > largestPoolSize)
                    largestPoolSize = s;
                //标识添加工做线程成功
                workerAdded = true;
            }
        } finally {
        //解锁
            mainLock.unlock();
        }
        //若是添加成功则启动当前工做线程
        if (workerAdded) {
            t.start();
            //并将当前线程状态设置为已启动
            workerStarted = true;
        }
    }
} finally {
//添加失败
    if (! workerStarted)
        addWorkerFailed(w);
}
return workerStarted;
}
复制代码

拒绝策略有哪些?

  • 一、AbortPolicy:直接抛出异常,默认策略;
  • 二、CallerRunsPolicy:使用调用者本身的当前线程来执行任务;
  • 三、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
  • 四、DiscardPolicy:直接丢弃任务;

固然咱们也能够自定义拒绝策略。

经常使用工做队列类型

一、ArrayBlockingQueue

基于数组的阻塞队列,长度有限

二、LinkedBlockingQuene

基于链表的阻塞队列,长度无限,使用这个可能会致使咱们的拒绝策略失效。由于能够无限的建立新的工做线程。

三、PriorityBlockingQueue

具备优先级的无界阻塞队列;

三、SynchronousQuene

SynchronousQuene是一个是一个不存储元素的BlockingQueue;每个put操做必需要等待一个take操做,不然不能继续添加元素。因此这个比较特殊,它不存咱们的任务,也就说说它的每一个put操做必须等到另外一个线程调用take操做,不然put操做一直处于阻塞状态。

Worker

这个是ThreadPoolExecutor的一个内部类,表示一个工做线程。重要的是这个内部类实现了AbstractQueuedSynchronizer(AQS:抽象队列同步器)抽象类。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */
    private static final long serialVersionUID = 6138294804551838833L;

    /** 当前work持有的线程 */
    final Thread thread;
    /** 运行的初始任务。 可能为空。*/
    Runnable firstTask;
    /** 每一个线程完成任务的计数器 */
    volatile long completedTasks;

    /** * 构造函数 */
    Worker(Runnable firstTask) {
    // 禁止中断,直到runWorker
        setState(-1); 
        //想提交的任务交给当前工做线程
        this.firstTask = firstTask;
        //经过线程工厂建立一个新的线程
        this.thread = getThreadFactory().newThread(this);
    }

    /** 将run方法的执行委托给外部runWorker */
    public void run() {
        runWorker(this);
    }

    // 是否锁定
    //
    // 0表明解锁状态。
    // 1表明锁定状态。

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }
    //尝试获取锁(重写AQS的方法)
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
    //尝试释放锁(重写AQS的方法)
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }
    //加锁
    public void lock() { acquire(1); }
    //尝试加锁
    public boolean tryLock() { return tryAcquire(1); }
    //解锁
    public void unlock() { release(1); }
    //是否锁定
    public boolean isLocked() { return isHeldExclusively(); }
    //若是启动则中断
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}
复制代码

runWorker

最后来看下runWorker这个方法(ThreadPoolExecutor中的方法):

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
复制代码

下面是对注释的蹩脚翻译,欢迎吐槽,但注意尺度,O(∩_∩)O哈哈~

主要工做循环运行。重复地从队列中获取任务并执行它们,同时处理一些问题:

  • 咱们可能会从最初的任务开始,在这种状况下,咱们不须要获得第一个任务。不然,只要池正在运行,咱们就从getTask得到任务。 若是它返回null,则因为更改池状态或配置参数而致使worker退出。其余退出的结果是在外部代码中抛出的异常,在这种状况下completeAbruptly成立,这一般会致使processWorkerExit来取代这个线程。
  • 在运行任何任务以前,获取锁以防止任务正在执行时发生其余池中断,调用clearInterruptsForTaskRun确保除非池正在中止,则此线程没有设置其中断。
  • 每一个任务运行以前都会调用beforeExecute,这可能会引起一个异常,在这种状况下,咱们会致使线程死亡(断开循环completeAbruptly为true),而不处理任务。
  • 假设beforeExecute正常完成,咱们运行任务,收集任何抛出的异常发送到afterExecute。 咱们分别处理RuntimeException,Error(这两个规范保证咱们陷阱)和任意的Throwables。 由于咱们不能在Runnable.run中从新抛出Throwable,因此咱们把它们封装在Errors中(到线程的UncaughtExceptionHandler)。 任何抛出的异常也保守地致使线程死亡。
  • task.run完成后,咱们调用afterExecute,这也可能会抛出一个异常,这也会致使线程死亡。 根据JLS Sec 14.20,即便task.run抛出,这个异常也是有效的。

异常机制的最终效果是afterExecute和线程的UncaughtExceptionHandler拥有关于用户代码遇到的任何问题的准确信息。

总结

本文是JUC的第二篇,意在经过查看源码来了解线程池的具体工做原理。文中若是存在不当的描述,但愿小伙伴们可以及时提出。灰常感谢!

欢迎关注微信公众号,干货满满哦~

相关文章
相关标签/搜索