Java Executor并发框架(三)ThreadPoolExecutor 队列缓存策略

前面两篇讲解了线程池中线程建立后的运行状况,其中有一系列的策略来保证线程正常运行。可是咱们知道线程池是能够设置容量的,并且这容量的设置也是相当重要的,若是容量设置的过小,那么将会影响系统的运行效率,若是设置的过大,也可能形成无止尽的线程堆积,最终形成系统内存溢出。对于此,线程池也提供了一些设置来防止这些现象。下面咱们将会介绍。java

线程初始化

当咱们建立线程池后,若是没有新任务进来的话,默认是没有线程的,提交任务后线程池才会建立新的线程。若是你想建立线程池时就初始化corePoolSize数量的线程的话,线程池提供了如下两个方法:数据库

  • prestartCoreThread() : 当即初始化一个线程
  • prestartAllCoreThreads():当即初始化corePoolSize数量的线程

如下是具体方法实现:数组

public int prestartAllCoreThreads() {
        int n = 0;
        while (addIfUnderCorePoolSize(null))
            ++n;
        return n;
    }

public boolean prestartCoreThread() {
        return addIfUnderCorePoolSize(null);
    }

底层都是调用 addIfUnderCorePoolSize() 方法,上一篇有讲过,若是传入的参数为null的话,则最后执行线程会阻塞在getTask方法中的,由于要等待堵塞队列中有任务到达。缓存

任务堵塞队列

当线程池池建立的线程数量大于 corePoolSize 后,新来的任务将会加入到堵塞队列(workQueue)中等待有空闲线程来执行。workQueue的类型为BlockingQueue ,一般能够取下面三种类型: 服务器

  1. ArrayBlockingQueue:基于数组的FIFO队列,是有界的,建立时必须指定大小
  2. LinkedBlockingQueue: 基于链表的FIFO队列,是无界的,默认大小是 Integer.MAX_VALUE
  3. synchronousQueue:一个比较特殊的队列,虽然它是无界的,但它不会保存任务,每个新增任务的线程必须等待另外一个线程取出任务,也能够把它当作容量为0的队列

全部 BlockingQueue 均可用于传输和保持提交的任务。可使用此队列与池大小进行交互:多线程

若是运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。(若是当前运行的线程小于corePoolSize,则任务根本不会存放,添加到queue中,而是直接抄家伙(thread)开始运行)操作系统

若是运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程线程

若是没法将请求加入队列,则建立新的线程,除非建立此线程超出 maximumPoolSize,在这种状况下,任务将被拒绝。rest

排队有三种通用策略:code

直接提交。工做队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。在 此,若是不存在可用于当即运行任务的线程,则试图把任务加入队列将失败,所以会构造一个新的线程。此策略能够避免在处理可能具备内部依赖性的请求集时出现 锁。直接提交一般要求无界 maximumPoolSizes 以免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略容许无界线 程具备增加的可能性。

无界队列。使用无界队列(例如,不具备预约义容量的 LinkedBlockingQueue)将致使在所 有 corePoolSize 线程都忙时新任务在队列中等待。这样,建立的线程就不会超过 corePoolSize。(因 此,maximumPoolSize 的值也就无效了。)当每一个任务彻底独立于其余任务,即任务执行互不影响时,适合于使用无界队列;例如, 在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略容许无界线程具备增加的可能性。

有界队列。当使用有限的 maximumPoolSizes 时,有界队列 (如 ArrayBlockingQueue)有助于防止资源耗尽,可是可能较难调整和控制。队列大小和最大池大小可能须要相互折衷:使用大型队列和小型 池能够最大限度地下降 CPU 使用率、操做系统资源和上下文切换开销,可是可能致使人工下降吞吐量。若是任务频繁阻塞(例如,若是它们是 I/O 边 界),则系统可能为超过您许可的更多线程安排时间。使用小型队列一般要求较大的池大小,CPU 使用率较高,可是可能遇到不可接受的调度开销,这样也会降 低吞吐量。

BlockingQueue的选择。

例子一:使用直接提交策略,也即SynchronousQueue。

首先SynchronousQueue是无界的,也就是说他存数任务的能力是没有限制的,可是因为该Queue自己的特性,在某次添加元素后必须等待其余线程取走后才能继续添加。在这里不是核心线程即是新建立的线程,可是咱们试想同样下,下面的场景。

咱们使用一下参数构造ThreadPoolExecutor:

new ThreadPoolExecutor(
   2, 3, 30, TimeUnit.SECONDS,
   new SynchronousQueue<Runnable>(),
   new RecorderThreadFactory("CookieRecorderPool"),
   new ThreadPoolExecutor.CallerRunsPolicy());

当核心线程已经有2个正在运行.

  1. 此时继续来了一个任务(A),根据前面介绍的“若是运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。”,因此A被添加到queue中。
  2. 又来了一个任务(B),且核心2个线程尚未忙完,OK,接下来首先尝试1中描述,可是因为使用的SynchronousQueue,因此必定没法加入进去。
  3. 此时便知足了上面提到的“若是没法将请求加入队列,则建立新的线程,除非建立此线程超出maximumPoolSize,在这种状况下,任务将被拒绝。”,因此必然会新建一个线程来运行这个任务。
  4. 暂时还能够,可是若是这三个任务都还没完成,连续来了两个任务,第一个添加入queue中,后一个呢?queue中没法插入,而线程数达到了maximumPoolSize,因此只好执行异常策略了。

因此在使用SynchronousQueue一般要求maximumPoolSize是无界的,这样就能够避免上述状况发生(若是但愿限制就直接使 用有界队列)。对于使用SynchronousQueue的做用jdk中写的很清楚:此策略能够避免在处理可能具备内部依赖性的请求集时出现锁。

什么意思?若是你的任务A1,A2有内部关联,A1须要先运行,那么先提交A1,再提交A2,当使用SynchronousQueue咱们能够保证,A1一定先被执行,在A1么有被执行前,A2不可能添加入queue中。

例子二:使用无界队列策略,即LinkedBlockingQueue

这个就拿newFixedThreadPool来讲,根据前文提到的规则:

若是运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。那么当任务继续增长,会发生什么呢?

若是运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程。OK,此时任务变加入队列之中了,那何时才会添加新线程呢?

若是没法将请求加入队列,则建立新的线程,除非建立此线程超出 maximumPoolSize,在这种状况下,任务将被拒绝。这里就颇有意思了, 可能会出现没法加入队列吗?不像SynchronousQueue那样有其自身的特色,对于无界队列来讲,老是能够加入的(资源耗尽,固然另当别论)。换 句说,永远也不会触发产生新的线程!corePoolSize大小的线程数会一直运行,忙完当前的,就从队列中拿任务开始运行。因此要防止任务疯长,好比 任务运行的实行比较长,而添加任务的速度远远超过处理任务的时间,并且还不断增长,不一下子就爆了。

例子三:有界队列,使用ArrayBlockingQueue。

这个是最为复杂的使用,因此JDK不推荐使用也有些道理。与上面的相比,最大的特色即是能够防止资源耗尽的状况发生。

举例来讲,请看以下构造方法:

new ThreadPoolExecutor(
     2, 4, 30, TimeUnit.SECONDS,
     new ArrayBlockingQueue<Runnable>(2),
     new RecorderThreadFactory("CookieRecorderPool"),
     new ThreadPoolExecutor.CallerRunsPolicy());

假设,全部的任务都永远没法执行完。

对于首先来的A,B来讲直接运行,接下来,若是来了C,D,他们会被放到queue中,若是接下来再来E,F,则增长线程运行E,F。可是若是再来任务,队列没法再接受了,线程数也到达最大的限制了,因此就会使用拒绝策略来处理。

keepAliveTime

jdk中的解释是:当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。

有点拗口,其实这个不难理解,在使用了“池”的应用中,大多都有相似的参数须要配置。好比数据库链接池,DBCP中的maxIdle,minIdle参数。

什么意思?接着上面的解释,后来向老板派来的工人始终是“借来的”,俗话说“有借就有还”,但这里的问题就是何时还了,若是借来的工人刚完成一个任务就还回去,后来发现任务还有,那岂不是又要去借?这一来一往,老板确定头也大死了。

合理的策略:既然借了,那就多借一下子。直到“某一段”时间后,发现再也用不到这些工人时,即可以还回去了。这里的某一段时间即是keepAliveTime的含义,TimeUnit为keepAliveTime值的度量。

任务拒绝策略

  线程池堵塞队列容量满以后,将会直接新建线程,数量等于 maximumPoolSize 后,将会执行任务拒绝策略不在接受任务,有如下四种拒绝策略:

  1. ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
  2. ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,可是不抛出异常。
  3. ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,而后从新尝试执行任务(重复此过程)
  4. ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

线程池的关闭

  ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:

  • shutdown():不会当即终止线程池,而是要等全部任务缓存队列中的任务都执行完后才终止,但不再会接受新的任务
  • shutdownNow():当即终止线程池,并尝试打断正在执行的任务,而且清空任务缓存队列,返回还没有执行的任务

线程池容量的动态调整

ThreadPoolExecutor提供了动态调整线程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),

  • setCorePoolSize:设置核心池大小
  • setMaximumPoolSize:设置线程池最大能建立的线程数目大小

  当上述参数从小变大时,ThreadPoolExecutor进行线程赋值,还可能当即建立新的线程来执行任务。

相关文章
相关标签/搜索