源码分析—ThreadPoolExecutor线程池三大问题及改进方案

前言

在一次聚会中,我和一个腾讯大佬聊起了池化技术,说起到java的线程池实现问题,我说这个我懂啊,而后巴拉巴拉说了一大堆,而后腾讯大佬问我说,那你知道线程池有什么缺陷吗?我顿时哑口无言,甘拜下风,因此此次我再回来思考一下线程池的实现原理java

源码分析

ThreadPoolExecutor构造器

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
      //校验几个参数不能小于零,不然抛出异常
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
  • corePoolSize:核心线程数
  • maximumPoolSize:最大线程数,线程池容许建立的最大线程数
  • workQueue:任务队列,BlockingQueue 接口的某个实现(常使用 ArrayBlockingQueue 和 LinkedBlockingQueue)
  • keepAliveTime:空闲线程的保活时间,若是某线程的空闲时间超过这个值都没有任务给它作,那么能够被关闭了。注意这个值并不会对全部线程起做用,若是线程池中的线程数少于等于核心线程数 corePoolSize,那么这些线程不会由于空闲太长时间而被关闭,固然,也能够经过调用 allowCoreThreadTimeOut(true)使核心线程数内的线程也能够被回收
  • threadFactory:用于生成线程,通常咱们使用Executors.defaultThreadFactory()
  • handler:当线程池已经满了,可是又有新的任务提交的时候,该采起什么策略由这个来指定

默认的几个属性:数据库

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 这里 COUNT_BITS 设置为 29(32-3),意味着前三位用于存放线程状态,后29位用于存放线程数
private static final int COUNT_BITS = Integer.SIZE - 3;
//最大线程数是 2^29-1=536870911
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
//111 00000000000000000000000000000
private static final int RUNNING    = -1 << COUNT_BITS;
// 000 00000000000000000000000000000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 001 00000000000000000000000000000
private static final int STOP       =  1 << COUNT_BITS;
// 010 00000000000000000000000000000
private static final int TIDYING    =  2 << COUNT_BITS;
// 011 00000000000000000000000000000
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
//将CAPACITY取费后和c进行取与运算,能够获得高3位的值,即线程池的状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }
//将c和CAPACITY取与运算,能够获得低29位的值,即线程池的个数
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

采用一个 32 位的整数来存放线程池的状态和当前池中的线程数,其中高 3 位用于存放线程池状态,低 29 位表示线程数(CAPACITY)缓存

  • RUNNING:这个没什么好说的,这是最正常的状态:接受新的任务,处理等待队列中的任务
  • SHUTDOWN:不接受新的任务提交,可是会继续处理等待队列中的任务
  • STOP:不接受新的任务提交,再也不处理等待队列中的任务,中断正在执行任务的线程
  • TIDYING:全部的任务都销毁了,workCount 为 0。线程池的状态在转换为 TIDYING 状态时,会执行钩子方法 terminated()
  • TERMINATED:terminated() 方法结束后,线程池的状态就会变成这个

execute方法

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
     
    int c = ctl.get();
      //若是当前的线程数小于corePoolSize
    if (workerCountOf(c) < corePoolSize) {
          //调用addWorker新建一个线程
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
       // 到这里说明,要么当前线程数大于等于核心线程数,要么刚刚 addWorker 失败了
      //校验当前线程状态是RUNNING,并将command入队
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
          //若是不是运行状态,那么移除队列,并执行拒绝策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 若是线程池仍是 RUNNING 的,而且线程数为 0,那么开启新的线程
          //防止任务提交到队列中了,可是线程都关闭了
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
      //到这里说明队列已经满了,因此新建一个线程,若是新建的线程数已经超过了maximumPoolSize,那么执行拒绝策略
    else if (!addWorker(command, false))
        reject(command);
}

用一张图来归纳一下上面的内容:
ide

  • 若是线程池中的线程数少于 coreThreadCount 时,处理新的任务时会建立新的线程;
  • 若是线程数大于 coreThreadCount 则把任务丢到一个队列里面,由当前空闲的线程执行;
  • 当队列中的任务堆积满了的时候,则继续建立线程,直到达到 maxThreadCount;
  • 当线程数达到 maxTheadCount 时还有新的任务提交,那么咱们就不得不将它们丢弃了。

咱们下面看一下addWorker是如何建立线程的:oop

addWorker源码分析

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
          //获取当前线程池状态
        int rs = runStateOf(c);
           
          //1
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
               //2. 校验传入的线程数是否超过了容量大小, 或者是否超过了corePoolSize或maximumPoolSize
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
              //到了这里说明线程数没有超,那么就用CAS将线程池的个数加1
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
              //3 说明有其余的线程抢先更新了状态,继续下一轮的循环,跳到外层循环
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
          //建立一个线程
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                  //4 若是线程是没有问题的话,那么将worker加入到队列中
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    // largestPoolSize 用于记录 workers 中的个数的最大值
                    // 由于 workers 是不断增长减小的,经过这个值能够知道线程池的大小曾经达到的最大值
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
               //若是worker入队成功,那么启动线程
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
          //若是worker启动失败,那么就回滚woker线程建立的状态
        if (! workerStarted)
            addWorkerFailed(w);
    }
      // 返回线程是否启动成功
    return workerStarted;
}
  1. 这里主要是列举了几个条件不能建立新的worker的状况
    1. 线程池状态大于 SHUTDOWN,其实也就是 STOP, TIDYING, 或 TERMINATED
    2. firstTask != null
    3. workQueue.isEmpty()
      若是线程池处于 SHUTDOWN,可是 firstTask 为 null,且 workQueue 非空,那么是容许建立 worker 的
  2. 若是传入的core参数是true表明使用核心线程数 corePoolSize 做为建立线程的界限,也就说建立这个线程的时候,若是线程池中的线程总数已经达到 corePoolSize,那么不能响应此次建立线程的请求;若是是false,表明使用最大线程数 maximumPoolSize 做为界限
  3. 若是CAS失败并非由于有其余线程在嘈杂哦致使的,那么就直接在里层循环继续下一次的循环就行了,若是是由于其余线程的操做,致使线程池的状态发生了变动,若有其余线程关闭了这个线程池,那么须要回到外层的for循环
  4. 若是是 小于 SHUTTDOWN 那就是 RUNNING,则继续往下继续,或者状态是SHUTDOWN可是传入的firstTask为空,表明继续处理队列中的任务

addWorkerFailed学习

private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            workers.remove(w);
        decrementWorkerCount();
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

addWorkerFailed的处理就是将workers集合里面的worker移除,而后count减1,优化

worker对象

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
      
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;
 
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
      ....
}

Worker是继承AQS对象的,在建立Worker对象的时候会传入一个Runnable对象,并设置AQS的state状态为-1,并从线程工厂中新建一个线程ui

调用thread.start方法会调用到Worker的run方法中this

public void run() {
    runWorker(this);
}

Worker的run方法会调用到ThreadPoolExecutor的runWorker方法

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
          //若是task为空,那么就从workQueue里面获取task
        while (task != null || (task = getTask()) != null) {
            w.lock();
              // 若是线程池状态大于等于 STOP,那么意味着该线程也要中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                  // 这是一个钩子方法
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                       //执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                       // 这是一个钩子方法
                    afterExecute(task, thrown);
                }
            } finally {
                   // 置空 task,准备 getTask 获取下一个任务
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
          //异常状况或getTask获取不到任务时会执行关闭
        processWorkerExit(w, completedAbruptly);
    }
}

传入一个Worker首先去校验firstTask是否是null,若是是那么就调用getTask方法从workQueue队列里面获取,而后判断一下当前的线程是否须要中断,如须要的话执行钩子方法,而后调用task的run方法执行task;
若是while循环里面getTask获取不到任务的话,就结束循环调用processWorkerExit方法执行关闭;
若是是异常缘由致使的while循环退出,那么会调用processWorkerExit并传入为true

getTask

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        //要么状态大于STOP,要么状态等于SHUTDOWN而且队列是空的,那么线程数减一后返回null
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);
        // 容许核心线程数内的线程回收,或当前线程数超过了核心线程数,那么有可能发生超时关闭
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
          //校验线程数是否超了,或者是否超时
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
              // 到 workQueue 中获取任务
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

这个方法返回null有以下几种状况:

  1. 当前状态是SHUTDOWN而且workQueue队列为空
  2. 当前状态是STOP及以上
  3. 池中有大于 maximumPoolSize 个 workers 存在(经过调用 setMaximumPoolSize 进行设置)

processWorkerExit

private void processWorkerExit(Worker w, boolean completedAbruptly) {
      //若是是异常缘由中断,那么须要将运行线程数减一
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
          //设置完成任务数
        completedTaskCount += w.completedTasks;
          //将worker从集合里移除
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
      //判断当前的线程池是否处于SHUTDOWN状态,判断是否要终止线程
    tryTerminate();

    int c = ctl.get();
      //若是是RUNNING或SHUTDOWN则会进入这个方法
    if (runStateLessThan(c, STOP)) {
          //如不是之外中断则会往下走
        if (!completedAbruptly) {
              //判断是否保留最少核心线程数
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
          //若是当前运行的Worker数比当前所须要的Worker数少的话,那么就会调用addWorker,添加新的Worker
        addWorker(null, false);
    }
}
  1. 判断是不是意外退出的,若是是意外退出的话,那么就须要把WorkerCount--
  2. 加完锁后,同步将completedTaskCount进行增长,表示总共完成的任务数,而且从WorkerSet中将对应的Worker移除
  3. 调用tryTemiate,进行判断当前的线程池是否处于SHUTDOWN状态,判断是否要终止线程
  4. 判断当前的线程池状态,若是当前线程池状态比STOP大的话,就不处理
  5. 判断是不是意外退出,若是不是意外退出的话,那么就会判断最少要保留的核心线程数,若是allowCoreThreadTimeOut被设置为true的话,那么说明核心线程在设置的KeepAliveTime以后,也会被销毁。
  6. 若是最少保留的Worker数为0的话,那么就会判断当前的任务队列是否为空,若是任务队列不为空的话并且线程池没有中止,那么说明至少还须要1个线程继续将任务完成
  7. 判断当前的Worker是否大于min,也就是说当前的Worker总数大于最少须要的Worker数的话,那么就直接返回,由于剩下的Worker会继续从WorkQueue中获取任务执行
  8. 若是当前运行的Worker数比当前所须要的Worker数少的话,那么就会调用addWorker,添加新的Worker,也就是新开启线程继续处理任务

线程池的三大问题

这个任务处理流程看似简单,实际上有不少坑,你在使用的时候必定要注意。

  1. JDK 实现的这个线程池优先把任务放入队列暂存起来,而不是建立更多的线程,它比较适用于执行 CPU 密集型的任务,也就是须要执行大量 CPU 运算的任务。因此当当前线程数超过核心线程数时,线程池不会增长线程,而是放在队列里等待核心线程空闲下来。

可是,咱们平时开发的 Web 系统一般都有大量的 IO 操做,比方说查询数据库、查询缓存等等。任务在执行 IO 操做的时候 CPU 就空闲了下来,这时若是增长执行任务的线程数而不是把任务暂存在队列中,就能够在单位时间内执行更多的任务,大大提升了任务执行的吞吐量。因此你看 Tomcat 使用的线程池就不是 JDK 原生的线程池,而是作了一些改造,当线程数超过 coreThreadCount 以后会优先建立线程,直到线程数到达 maxThreadCount,这样就比较适合于 Web 系统大量 IO 操做的场景了,你在实际运用过程当中也能够参考借鉴。

  1. 线程池中使用的队列的堆积量也是咱们须要监控的重要指标,对于实时性要求比较高的任务来讲,这个指标尤其关键。

我在实际项目中就曾经遇到过任务被丢给线程池以后,长时间都没有被执行的诡异问题。最初,我认为这是代码的 Bug 致使的,后来通过排查发现,是由于线程池的 coreThreadCount 和 maxThreadCount 设置的比较小,致使任务在线程池里面大量的堆积,在调大了这两个参数以后问题就解决了。跳出这个坑以后,我就把重要线程池的队列任务堆积量,做为一个重要的监控指标放到了系统监控大屏上。

  1. 若是你使用线程池请必定记住不要使用无界队列(即没有设置固定大小的队列)。也许你会以为使用了无界队列后,任务就永远不会被丢弃,只要任务对实时性要求不高,反正迟早有消费完的一天。可是,大量的任务堆积会占用大量的内存空间,一旦内存空间被占满就会频繁地触发 Full GC,形成服务不可用,我以前排查过的一次 GC 引发的宕机,原由就是系统中的一个线程池使用了无界队列。

线程池的改造方案

咱们这里直接学习Tomcat是如何优化线程池的,在咱们平时的使用中若是使用LinkedBlockingQueue的话,默认是使用Integer.MAX_VALUE,即无界队列(这种状况下若是没有配置队列的capacity的话,队列始终不会满,那么始终没法进入开启新线程到达maxThreads个数的地步,则此时配置maxThreads实际上是没有意义的)。

而在Tomcat中使用的是TaskQueue,TaskQueue的队列capacity为maxQueueSize,默认也是Integer.MAX_VALUE。可是,其重写offer方法,当其线程池大小小于maximumPoolSize的时候,返回false,即在必定程度改写了队列满的逻辑,修复了使用LinkedBlockingQueue默认的capacity为Integer.MAX_VALUE的时候,maxThreads失效的"bug"。从而能够继续增加线程到maxThreads,超过以后,继续放入队列。

因此综上,Tomcat的线程池使用了本身扩展的taskQueue,修改了offer的逻辑,以作到最小的改动实现了线程池的改造。

咱们再来回顾一下ThreadPoolExecutor的execute方法是怎么写的:
ThreadPoolExecutor#execute

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
      //这里,若是使用workQueue的offer成功的话,那么就不会建立新的线程,若是失败的话,就会走到else if方法进行建立新的线程
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

TaskQueue

public class TaskQueue extends LinkedBlockingQueue<Runnable> {
   private ThreadPoolExecutor parent = null;
    @Override
    public boolean offer(Runnable o) {
        //we can't do any checks
        if (parent==null) return super.offer(o);
        //we are maxed out on threads, simply queue the object
        if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
        //we have idle threads, just add it to the queue
        if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o);
        //当其线程池大小小于maximumPoolSize的时候,返回false
        if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
        //if we reached here, we need to add it to the queue
        return super.offer(o);
    }
}

咱们从这里能够看到

  1. 若是当前线程数已达到MaximumPoolSize,那么就放入到队列里去
  2. 若是当前线程池的数量大于正在运行的线程数,说明有空闲的线程,那么就将任务放入到队列中去
  3. 若当其线程池大小小于maximumPoolSize的时候,返回false
相关文章
相关标签/搜索