java多线程之ThreadPoolExecutor原理

「本文已参与好文召集令活动,点击查看:后端、大前端双赛道投稿,2万元奖池等你挑战!」前端

ThreadPoolExecutor是Java的线程池并发代名词,多线程开发基本都是基于这个去作具体的业务开发。虽然以为本身回了,网上帖子已经有不少的文章写这个,可是是本身一一点写的,终归是要比看别人的理解更加深入,因此最近本身在对java知识的系统梳理。 那么接下来主要分析下这个多线程框架的原理。java

ThreadPoolExecutor的构造函数以成员变量介绍

public ThreadPoolExecutor(int corePoolSize,
                           int maximumPoolSize,
                           long keepAliveTime,
                           TimeUnit unit,
                           BlockingQueue<Runnable> workQueue,
                           ThreadFactory threadFactory,
                           RejectedExecutionHandler handler) {

复制代码

面试靠的最可能是这个构造函数中7个参数的做用,面试

  • corePoolSize 是核心线程数, 即便线程是空闲的,线程池一直保持的的线程数,除非 allowCoreThreadTimeOut参数设置为true
  • maximumPoolSize 线程池最大线程数
  • keepAliveTime unit 线程存活时间 和 时间单位
  • workQueue 是任务队列,是用来保持task,该队列保持住了Runnable的任务,经过调用 线程池的execute的方法.
  • threadFactory 建立线程的工厂
  • RejectedExecutionHandler 是当线程数超过限制以及队列也满了,须要执行的拒绝策 略.

成员变零后端

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1; //线程容量

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
复制代码

面试最喜欢问的是 ctl变量的表明什么意义? ctl变量的的的用高3位表示线程池的状态,用低29位表示线程个数,二者经过 | 操做,拼接出ctl变量,也就是线程池的最大线程数capacity是 (2^29)-1。markdown

线程池状态

  • RUNNING 运行状态 -1 << 29 表示线程池能够接新的任务而且处理队列任务
  • SHUTDOWN 关闭状态态 -1 << 29 表示线程池不接受新的线程池任务可是能够处理队列中的任务
  • STOP 中止状态 1 << 29 表示线程池不接受新的线程池任务也不处理队列中的任务而且中断线程池里中正在执行的任务
  • TIDYING 2 << 29 表示全部的线程池都已经中断了,线程数为0,线程状态转为为TIDYING, 将执行terminated钩子函数
  • TERMINATED 3 << 29 表示全部terminated方法都已经执行完成。

线程状态之间装换图

image.png

线程池的提交任务执行流程

首先咱们来看平时业务代码是提交任务到线程池执行的函数是经过execute或者submit方法, 区别就是submit返回具备Future,execute返回void,的、那么接下来咱们主要分析execute 的执行流程,submit涉及到线程异步返回,以后会另外单独分析,那么下面这个execute函数 就能看出线程池的整个执行流程,多线程

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
         // 当线程池的核心线程数设置为0状况下,那么这时workerCountOf(recheck)为0,这时就开启非线程数处理队列任务
          addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
复制代码

线程池执行任务流程图以下: image.png 我相信大概的流程通常同窗是清楚的:并发

  1. 当线程数的Worker线程 < corePoolSize 建立核心线程数执行
  2. 当线程数的Worker线程 > corePoolSize,将任务加入任务队列中
  3. 则当corePoolSize< maxPoolsize,则新增非核心线程执行任务
  4. 当队列满了,线程数也已经达到maxPoolsize,则执行拒绝策略

实际源码中执行流程还有一些小细节容易被忽略的地点框架

  • 从新检查线程的状态以及检查 线程池的线程数的流程

线程池新增工做线程的流程

线程池新增工做任务主要addWorker方法。因为代码比较长,我就在 代码里写好注释less

private boolean addWorker(Runnable firstTask, boolean core) {
   retry:
   for (;;) {
       int c = ctl.get();
       int rs = runStateOf(c);

       // Check if queue empty only if necessary.
       if (rs >= SHUTDOWN &&
          //第一个条件: 线程至少不是运行状态,那么就是shutdown stop tidying,terminated状态
           ! (rs == SHUTDOWN &&
              firstTask == null &&
              ! workQueue.isEmpty()))
            //第二个条件: 当前线程池是shutdown状态且任务队列非空而且工做任务第一个任务是空的取反条件,这个含义是当除了SHUTDOWN状态且第一个任务为空且任务队列不为空
           // 状况下,直接返回false,增长Work线程失败
           return false;

       for (;;) {
           int wc = workerCountOf(c);
           if (wc >= CAPACITY ||
               wc >= (core ? corePoolSize : maximumPoolSize))
               return false;
           if (compareAndIncrementWorkerCount(c))
               break retry;
           c = ctl.get();  // Re-read ctl
           if (runStateOf(c) != rs)
               continue retry;
           // else CAS failed due to workerCount change; retry inner loop
       }
   }

   boolean workerStarted = false;
   boolean workerAdded = false;
   Worker w = null;
   try {
       w = new Worker(firstTask);
       final Thread t = w.thread;
       if (t != null) {
           final ReentrantLock mainLock = this.mainLock;
           mainLock.lock();
           try {
               // Recheck while holding lock.
               // Back out on ThreadFactory failure or if
               // shut down before lock acquired.
               int rs = runStateOf(ctl.get());

               if (rs < SHUTDOWN ||
               // 线程池是running状态
                   (rs == SHUTDOWN && firstTask == null)) {
                   //线程池处于shutdown状态而且第一个task为空
                   if (t.isAlive()) // precheck that t is startable
                       throw new IllegalThreadStateException();
                   //加入工做线程的集合
                   workers.add(w);
                   int s = workers.size();
                   if (s > largestPoolSize)
                      //记录最大线程数
                       largestPoolSize = s;
                   workerAdded = true;
               }
           } finally {
-                 mainLock.unlock();
-             }
-             if (workerAdded) {
-                 t.start();
                  workerStarted = true;
           }
       }
   } finally {
       if (! workerStarted)
           addWorkerFailed(w);
   }
   return workerStarted;
}
复制代码

添加工做线程主要步骤异步

  • 检查线程池的运行状态以及队列是不是空,增长线程。
    为何增长这个判断,主要是由于线程池是多线程的随即可能另外调用shutdown等方法关闭线程池,因此作每一步以前都要再次check线程池的状态,其中比较重要的点是线程池在除了Running状态,其余的只有shutdow状态,且队列任务非空的状况,才能增长work线程处理任务。
  • 判断线程池的线程是核心线程数,而后就判断大于核心线程数, 若是不是增长的核心线程数,而后经过 CAS增长线程数加1,而后re-read的ctl的如今的状态是否刚开始进入循环的状态保持一致。
  • 建立Worker对象,它的第一个参数Runable就是执行的第一个task,而后获取mainLock的重入锁, 而后再次判断线程池的状态是不是shutdown状态,而后将Worker对象加入工做线程的Set集合中, 判断是大于largePoolSize,则将workSet的size赋值largePoolSize,而后赋值workerAdded为true,接下来在finnally中workerAdded为true,则调用Worker的start方法启动该Worker线程,

若是WorkerAdded失败,则从Worder的Set移除刚才加入Worker线程,并将线程池的线程数减1,

工做线程Worker的执行流程

首先来看下Work的类的成员变量的构造函数,从下面的Work的代码,能够看到它是实现了 RUnnable接口,上一节Worker启动是调用了它的start方法,真正由操做系统调度执行 的其run方法,那么接下来重点看下run的工做流程。

private final class Worker
   extends AbstractQueuedSynchronizer
   implements Runnable
{
   /**
    * This class will never be serialized, but we provide a
    * serialVersionUID to suppress a javac warning.
    */
   private static final long serialVersionUID = 6138294804551838833L;

   /** Thread this worker is running in.  Null if factory fails. */
   final Thread thread;
   /** Initial task to run.  Possibly null. */
   Runnable firstTask;
   /** Per-thread task counter */
   volatile long completedTasks;

   /**
    * Creates with given first task and thread from ThreadFactory.
    * @param firstTask the first task (null if none)
    */
   Worker(Runnable firstTask) {
        //初始化状态为-1,表示不能被中断
       setState(-1); // inhibit interrupts until runWorker
       this.firstTask = firstTask;
       this.thread = getThreadFactory().newThread(this);
   }
复制代码

下面代码中Work的run直接调用runWork,并传入自身对象, 开始一个循环判断 第一个任务后者从任务队列中取任务不为空,就开始上锁,而后执行任务,若是任务 队列为空了,则处理Work的退出。

/** Delegates main run loop to outer runWorker  */
public void run() {
    //直接调用runWorker函数
    runWorker(this);
}

final void runWorker(Worker w) {
    // Wokder当前线程
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    //将state值赋值为0,这样就运行中断
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 循环判断第一个Task获取从获取任务
        while (task != null || (task = getTask()) != null) {
           //获取当前Work的锁,处理任务,也就是当前Work线程处理是同步处理任务的
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
             //线程池的状态至少是stop,即便stop,tidying.terminated状态
           if ((runStateAtLeast(ctl.get(), STOP)
             //检查线程是否中断且清楚中断
           || (Thread.interrupted()
                 &&
                  //再次检查线程池的状态至少是STOP
                  runStateAtLeast(ctl.get(), STOP))) &&
                  //再次判断是否中断
                !wt.isInterrupted())
                 //中断线程
                 wt.interrupt();
            try {
               //执行业务任务前处理(钩子函数)
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                   // 这里就是执行提交线程池的Runnable的任务的run方法                               task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //执行业务任务后处理(钩子函数)
                    afterExecute(task, thrown);
                }
            } finally {
                //执行结束重置为空,回到while循环拿下一个
                task = null;
                //处理任务加1
                w.completedTasks++;
                //释放锁,处理下一个任务
                w.unlock();
            }
        }
        //代码执行到这里,表明业务的任务没有异常,否则不会走到这里,
        //由于上一层try没有catch异常的,而业务执行出现异常,最里层
        //虽然catch了异常,可是也都经过throw向外抛出
        completedAbruptly = false;
    } finally {
     //若是循环结束,则处理Work退出工做,表明任务拿不到任务,即任务队列没有任务了
      processWorkerExit(w, completedAbruptly);
    }
}
复制代码

下面就来看下getTask获取任务队列的处理逻辑 、 若是这里返回null,即runWorker循环退出,则会处理finnaly中processWorkExit, 处理Work线程的退出,下面是getWork返回null的状况:

  1. 若是线程池状态值至少是SHUTDOWN状态,而且 线程池状态值至少是STOP状态,或者是任务队列是空,则将线程池的workcout减1,并返回null,
  2. 计算线程池中线程池的数量,若是线程数量大于最大线程数量, 或者 allowCoreThreadTimeOut参数为true 或者 线程数大于而且任务队列为空,则将线程池减1,并返回null,
private Runnable getTask() {
    //超时标志
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
         //获取线程状态
        int c = ctl.get();
        //线程状态
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
       // 若是线程池状态值至少是SHUTDOWN状态,
        if (rs >= SHUTDOWN
        线程池状态值至少是STOP状态,或者是任务队列是空
        && (rs >= STOP || workQueue.isEmpty())) {
            // CAS将worker线程数减1
            decrementWorkerCount();
            return null;
        }
        //计算线程池线程数量
        int wc = workerCountOf(c);

        // Are workers subject to culling?
        // allowCoreThreadTimeOut参数设置为true,或则线程池的线程数大于corePoolSize, 表示须要超时的Worker须要退出,
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
         //线程数大于最大线程数 || 已经超时
        if ((wc > maximumPoolSize || (timed && timedOut))
            // 线程数大于1 或者 任务队列为空
            && (wc > 1 || workQueue.isEmpty())) {
            // CAS将线程数减1
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 须要处理超时的Worker,则获取任务队列中任务等待的时间
            //就是线程池构造函数中keepAliveTime时间,若是不处理超时的Worker
            //则直接调用take一直阻塞等待任务队列中有任务,拿到就返回Runnale任务
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
复制代码

Worker的退出处理: 1 从上面分析知道completedAbruptly是任务执行时是否出现异常标志, 若是任务执行过程出错,则将线程池的线程数量减1 2.加线程池的mainLock的全局锁,这里主要区分Worker执行任务中,拿的是Worker内部的锁,完成任务加1,将worker从Worker的集合移除, 3. 执行tryTerminate函数,是否线程池线程池是否关闭 4. 根据线程池状态是否补充非核心的Worker线程去处理

private void processWorkerExit(Worker w, boolean completedAbruptly) {
     //任务执行时出现异常,则减去工做
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();
     //拿到线程池的主锁
    final ReentrantLock mainLock = this.mainLock;
    //加锁
    mainLock.lock();
    try {
       //完成任务加1
        completedTaskCount += w.completedTasks;
        //将worker从Worker的集合移除
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    //尝试线程池关闭
    tryTerminate();
    //获取线程池的ctl
    int c = ctl.get();
     //若是线程池的状态值小于STOP,即便SHUTDOWN RUNNING
    if (runStateLessThan(c, STOP)) {
         //任务执行没有异常
        if (!completedAbruptly) {
            //allowCoreThreadTimeOut参数true,则min=0,表示不须要线程常驻。
            //负责是有corePoolSize个线程常驻线程池
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
                //若是线程池数大于最小,也就是不须要补充线程执行任务队列的任务
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // 走到这里表示线程池的线程数为0,而任务队列又不为空,得补充一个线程处理任务   addWorker(null, false);
    }
}
复制代码

tryTerminate的逻辑是处理线程池关闭的场景

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        //线程池是RUNNING状态
        if (isRunning(c) ||
           //线程池状态至少是TIDYING
            runStateAtLeast(c, TIDYING) ||
            //线程池状态是SHUTDOWN可是队列不为空
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // Eligible to terminate
             //中断一个空闲线程
            interruptIdleWorkers(ONLY_ONE);
            return;
        }
        //只有最后一个线程才能走到这里,处理线程池从TIDYIING状态
        //到TERMINATED状态
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try { 
                   //钩子函数
                    terminated();
                } finally {
                   //设置线程池TERMINATED状态
                    ctl.set(ctlOf(TERMINATED, 0));
                    //唤醒调用awaitTermination的线程
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}
复制代码

线程池的拒绝策略 RejectedExecutionHandler

当线程池没法处理任务时的处理策略:
1.默认拒绝策略是AbortPolicy 直接抛出RejectedExecutionException异常
2.DiscardPolicy 直接丢弃任务
3.DiscardOldestPolicy 丢弃任务队列中最老的任务,这里以前理解是直接丢弃,其实看了源码以后,其实它仍是当线程池还咩有关闭时,尝试去提交该任务到线程池去执行

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardOldestPolicy} for the given executor.
     */
    public DiscardOldestPolicy() { }

    /**
     * Obtains and ignores the next task that the executor
     * would otherwise execute, if one is immediately available,
     * and then retries execution of task r, unless the executor
     * is shut down, in which case task r is instead discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}
复制代码
  1. CallerRunsPolicy 直接调用方去执行这个任务,也就是直接Runnable的run函数。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code CallerRunsPolicy}.
     */
    public CallerRunsPolicy() { }

    /**
     * Executes task r in the caller's thread, unless the executor
     * has been shut down, in which case the task is discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}
复制代码

总结 本文主要就线程池的状态转换、工做线程Worker建立以及执行任务队列中任务的流程、拒绝策略的详细分析。

相关文章
相关标签/搜索