ThreadPoolExecutor 源码解析

1、线程

  1. 线程是CPU 调度的最小操做单位,线程模型分为KLT 模型和ULT 模型,JVM 使用的是KLT 模型。
  2. 线程的状态 :NEW,RUNNABLE,BLOCKED,TERMINATED

2、线程池

1. 线程池解决的两大核心问题:
  • 在执行大量异步运算的时候,线程池用优化系统性能,减小线程的反复建立所带来的的系统开销
  • 提供了一种限制和管理资源的方法
2. 7 大核心参数:
  1. corePoolSize :线程池中的核心线程数,当提交一个任务时,线程池建立一个新线程执行任务,直到当前线程数等于corePoolSize;若是当前线程数为corePoolSize,继续提交的任务被保存到 阻塞队列中,等待被执行;若是执行了线程池的prestartAllCoreThreads()方法,线程池会 提早建立并启动全部核心线程。编程

  2. maximumPoolSize :线程池中容许的最大线程数。若是当前阻塞队列满了,且继续提交任务,则建立新的线程执行任务,前提是当前线程数小于maximumPoolSize;数组

  3. keepAliveTime :线程池维护线程所容许的空闲时间。当线程池中的线程数量大corePoolSize的时 候,若是这时没有新的任务提交,核心线程外的线程不会当即销毁,而是会等待,直到等待 的时间超过了keepAliveTime;并发

  4. unit: keepAliveTime的单位;异步

  5. workQueue: 用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口,在JDK中提供了以下阻塞队列:函数

  • ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;
  • LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量一般要高于ArrayBlockingQuene; -
  • SynchronousQuene:一个不存储元素的阻塞队列,每一个插入操做必须等到另外一个线程调用移除操做,不然插入操做一直处于阻塞状态,吞吐量一般要高于 LinkedBlockingQuene;
  • priorityBlockingQuene:具备优先级的无界阻塞队列;
  1. threadFactory:它是ThreadFactory类型的变量,用来建立新线程。默认使用 Executors.defaultThreadFactory() 来建立线程。使用默认ThreadFactory来建立线程 时,会使新建立的线程具备相同的NORM_PRIORITY优先级而且是非守护线程,同时也设 置了线程的名称。oop

  2. handler: 线程池的饱和策略,当阻塞队列满了,且没有空闲的工做线程,若是继续提交任务,必 须采起一种策略处理该任务,线程池提供了4种策略:性能

  • AbortPolicy:直接抛出异常,默认策略;
  • CallerRunsPolicy:用调用者所在的线程来执行任务;
  • DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
  • DiscardPolicy:直接丢弃任务;

上面的4种策略都是ThreadPoolExecutor的内部类。固然也能够根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如 记录日志或持久化存储不能处理的任务。测试

3. 线程池的生命周期状态 :
  • NEW
  • RUNNABLE
  • WATING
  • BLOCKED
  • TIMED_WATING
  • TERMINATED
4. 线程池的重要属性 :ctl
  1. ctl 是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它包含两 部分的信息: 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),这 里能够看到,使用了Integer类型来保存,高3位保存runState,低29位保存 workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位减1(29个1),这个常 量表示workerCount的上限值,大约是5亿。优化

  2. runState 主要提供线程池生命周期的控制,主要值包括:ui

  • RUNNING

(1) 状态说明:线程池处在RUNNING状态时,可以接收新任务,以及对已添加的任务进行 处理。

(2) 状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被建立,就处 于RUNNING状态,而且线程池中的任务数为0!

  • SHUTDOWN

(1) 状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。

(2) 状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> -SHUTDOWN。

  • STOP

(1) 状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,而且会中 断正在处理的任务。

(2) 状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。

  • TIDYING

(1) 状态说明:当全部的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING 状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在 ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理; 能够经过重载terminated()函数来实现。

(2) 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空而且线程池中执行的任务也 为空时,就会由 SHUTDOWN -> TIDYING。 当线程池在STOP状态下,线程池中执行的 任务为空时,就会由STOP -> TIDYING。

  • TERMINATED

(1) 状态说明:线程池完全终止,就变成TERMINATED状态。

(2) 状态切换:线程池处在TIDYING状态时,执行完terminated()以后,就会由 TIDYING - > TERMINATED。 进入TERMINATED的条件以下: 线程池不是RUNNING状态; 线程池状态不是TIDYING状态或TERMINATED状态; 若是线程池状态是SHUTDOWN而且workerQueue为空; workerCount为0,设置TIDYING状态成功。

线程池状态转换

  1. ctl相关 API
  • runStateOf():获取运行状态;

  • workerCountOf():获取活动线程数;

  • ctlOf():获取运行状态和活动线程数的值。

5. 线程池的行为
  • execute(Runnable command):执行Ruannable类型的任务

  • submit(task):可用来提交Callable或Runnable任务,并返回表明此任务的Future 对象

  • shutdown():在完成已提交的任务后封闭办事,再也不接管新任务,

  • shutdownNow():中止全部正在履行的任务并封闭办事。

  • isTerminated():测试是否全部任务都履行完毕了。

  • isShutdown():测试是否该ExecutorService已被关闭。

6. 经常使用线程池的具体实现
ThreadPoolExecutor 默认线程池 
ScheduledThreadPoolExecutor 定时线程池
7. 线程池监控API
  • public long getTaskCount() //线程池已执行与未执行的任务总数

  • public long getCompletedTaskCount() //已完成的任务数

  • public int getPoolSize() //线程池当前的线程数

  • public int getActiveCount() //线程池中正在执行任务的线程数量

3、源码解析

execute() 方法
//在未来的某个时间执行给定的任务。任务能够是新起一个新线程或者复用现有池线程中的线程去执行
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * 执行过程仍是分为 3 步:
         *
         * 1.执行任务: 
         * 若是小于核心线程数,尝试建立一个新线程来执行给定的任务。
         * 方法 addWorker() 就是真正的建立一个新线程来执行任务的方法。
         * addWorker()方法会对 runState 和 workerCount进行原子检查。
         * addWorker()方法会返回一个 boolean 值,经过返回 false 值来防止在不该该添加线程的状况下发出错误警报
         * 
         *
         * 2.添加到阻塞队列:
         * 未能知足条件执行完步骤 1 则添加到阻塞队列。
         * 若是任务能够成功排队,会再次进行检查,检查是否应该添加线程(由于现有线程自上次检查后就死了),
         * 或者自进入此方法以来该池已关闭。所以,须要从新检查状态,并在中止的状况下在必要时回滚队列,若是没有,则启动一个新线程。 
         *
         * 3.拒绝任务: 
         * 若是没法将任务添加至阻塞队列,最大线程数也未达到最大会尝试添加一个新的线程。若是失败,说明线程池已关闭或处于饱和状态,所以拒绝该任务。
         */
         
         //clt记录着runState和workerCount
        int c = ctl.get();
        /*
         * workerCountOf方法取出低29位的值,表示当前活动的线程数;
         * 若是当前活动线程数小于corePoolSize,则新建一个线程放入线程池中;并把任务添加到该线程中。
         */
        if (workerCountOf(c) < corePoolSize) {
            /*
             * addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize来判断仍是maximumPoolSize来判断
             * 若是为true,根据corePoolSize来判断;
             * 若是为false,则根据maximumPoolSize来判断
             */
            if (addWorker(command, true))
                return;
            //若是添加失败,则从新获取ctl值    
            c = ctl.get();
        }
        //执行到此处说明从核心线程里给当前任务分配线程失败
        //若是当前线程池是运行状态而且任务添加到队列成功
        if (isRunning(c) && workQueue.offer(command)) {
            //从新获取ctl值。即便添加队列成功也要再次检查,若是不是运行状态,因为以前已经把任务添加到workerQueue 中了,因此要移除该任务,执行事后经过handler使用拒绝策略对该任务进行处理,整个方法返回
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
                /*
                 * 获取线程池中的有效线程数,若是数量是0,则执行addWorker方法 这里传入的参数表示:
                 * 第一个参数为null,表示在线程池中建立一个线程,但不去启动;
                 * 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPolSize,添加线程时根据maximumPoolSize来判断;
                 * 若是判断workerCount大于0,则直接返回,在workQueue中新增的comman 会在未来的某个时刻被执行。
                 */
            //由于 任务已经被添加到workQueue中了,因此worker在执行的时候,会直接从workQueue中 获取任务。     
            else if (workerCountOf(recheck) == 0)
                //执行到这里说明任务已经添加到阻塞队列里了,最大线程数也未饱和,则建立一个新的线程去阻塞队列里拿任务
                //这步操做也就是建立一个线程,但并无传入任务,由于任务已经被添加到workQueue中了,因此worker在执行的时候,会直接从workQueue中 获取任务。
                //为何要这样作呢?是为了保证线程池在RUNNING状态下必需要有一个线程来执行任务。
                addWorker(null, false);
        }
        /*
         * 若是执行到这里,有两种状况:
         * 1. 线程池已经不是RUNNING状态;
         * 2. 线程池是RUNNING状态,但workerCount >= corePoolSize而且workQueue已满。
         * 这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的 有限线程数量的上限设置为maximumPoolSize;
         * 若是失败则拒绝该任务
         */
        else if (!addWorker(command, false))
            reject(command);
    }
addWorker() 方法
/**
 * 检查是否能够根据当前线程池的状态添加一个新的工做线程去执行任务。
 * addWorker(runnable,true)表示从核心工做线程数中分配线程执行传进来的任务;
 * addWorker(null,false)表示从最大线程数中分配线程执行阻塞队列中的任务。
 * 线程池若是中止或者关闭则直接返回 false,若是线程池建立新线程失败一样也会返回 false。
 * 若是建立线程失败,或者线程工厂返回 null,或者执行当前 addWorker()的线程抛出异常,(注意是当前线程抛出异常,当前线程抛出异常只与当前任务有关,并不影响其余任务的执行),线程池的相关属性会当即回滚
 */
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    // 外层 for 循环就是为了能给任务分配线程作准备,判断状态--> 原子递增 workerCount
    // 直到线程池状态不符合条件返回 false ,或者自增成功跳出 for 循环
    // 一样的,getTask()从阻塞队列中获取任务的时候也是这么个逻辑,先对 workerCount 原子递减,再去执行任务
    for (;;) {
        //能够看到,每一步操做都会对线程池的状态参数作判断
        int c = ctl.get();
        int rs = runStateOf(c);
        
        //也是对线程池状态,队列状态作检查
        /**
         * 这里的状态判断也很好理解:
         * 线程池状态为SHUTDOWN,不会再接受新的任务了,返回 false
         * 想城池状态不为SHUTDOWN,传进来的任务为空,而且阻塞队列里也没任务,那还执行个锤子任务,一样返回 false 
         */
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        //前面已经判断过知足为任务分配一个线程去执行任务
        //这个 for 循环就是为了建立任务作准备,先去原子性的递增 workerCount,workerCount 递增成功了才会去真正的为任务分配线程去执行
        for (;;) {
            //当前工做线程数
            int wc = workerCountOf(c);
            //当前工做线程数大于corePoolSize 或者 maximumPoolSize (跟谁比较就是根据传进来的参数 core 判断),
            //说明也没有分配的线程能够执行任务了,返回 false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            /** 
             * 执行到这里说明知足条件了,能够分配出来线程去执行任务了
             * 尝试增长workerCount,若是成功,则跳出第一个for循环
             * 这里是进行 CAS 自增 ctl 的 workerCount(先把数量自增,再跳出 for 循环建立新的线程去执行任务)
             * 该方法内部也是调用了原子类 AtomicInteger.compareAndSet()方法,保证原子递增
             */    
            if (compareAndIncrementWorkerCount(c))
                break retry;
            //若是尝试添加新的工做线程失败则会继续判断当前线程池的状态,状态知足继续尝试为当前线程分配工做线程    
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    //跳出 for 循环以后说明线程池的工做线程数 workerCount 已经调节过了,接下来要作到就是真正的分配线程,执行任务
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //Worker 对象是个内部类,其实就是用threatFactory 生成一个新的线程
        //继承 AQS 类,实现Runable 接口,重写 run()方法,重写的 run()方法也很重要,后面会讲
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            //加锁保证同步
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //仍是先进行一通的线程池状态检查
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    //这个 workers 是个 HashSet,线程池也是经过维护这个 workers 控制任务的执行
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        //largestPoolSize记录着线程池中出现过的最大线程数量
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                //终于,调用线程的 start() 方法
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        //若是建立线程失败,就要回滚线程池的状态了
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
Worker 类
Worker 类是用来干吗的,存在的意义

回头再看 Worker 类,线程池中的每个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组 Worker对象(HashSet)。

Worker类继承了AQS,并实现了Runnable接口,注意其中的firstTask和thread属 性:firstTask用它来保存传入的任务;thread是在调用构造方法时经过ThreadFactory来创 建的线程,是用来处理任务的线程。

Worker继承了AQS,使用AQS来实现独占锁的功能。为何不使用ReentrantLock来 实现呢?能够看到tryAcquire方法,它是不容许重入的,而ReentrantLock是容许重入的:

  1. lock方法一旦获取了独占锁,表示当前线程正在执行任务中;
  2. 若是正在执行任务,则不该该中断线程;
  3. 若是该线程如今不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务, 这时能够对该线程进行中断;
  4. 线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers 方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程 池中的线程是不是空闲状态;
  5. 之因此设置为不可重入,是由于咱们不但愿任务在调用像setCorePoolSize这样的 线程池控制方法时从新获取锁。若是使用ReentrantLock,它是可重入的,这样若是 在任务中调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线程。 因此,Worker继承自AQS,用于判断线程是否空闲以及是否能够被中断。 此外,在构造方法中执行了setState(-1);,把state变量设置为-1,为何这么作呢? 是由于AQS中默认的state是0,若是刚建立了一个Worker对象,尚未执行任务时,这时就不该该被中断,看一下tryAquire方法:
Worker 类中以及涉及到的重要的方法
  • tryAcquire(int unused) 方法
/**
    * 用于判断线程是否空闲以及是否能够被中断
    */
   protected boolean tryAcquire(int unused) {
           //cas 修改状态,不可重入
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

tryAcquire方法是根据state是不是0来判断的,因此,将state设置为-1是 为了禁止在执行任务前对线程进行中断。

正由于如此,在runWorker方法中会先调用Worker对象的unlock方法将state设置为 0。

  • runWorker(Worker w)方法
/**
 * Worker 类实现 Runnable 接口,重写的 run()方法
 */
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    //容许中断
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        //while 循环就是不断的去执行任务,当本身的任务(firstTask)执行完以后依然会从阻塞队列里拿任务去执行,就这样的操做保证了线程的重用
        //task 为空则从阻塞队列中获取任务
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            /**
             * 若是线程池正在中止,那么要保证当前线程是中断状态,若是不是的话,则要保证当前线程不是中断状态
             * 这里为何要这么作呢?考虑在执行该if语句期间可能也执行了shutdownNow方法,shutdownNow方法会 把状态设置为STOP,
             * 回顾一下STOP状态:不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,
             * 调用 shutdownNow() 方法会使线程池进入到STOP状态。
             * STOP状态要中断线程池中的全部线程,而这里使用Thread.interrupted()来判断是否中断是为了确保在 RUNNING或者SHUTDOWN状态时线程是非中断状态的,
             * 由于Thread.interrupted()方法会重置中断的状态。
             */
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

总结一下runWorker方法的执行过程:

  1. while循环不断地经过getTask()方法获取任务;
  2. getTask()方法从阻塞队列中取任务;
  3. 若是线程池正在中止,那么要保证当前线程是中断状态,不然要保证当前线程不是 中断状态;
  4. 调用task.run()执行任务;
  5. 若是task为null则跳出循环,执行processWorkerExit()方法;
  6. runWorker方法执行完毕,也表明着Worker中的run方法执行完毕,销毁线程。
  • getTask()方法
/**
 * 从阻塞队列中获取任务,返回值是 Runnable
 * 线程池状态不知足执行条件时直接返回 null
 */
private Runnable getTask() {
    //timeOut变量的值表示上次从阻塞队列中取任务时是否超时
    boolean timedOut = false; // Did the last poll() time out?
    
    //这里两个 for 循环操做和 addWorker() 方法里的两个 for 循环操做思想同样
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        
        //仍然检查线程池状态
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        //timed变量用于判断是否须要进行超时控制
        //allowCoreThreadTimeOut默认是false,也就是核心线程不容许进行超时
        //wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量
        //对于超过核心线程数量的这些线程,须要进行超时控制
        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        //和 addWorker() 里流程同样,也是先对线程池中 workerCount 进行控制,再进行后面的执行任务操做
        //知足条件则 workerCount 数量减一
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            //根据timed来判断,若是为true,则经过阻塞队列的poll方法进行超时控制,若是在keepAliveTime时间内没有获取到任务,则返回null
            //不然经过take方法,若是这时队列为空,则take方法会阻塞直到队列不为空
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            // 若是 r == null,说明已经超时,timedOut设置为true
            timedOut = true;
        } catch (InterruptedException retry) {
            // 若是获取任务时当前线程发生了中断,则设置timedOut为false并返回
            timedOut = false;
        }
    }
}
processWorkerExit() 
/**
 * getTask方法返回null时,在runWorker方法中会跳出while循环,而后会执行 processWorkerExit方法。
 * 作线程池的善后工做
 */
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    //若是completedAbruptly值为true,则说明线程执行时出现了异常,须要将workerCount减1
    //若是线程执行时没有出现异常,说明在getTask()方法中已经已经对workerCount减1了,这里就不须要再减了
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //统计完成的任务数
        completedTaskCount += w.completedTasks;
        // 从workers中移除,也就表示着从线程池中移除了一个工做线程
        // workers 是前面提到的 HashSet,线程池就是经过维护这个 worker()来保证线程池运做的 
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    // 根据线程池状态进行判断是否结束线程池
    tryTerminate();


    /**
     * 当线程池是RUNNING或SHUTDOWN状态时,若是worker是异常结束,那么会直接addWorker;
     * 若是allowCoreThreadTimeOut=true,而且等待队列有任务,至少保留一个worker
     * 若是allowCoreThreadTimeOut=false,workerCount很多于corePoolSize
     */
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

至此,processWorkerExit执行完以后,工做线程被销毁,以上就是整个工做线程的生 命周期,从execute方法开始,Worker使用ThreadFactory建立新的工做线程, runWorker经过getTask获取任务,而后执行任务,若是getTask返回null,进入 processWorkerExit方法,整个线程结束

4、思考

  1. 线程池如何实现线程重用的?

就是在重写的 run()方法里,经过 while 循环,执行完 firstTask 以后依然从阻塞队列里获取任务去执行。

  1. 线程超时怎么处理?

当前面任务抛出异常,后面的线程还会执行吗? 答案是会。也是 while 循环里找答案,当前线程抛出异常只会对当前线程产生影响,对线程池里其余任务不会有影响。

  1. 何时会销毁?

是runWorker方法执行完以后,也就是Worker中的run方法执行完,由JVM自动回收。

  1. 阻塞队列选取?在JDK中提供了以下阻塞队列:
  • ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;
  • LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量一般要高于ArrayBlockingQuene;
  • SynchronousQuene:一个不存储元素的阻塞队列,每一个插入操做必须等到另外一个线程调用移除操做,不然插入操做一直处于阻塞状态,吞吐量一般要高于 LinkedBlockingQuene;
  • PriorityBlockingQuene:具备优先级的无界阻塞队列;
  1. 丢弃策略选取?线程池提供了4种策略:
  • AbortPolicy:直接抛出异常,默认策略;
  • CallerRunsPolicy:用调用者所在的线程来执行任务;
  • DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
  • DiscardPolicy:直接丢弃任务;
  1. 线程数如何设置?
  • 通常设法是会根据咱们任务的类型去设置,简单分为: CPU 密集型 :CPU 核数 + 1 IO 密集型:2*CPU 核数 + 1

《Java并发编程实战》中最原始的公式是这样的: Nthreads=Ncpu∗Ucpu∗(1+WC)Nthreads​=Ncpu​∗Ucpu​∗(1+CW​);

  • Ncpu​表明CPU的个数,
  • Ucpu​表明CPU利用率的指望值(0<Ucpu<10<Ucpu​<1),
  • WCCW​仍然是等待时间与计算时间的比例。

上面提供的公式至关于目标CPU利用率为100%。 一般系统中不止一个线程池,因此实际配置线程数应该将目标CPU利用率计算进去。

相关文章
相关标签/搜索