ThreadPoolExecutor使用和思考-线程池大小设置与BlockingQueue的三种

工做中多处接触到了ThreadPoolExecutor。趁着如今还算空,学习总结一下。java

前记:程序员

  1. jdk官方文档(javadoc)是学习的最好,最权威的参考。
  2. 文章分上中下。上篇中主要介绍ThreadPoolExecutor接受任务相关的两方面入参的意义和区别,池大小参数corePoolSize和maximumPoolSize,BlockingQueue选型(SynchronousQueue,LinkedBlockingQueue,ArrayBlockingQueue);中篇中主要聊聊与keepAliveTime这个参数相关的话题;下片中介绍一下一些比较少用的该类的API,及他的近亲:ScheduledThreadPoolExecutor。
  3. 若是理解错误,请直接指出。

查看JDK帮助文档,能够发现该类比较简单,继承自AbstractExecutorService,而AbstractExecutorService实现了ExecutorService接口。服务器

ThreadPoolExecutor的完整构造方法的签名是:多线程

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 函数

先记着,后面慢慢解释。性能

===============================神奇分割线==================================学习

其实对于ThreadPoolExecutor的构造函数网上有N多的解释的,大多讲得都很好,不过我想先换个方式,从Executors这个类入手。由于他的几个构造工厂构造方法名字取得使人很容易了解有什么特色。可是其实Executors类的底层实现即是ThreadPoolExecutor!spa

ThreadPoolExecutor是Executors类的底层实现。操作系统

在JDK帮助文档中,有如此一段话:线程

强烈建议程序员使用较为方便的 Executors 工厂方法 Executors.newCachedThreadPool()(无界线程池,能够进行自动线程回收)、Executors.newFixedThreadPool(int)(固定大小线程池)和Executors.newSingleThreadExecutor()(单个后台线程),它们均为大多数使用场景预约义了设置。

能够推断出ThreadPoolExecutor与Executors类必然关系密切。

===============================神奇分割线==================================

OK,那就来看看源码吧,从newFixedThreadPool开始。

ExecutorService newFixedThreadPool(int nThreads):固定大小线程池。

能够看到,corePoolSize和maximumPoolSize的大小是同样的(实际上,后面会介绍,若是使用无界queue的话maximumPoolSize参数是没有意义的),keepAliveTime和unit的设值表名什么?-就是该实现不想keep alive!最后的BlockingQueue选择了LinkedBlockingQueue,该queue有一个特色,他是无界的

Java代码  

public static ExecutorService newFixedThreadPool(int nThreads) {  

        return new ThreadPoolExecutor(nThreads, nThreads,  

                                      0L, TimeUnit.MILLISECONDS,  

                                      new LinkedBlockingQueue<Runnable>());  

      }  

ExecutorService newSingleThreadExecutor():单线程。

能够看到,与fixedThreadPool很像,只不过fixedThreadPool中的入参直接退化为1

Java代码   收藏代码

  1. public static ExecutorService newSingleThreadExecutor() {  
  2.         return new FinalizableDelegatedExecutorService  
  3.             (new ThreadPoolExecutor(11,  
  4.                                     0L, TimeUnit.MILLISECONDS,  
  5.                                     new LinkedBlockingQueue<Runnable>()));  
  6.     }  

ExecutorService newCachedThreadPool():无界线程池,能够进行自动线程回收。

这个实现就有意思了。首先是无界的线程池,因此咱们能够发现maximumPoolSize为big big。其次BlockingQueue的选择上使用SynchronousQueue。可能对于该BlockingQueue有些陌生,简单说:该QUEUE中,每一个插入操做必须等待另外一个

线程的对应移除操做。好比,我先添加一个元素,接下来若是继续想尝试添加则会阻塞,直到另外一个线程取走一个元素,反之亦然。(想到什么?就是缓冲区为1的生产者消费者模式^_^)

注意到介绍中的自动回收线程的特性吗,为何呢?先不说,但注意到该实现中corePoolSize和maximumPoolSize的大小不一样。

Java代码   收藏代码

public static ExecutorService newCachedThreadPool() {          

  return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  60L, TimeUnit.SECONDS,                                        new SynchronousQueue<Runnable>());      

}  

===============================神奇分割线==================================

到此若是有不少疑问,那是必然了(除非你也很了解了)

先从BlockingQueue<Runnable> workQueue这个入参开始提及。在JDK中,其实已经说得很清楚了,一共有三种类型的queue。如下为引用:(我会稍微修改一下,并用红色突出显示)

全部  BlockingQueue 均可用于传输和保持提交的任务。可使用此队列与池大小进行交互:

  • 若是运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。(什么意思?若是当前运行的线程小于corePoolSize,则任务根本不会存放,添加到queue中,而是直接抄家伙(thread)开始运行
  • 若是运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列而不添加新的线程
  • 若是没法将请求加入队列,则建立新的线程,除非建立此线程超出 maximumPoolSize,在这种状况下,任务将被拒绝。

先不着急举例子,由于首先须要知道queue上的三种类型。

排队有三种通用策略:

  1. 直接提交。工做队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。在此,若是不存在可用于当即运行任务的线程,则试图把任务加入队列将失败,所以会构造一个新的线程。此策略能够避免在处理可能具备内部依赖性的请求集时出现锁。直接提交一般要求无界 maximumPoolSizes 以免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略容许无界线程具备增加的可能性。
  2. 无界队列。使用无界队列(例如,不具备预约义容量的 LinkedBlockingQueue)将致使在全部 corePoolSize 线程都忙时新任务在队列中等待。这样,建立的线程就不会超过 corePoolSize。(所以,maximumPoolSize 的值也就无效了。)当每一个任务彻底独立于其余任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略容许无界线程具备增加的可能性。
  3. 有界队列。当使用有限的 maximumPoolSizes 时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,可是可能较难调整和控制。队列大小和最大池大小可能须要相互折衷:使用大型队列和小型池能够最大限度地下降 CPU 使用率、操做系统资源和上下文切换开销,可是可能致使人工下降吞吐量。若是任务频繁阻塞(例如,若是它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列一般要求较大的池大小,CPU 使用率较高,可是可能遇到不可接受的调度开销,这样也会下降吞吐量。  

===============================神奇分割线==================================

到这里,该了解的理论已经够多了,能够调节的就是corePoolSize和maximumPoolSizes 这对参数还有就是BlockingQueue的选择。

例子一:使用直接提交策略,也即SynchronousQueue。

首先SynchronousQueue是无界的,也就是说他存数任务的能力是没有限制的,可是因为该Queue自己的特性在某次添加元素后必须等待其余线程取走后才能继续添加。在这里不是核心线程即是新建立的线程,可是咱们试想同样下,下面的场景。

咱们使用一下参数构造ThreadPoolExecutor:

Java代码   收藏代码

  1. new ThreadPoolExecutor(  
  2.             2330, TimeUnit.SECONDS,   
  3.             new <span style="white-space: normal;">SynchronousQueue</span><Runnable>(),   
  4.             new RecorderThreadFactory("CookieRecorderPool"),   
  5.             new ThreadPoolExecutor.CallerRunsPolicy());  

 当核心线程已经有2个正在运行.

  1. 此时继续来了一个任务(A),根据前面介绍的“若是运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列而不添加新的线程。”,因此A被添加到queue中。
  2. 又来了一个任务(B),且核心2个线程尚未忙完,OK,接下来首先尝试1中描述,可是因为使用的SynchronousQueue,因此必定没法加入进去。
  3. 此时便知足了上面提到的“若是没法将请求加入队列,则建立新的线程,除非建立此线程超出maximumPoolSize,在这种状况下,任务将被拒绝。”,因此必然会新建一个线程来运行这个任务。
  4. 暂时还能够,可是若是这三个任务都还没完成,连续来了两个任务,第一个添加入queue中,后一个呢?queue中没法插入,而线程数达到了maximumPoolSize,因此只好执行异常策略了。

因此在使用SynchronousQueue一般要求maximumPoolSize是无界的,这样就能够避免上述状况发生(若是但愿限制就直接使用有界队列)。对于使用SynchronousQueue的做用jdk中写的很清楚: 此策略能够避免在处理可能具备内部依赖性的请求集时出现锁

什么意思?若是你的任务A1,A2有内部关联,A1须要先运行,那么先提交A1,再提交A2,当使用SynchronousQueue咱们能够保证,A1一定先被执行,在A1么有被执行前,A2不可能添加入queue中

例子二:使用无界队列策略,即LinkedBlockingQueue

这个就拿 newFixedThreadPool来讲,根据前文提到的规则:

 写道

若是运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。

 那么当任务继续增长,会发生什么呢?

 写道

若是运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。

 OK,此时任务变加入队列之中了,那何时才会添加新线程呢?

 写道

若是没法将请求加入队列,则建立新的线程,除非建立此线程超出 maximumPoolSize,在这种状况下,任务将被拒绝。

这里就颇有意思了,可能会出现没法加入队列吗?不像SynchronousQueue那样有其自身的特色,对于无界队列来讲,老是能够加入的(资源耗尽,固然另当别论)。换句说,永远也不会触发产生新的线程!corePoolSize大小的线程数会一直运行,忙完当前的,就从队列中拿任务开始运行。因此要防止任务疯长,好比任务运行的实行比较长,而添加任务的速度远远超过处理任务的时间,并且还不断增长,若是任务内存大一些,不一下子就爆了,呵呵。

能够仔细想一想哈。

例子三:有界队列,使用ArrayBlockingQueue。

这个是最为复杂的使用,因此JDK不推荐使用也有些道理。与上面的相比,最大的特色即是能够防止资源耗尽的状况发生。

举例来讲,请看以下构造方法:

Java代码   收藏代码

  1. new ThreadPoolExecutor(  
  2.             2430, TimeUnit.SECONDS,   
  3.             new ArrayBlockingQueue<Runnable>(2),   
  4.             new RecorderThreadFactory("CookieRecorderPool"),   
  5.             new ThreadPoolExecutor.CallerRunsPolicy());  

假设,全部的任务都永远没法执行完。

对于首先来的A,B来讲直接运行,接下来,若是来了C,D,他们会被放到queu中,若是接下来再来E,F,则增长线程运行E,F。可是若是再来任务,队列没法再接受了,线程数也到达最大的限制了,因此就会使用拒绝策略来处理。

总结:

  1. ThreadPoolExecutor的使用仍是颇有技巧的。
  2. 使用无界queue可能会耗尽系统资源。
  3. 使用有界queue可能不能很好的知足性能,须要调节线程数和queue大小
  4. 线程数天然也有开销,因此须要根据不一样应用进行调节。

一般来讲对于静态任务能够归为:

  1. 数量大,可是执行时间很短
  2. 数量小,可是执行时间较长
  3. 数量又大执行时间又长
  4. 除了以上特色外,任务间还有些内在关系

看完这篇问文章后,但愿可以能够选择合适的类型了

相关文章
相关标签/搜索