线程池ThreadPoolExecutor实现原理

摘抄自简书

1. 为何要使用线程池

在实际使用中,线程是很占用系统资源的,若是对线程管理不善很容易致使系统问题。所以,在大多数并发框架中都会使用线程池来管理线程,使用线程池管理线程主要有以下好处:数据库

  1. 下降资源消耗。经过复用已存在的线程和下降线程关闭的次数来尽量下降系统性能损耗;
  2. 提高系统响应速度。经过复用线程,省去建立线程的过程,所以总体上提高了系统的响应速度;
  3. 提升线程的可管理性。线程是稀缺资源,若是无限制的建立,不只会消耗系统资源,还会下降系统的稳定性,所以,须要使用线程池来管理线程。

2. 线程池的工做原理

当一个并发任务提交给线程池,线程池分配线程去执行任务的过程以下图所示:编程

 
线程池执行流程图.jpg

从图能够看出,线程池执行所提交的任务过程主要有这样几个阶段:缓存

  1. 先判断线程池中核心线程池全部的线程是否都在执行任务。若是不是,则新建立一个线程执行刚提交的任务,不然,核心线程池中全部的线程都在执行任务,则进入第2步;
  2. 判断当前阻塞队列是否已满,若是未满,则将提交的任务放置在阻塞队列中;不然,则进入第3步;
  3. 判断线程池中全部的线程是否都在执行任务,若是没有,则建立一个新的线程来执行任务,不然,则交给饱和策略进行处理

3. 线程池的建立

建立线程池主要是ThreadPoolExecutor类来完成,ThreadPoolExecutor的有许多重载的构造方法,经过参数最多的构造方法来理解建立线程池有哪些须要配置的参数。ThreadPoolExecutor的构造方法为:并发

ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

 

下面对参数进行说明:框架

  1. corePoolSize:表示核心线程池的大小。当提交一个任务时,若是当前核心线程池的线程个数没有达到corePoolSize,则会建立新的线程来执行所提交的任务,即便当前核心线程池有空闲的线程。若是当前核心线程池的线程个数已经达到了corePoolSize,则再也不从新建立线程。若是调用了prestartCoreThread()或者 prestartAllCoreThreads(),线程池建立的时候全部的核心线程都会被建立而且启动。
  2. maximumPoolSize:表示线程池能建立线程的最大个数。若是当阻塞队列已满时,而且当前线程池线程个数没有超过maximumPoolSize的话,就会建立新的线程来执行任务。
  3. keepAliveTime:空闲线程存活时间。若是当前线程池的线程个数已经超过了corePoolSize,而且线程空闲时间超过了keepAliveTime的话,就会将这些空闲线程销毁,这样能够尽量下降系统资源消耗。
  4. unit:时间单位。为keepAliveTime指定时间单位。
  5. workQueue:阻塞队列。用于保存任务的阻塞队列,关于阻塞队列能够看这篇文章。可使用ArrayBlockingQueue, LinkedBlockingQueue, SynchronousQueue, PriorityBlockingQueue。
  6. threadFactory:建立线程的工程类。能够经过指定线程工厂为每一个建立出来的线程设置更有意义的名字,若是出现并发问题,也方便查找问题缘由。
  7. handler:饱和策略。当线程池的阻塞队列已满和指定的线程都已经开启,说明当前线程池已经处于饱和状态了,那么就须要采用一种策略来处理这种状况。采用的策略有这几种:
    1. AbortPolicy: 直接拒绝所提交的任务,并抛出RejectedExecutionException异常;
    2. CallerRunsPolicy:只用调用者所在的线程来执行任务;
    3. DiscardPolicy:不处理直接丢弃掉任务;
    4. DiscardOldestPolicy:丢弃掉阻塞队列中存放时间最久的任务,执行当前任务

线程池执行逻辑ide

经过ThreadPoolExecutor建立线程池后,提交任务后执行过程是怎样的,下面来经过源码来看一看。execute方法源码以下:源码分析

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    //若是线程池的线程个数少于corePoolSize则建立新线程执行当前任务
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //若是线程个数大于corePoolSize或者建立线程失败,则将任务存放在阻塞队列workQueue中
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //若是当前任务没法放进阻塞队列中,则建立新的线程来执行任务
    else if (!addWorker(command, false))
        reject(command);
}

 

ThreadPoolExecutor的execute方法执行逻辑请见注释。下图为ThreadPoolExecutor的execute方法的执行示意图:性能

 
execute执行过程示意图.jpg

execute方法执行逻辑有这样几种状况:ui

  1. 若是当前运行的线程少于corePoolSize,则会建立新的线程来执行新的任务;
  2. 若是运行的线程个数等于或者大于corePoolSize,则会将提交的任务存放到阻塞队列workQueue中;
  3. 若是当前workQueue队列已满的话,则会建立新的线程来执行任务;
  4. 若是线程个数已经超过了maximumPoolSize,则会使用饱和策略RejectedExecutionHandler来进行处理。

须要注意的是,线程池的设计思想就是使用了核心线程池corePoolSize,阻塞队列workQueue和线程池maximumPoolSize,这样的缓存策略来处理任务,实际上这样的设计思想在须要框架中都会使用。this

4. 线程池的关闭

关闭线程池,能够经过shutdownshutdownNow这两个方法。它们的原理都是遍历线程池中全部的线程,而后依次中断线程。shutdownshutdownNow仍是有不同的地方:

  1. shutdownNow首先将线程池的状态设置为STOP,而后尝试中止全部的正在执行和未执行任务的线程,并返回等待执行任务的列表;
  2. shutdown只是将线程池的状态设置为SHUTDOWN状态,而后中断全部没有正在执行任务的线程

能够看出shutdown方法会将正在执行的任务继续执行完,而shutdownNow会直接中断正在执行的任务。调用了这两个方法的任意一个,isShutdown方法都会返回true,当全部的线程都关闭成功,才表示线程池成功关闭,这时调用isTerminated方法才会返回true。

5. 如何合理配置线程池参数?

要想合理的配置线程池,就必须首先分析任务特性,能够从如下几个角度来进行分析:

  1. 任务的性质:CPU密集型任务,IO密集型任务和混合型任务。
  2. 任务的优先级:高,中和低。
  3. 任务的执行时间:长,中和短。
  4. 任务的依赖性:是否依赖其余系统资源,如数据库链接。

任务性质不一样的任务能够用不一样规模的线程池分开处理。CPU密集型任务配置尽量少的线程数量,如配置Ncpu+1个线程的线程池。IO密集型任务则因为须要等待IO操做,线程并非一直在执行任务,则配置尽量多的线程,如2xNcpu。混合型的任务,若是能够拆分,则将其拆分红一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率,若是这两个任务执行时间相差太大,则不必进行分解。咱们能够经过Runtime.getRuntime().availableProcessors()方法得到当前设备的CPU个数。

优先级不一样的任务可使用优先级队列PriorityBlockingQueue来处理。它可让优先级高的任务先获得执行,须要注意的是若是一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。

执行时间不一样的任务能够交给不一样规模的线程池来处理,或者也可使用优先级队列,让执行时间短的任务先执行。

依赖数据库链接池的任务,由于线程提交SQL后须要等待数据库返回结果,若是等待的时间越长CPU空闲时间就越长,那么线程数应该设置越大,这样才能更好的利用CPU。

而且,阻塞队列最好是使用有界队列,若是采用无界队列的话,一旦任务积压在阻塞队列中的话就会占用过多的内存资源,甚至会使得系统崩溃。

参考文献

《Java并发编程的艺术》
ThreadPoolExecutor源码分析,很详细

相关文章
相关标签/搜索