让人抓头的Java并发(二) 线程池ThreadPoolExecutor分析

线程的建立须要开辟虚拟机栈、本地方法栈、程序计数器等线程私有的内存空间,在线程销毁时须要回收这些系统资源。频繁的建立销毁线程会浪费大量资源,使用线程池能够更好的管理和协调线程的工做。缓存

线程池的好处

  • 下降资源消耗,经过重复利用已有线程下降线程建立和销毁形成的消耗
  • 提升响应速度,任务到达没必要等待线程的建立
  • 管理复用线程,限制最大并发数
  • 实现定时执行或周期执行的任务(ScheduledThreadPoolExecutor)
  • 隔离线程环境,避免不一样服务线程相互影响,防止服务发生雪崩(在SpringCloud的hystrix中也是这样作的,不一样服务调用采用不一样线程池)

线程池的使用

ThreadPoolExecutor的构造方法以下:
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        …………
    }
复制代码
1)构造参数分析:
  • corePoolSize:表示常驻的核心线程数量。若是为0执行任务以后没有任何请求进入时将被销毁,若是大于0则不会被销毁。
  • maximumPoolSize:表示线程池最大可以容纳同时执行的线程数,必须大于等于1。若是和corePoolSize相等便是固定大小线程池,若是待执行线程数大于此数则按照参数handler处理。
  • keepAliveTime:表示线程池中的线程空闲时间,当空闲时间达到此值时,线程会被销毁直到剩下corePoolSize个线程。默认当线程数大于corePoolSize时才会起做用,可是当ThreadPoolExecutor的allowCoreThreadTimeOut设置为true时核心线程超时后也会被销毁。
  • unit:keepAliveTime的时间单位
  • workQueue:表示缓存队列,当请求线程数大于corePoolSize时,线程将进入BlockingQueue。
  • threadFactory:线程工厂,它用来生产一组相同任务的线程。经过给这个factory增长组名前缀来实现线程池命名,以方便在虚拟机栈分析时知道线程任务是由哪一个线程工程产生的。
  • handler:执行拒绝策略的对象。当workQueue满了以后而且活动线程数大于maximumPoolSize的时候,线程池经过该策略处理请求。

2)拒绝策略分析: ThreadPoolExecutor中提供了四个RejectedExecutionHandler策略。bash

  • AbortPolicy(默认):丢弃任务并抛出RejectedExecutionException异常。
  • DiscardPolicy:丢弃当前任务。
  • DiscardOldestPolicy:丢弃任务中等待最久的任务,而后把当前任务加入队列。
  • CallerRunsPolicy:调用任务的run()方法绕过线程池直接执行。

3)建立线程池的其余方式(不推荐):Executors这个线程池静态工厂能够建立三个线程池的包装对象:ForkJoinPool、ThreadPoolExecutor、ScheduledThreadPoolExecutor。Executors中关于ThreadPoolExecutor的核心方法以下:并发

// SynchronousQueue是不存储元素的阻塞队列,而且maximumPoolSize为Integer.MAX_VALUE
便是无界,当主线程提交任务速度高于CachedThreadPool的处理速度时会不断建立线程,
极端状况下会发生OOM
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
                                      
                                      
// keepAliveTime为0意味着多余的空闲线程会被马上终止,LinkedBlockingQueue的默认容量
是Integer.MAX_VALUE即无界,极端状况下会发生OOM
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
                                      
// LinkedBlockingQueue的默认容量是Integer.MAX_VALUE即无界,极端状况下会发生OOM
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
复制代码

看过了上述方法以后能够发现这三个方法构造出来的线程池都存在OOM的风险。而且不能灵活的配置线程工厂和拒绝策略,因此不推荐使用Executors来建立线程池。ui

4)向线程池提交任务:有两个方法execute()和submit()能够向线程池提交任务。execute()方法用于提交不须要返回值的任务,没法判断任务是否被线程池执行成功。submit()方法用于提交有返回值的任务(Callable)。线程池会返回一个future类型对象,经过future的get()方法能够获取返回值,值得注意的是get()方法会阻塞当前线程直到任务完成。spa

5)关闭线程池:有两个方法shutdown()和shutdownNow()能够关闭线程池。它们的原理是遍历线程池中的工做线程,而后逐个的调用线程的interrupt()方法来中断线程(没法响应中断的线程没法终止)。它们的区别在于shutdownNow()首先将线程池状态设置为STOP,而后尝试中止全部线程;shutdown()是将线程池状态设置为SHOTDOWN,而后中断全部没有正在执行任务的线程。线程

线程池的原理解析

当线程池接收到一个任务以后,执行流程以下图:code

  1. 判断当前工做线程数是否达到核心线程数,若是没有则建立一个新的线程来执行任务,若是达到了则进行下一个判断。
  2. 判断工做队列是否已经满了,若是工做队列没有满,则将任务加入工做队列中,不然进行下一个判断。
  3. 判断线程池是否已经满了,若是没有则建立新线程执行任务,不然按照饱和策略处理任务。

ThreadPoolExecutor执行示意图:cdn

下面是ThreadPoolExecutor中execute()方法的核心代码:对象

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        // 获取用于返回线程数和线程池状态的integer数值
        int c = ctl.get();
        // 一、若是工做线程数小于核心线程数,则建立任务并执行
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 二、若是线程池处于RUNNING状态则将任务加入队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 三、核心线程池和队列都满了,尝试建立一个新的线程
        else if (!addWorker(command, false))
            // 四、若是建立失败则执行拒绝策略
            reject(command);
    }
复制代码

addWorker()主要是建立工做线程 -- 将任务包装成Worker类。在一、3两个步骤中建立线程时须要获取全局锁ReentrantLock避免被干扰,当当前工做线程数大于等于corePoolSize以后几乎全部的execute()都是在执行步骤2。 Worker在执行完任务以后还会循环获取工做队列的任务来执行while (task != null || (task = getTask()) != null),getTask()方法中获取阻塞队列中的任务(poll()或take(),若是核心线程会被销毁或者当前线程数大于核心线程数则用poll()超时获取)blog

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 
                    : workQueue.take();
复制代码

线程池的工做原理代码在这里就不具体分析了,下图直观的展现了线程池的工做原理。

线程池工做原理



合理配置线程池

想要合理的配置线程池首先须要分析任务特性:CPU密集型任务、IO密集型任务、混合型任务 .

  • CPU密集型任务:尽可能使用较小的线程池,通常为CPU核心数+1。CPU密集型任务的CPU使用率很高,过多的线程数运行只能增长上下文切换的次数,所以会带来额外的开销。

  • IO密集型任务:使用稍大的线程池,通常为2*CPU核心数。IO密集型任务CPU使用率并不高,可让CPU在等待IO的时候去处理别的任务,充分利用CPU。

  • 混合型任务:能够将任务分红IO密集型和CPU密集型任务,而后分别用不一样的线程池去处理。只要分完以后两个任务的执行时间相差不大,那么就会比串行执行高效。若是划分以后两个任务执行时间相差甚远,那么最终的时间仍然取决于后执行完的任务,并且还要加上任务拆分与合并的开销。



在线程池的实现中还涉及了不少并发包中的知识好比BlockingQueue、ReentrantLock、Condition等,在这里就暂时不进行介绍了,后续会介绍它们。

相关文章
相关标签/搜索