ThreadPoolExecutor线程池

一:类继承结构安全

                            继承关系服务器

二:构造函数并发

                                                   构造函数less

(1)线程池的大小除了显示的限制外,还可能因为其余资源上的约束而存在一些隐式限制。好比JDBC链接池。函数

(2)运行时间较长的任务。工具

    若是任务阻塞的时间过长,即便不出现死锁,线程池的响应性也会变得糟糕。执行时间较长的任务不只会形成线程池阻塞,甚至还会增长执行时间。若是线程池中线程的数量远小于在稳定状态下执行时间较长任务的数量,那么到最后可能全部的线程都会运行这些执行时间较长的任务,从而影响总体的响应性。性能

    有一项技术能够缓解执行时间较长任务形成的影响,即限定任务等待资源的时间,而不要无限制地等待。在平台类库的大多数可阻塞方法中,都同时定义了限时版本和无限时版本,例如:Thread.join,BlockingQueue.put,CountDownLatch.await以及Selector.select等。若是等待超时,能够把任务标识为失败,而后停止任何或者将任务从新放回队列以便随后执行。若是在线程池中老是充满了呗阻塞的任务,那么也可能表示线程池的规模太小。spa

 

(3)设置线程池的大小线程

(3.1)线程池的理想大小取决于被提交任务的类型以及所部署系统的特性。在代码中不会固定线程池的大小,而应该经过某种配置机制来提供,或者根据Runtime.availableProcessors来动态计算。调试

(3.2)要设置线程池的大小也并不困难,只须要避免“过大”或“太小”这两种极端状况。若是设置过大,那么大量的线程将在相对不多的CPU和内存资源上发生竞争,这不只会致使更高的内存使用量,并且还可能耗尽资源。若是设置太小,那么将致使不少空闲的处理器没法执行工做,从而下降吞吐率。

(3.3)要想正确设置线程池的大小,必须分析计算环境,资源预算和任务的特性。在部署的系统中有多少CPU?多大的内存?计算是计算密集型、I/O密集型仍是两者皆可?它们是否须要像JDBC链接这样的稀缺资源?若是须要执行不一样类别的任务,而且它们之间的行为相差很大,那么应该考虑使用多个线程池,从而使每一个线程池能够根据本身的工做负载来调整。

(3.4)对于计算密集型的任务,在拥有Ncpu个处理器的系统上,当线程池的大小为Ncpu+1时,一般能实现最优的利用率。(计算当计算密集型的线程偶尔因为页缺失故障或者其余缘由而暂停时,这个“额外”的线程也能确保CPU的时钟周期不会被浪费)

(3.5)对于包含I/O操做或者其余阻塞操做的任务,你必须估算出任务的等待时间与计算时间的比值。这种估算不须要很精确,而且能够经过一些分析或者监控工具来得到。你还能够经过另外一种方法来调节线程池的大小:在某个基准负载下,分别设置不一样大小的线程池来运行应用程序,并观察CPU利用率的水平。给定以下列定义:

                      Ncpu = number of CPUs

                     Ucpu = target CPU utilization,0 <= Ucpu <= 1

                     W/C = ratio of wait time to compute time

      要使处理器达到指望的使用率,线程池的最优大小等于:

                    Nthreads = Ncpu * Ucpu * (1 + W/C)

     能够经过Runtime来得到CPU的数目:

                   int N_CPUS = Runtime.getRuntime().availableProcessors();

 

(4)参数解析

  • corePoolSize

    线程池的基本大小(线程池的目标大小),默认状况下会一直存活,即便处于闲置状态也不会受keepAliveTime限制,除非将allowCoreThreadTimeOut设置为true。

  • maximumPoolSize

    最大线程池大小,表示可同时活动的线程数量的上限。

  • keepAliveTime

    若是某个线程的空闲时间超过了存活时间,那么将被标记为可回收的,而且当线程池的当前大小超过了基本大小时,这个线程将被终止。

分析:线程池的基本大小(corePoolSize)、最大大小(maximumPoolSize)以及存活时间等因素共同负责线程的建立与销毁。经过调节线程池的基本大小和存活时间,能够帮助线程池回收空闲线程占用的资源,从而使得这些资源能够用于执行其余工做。

 

三:基本实现

(1)newCachedThreadPool()

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

 

    线程池的基本大小设置为零,最大大小设置为Integer.MAX_VALUE,线程池能够被无限扩展,需求下降时自动收缩,最大大小设置过大在某些状况下也是缺点。

(2)newFixedThreadPool(int nThreads)

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

 

    缺点是LinkedBlockingQueue是无界队列,有些状况下排队的任务会不少。

(3)newScheduledThreadExecutor()

   

/**
     * Creates a thread pool that can schedule commands to run after a
     * given delay, or to execute periodically.
     * @param corePoolSize the number of threads to keep in the pool,
     * even if they are idle
     * @return a newly created scheduled thread pool
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

/**
     * Creates a new {@code ScheduledThreadPoolExecutor} with the
     * given core pool size.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

 

(4)newSingleThreadExecutor()

    

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

 

总结:都由Executors类的静态方法统一提供,如Executors.newCachedThreadPool(),底层经过ThreadPoolExecutor来实现。ThreadPoolExecutor提供了不少构造函数,它是一个灵活的、稳定的线程池,容许进行各类定制。

 

四:管理队列任务

(1)单线程的Executor是一种值得注意的特例:它们能确保不会有任务并发执行,由于它们经过线程封闭来实现线程安全性。

(2)若是无限制地建立线程,将会致使不稳定性。能够经过采用固定大小的线程池(而不是每收到一个请求就建立一个新线程)来解决这个问题。然而,这个方案并不完整。在高负载的状况下,应用程序仍可能耗尽资源,知识问题的几率较小。若是新请求的到达超过了线程池的处理效率,那么新到来的请求将累计起来。在线程池中,这些请求会在一个由Executor管理的Runable队列中等待,而不会像线程那样去竞争CPU资源,经过一个Runnable和一个链表节点来表现一个等待中的任务,固然比使用线程来表示的开销低得多,但若是客户提交给服务器请求的效率超过了服务器的处理效率,那么仍可能会耗尽资源。

(3)ThreadPoolExecutor容许提供一个BlockingQueue来保存等待执行的任务。基本的任务排队方法有3种:无界队列、有界队列和同步移交(Synchronous Handoff)。队列的选择与其余的配置参数有关,例如线程池的大小。

(4)newFixedThreadPool和newSingleThreadExecutor在默认的状况下将使用一个无界的LinkedBlockingQueue。若是全部工做者线程都处于忙碌状态,那么任务将在队列中等候。若是任务持续快速地到达,而且超过了线程池处理它们的速度,那么队列将无限制地增长。

(5)一种更稳妥的资源管理策略是使用有界队列,例如ArrayBlockingQueue、有界的LinkedBlockQueue、PriorityBlockingQueue。有界队列有助于避免资源耗尽的状况发生,但它又带来了新的问题:当队列填满后,新的任务该怎么办?(有许多饱和策略能够解决这个问题)在使用有界

的工做队列时,队列的大小和线程池的大小必须一块儿调节。若是线程池较小而队列较大,那么有助于减小内存使用量,下降CPU的使用率,同时还能够减小上下文切换,但付出的代价是可能会限制吞吐量。

(6)对于很是大的或者无界的线程池,能够经过使用SynchronousQueue来避免任务排队,以及直接将任务从生产者移交给工做者线程。SynchronousQueue不是一个真正的队列,而是一种在线程之间进行移交的机制。要将一个元素放入SynchronousQueue中,必须有另外一个线程正在等待接受这个元素。

(7)当使用像LinkedBlockingQueue或ArrayBlockingQueue这样的FIFO(先进先出)队列时,任务的执行顺序与它们的到达顺序相同。若是想进一步控制任务执行顺序,还可使用PriorityBlockingQueue,这个队列将根据优先级来安排任务。

(8)只有当任务相互独立时,为线程池或工做队列设置界限才是合理的。若是任务之间存在依赖性,那么有界的线程池或队列就可能致使线程“饥饿”死锁问题。此时应该使用无界的线程池,例如newCacheThreadPool。

 

五:饱和策略

(1)当有界队列被填满后,饱和策略开始发挥做用。ThreadPoolExecutor的饱和策略能够经过调用setRejectedExecutionHander来修改。(若是某个任务被提交到一个已被关闭的Executor时,也会用到饱和策略。)JDK提供了几种不一样的RejectedExecutionHandler实现,每种实现都包含不一样的饱和策略:AbortPolicy、CallerRunsPolicy、DiscardPolicy和DiscardOldestPolicy。

(2)停止(Abort)策略是默认的饱和策略,该策略将抛出未检查的RejectedExecutionException。调用者能够捕获这个异常,而后根据须要编写本身的处理代码。

(3)当新提交的任务没法保存到队列中等待执行时,“抛弃(Discard)”策略会悄悄抛弃该任务。

(4)“抛弃最旧的(Discard-Oldest)”策略则会抛弃下一个将被执行的任务,而后尝试从新提交新的任务。(若是工做队列是一个优先队列,那么“抛弃最旧的”策略将致使抛弃优先级最高的任务,所以最好不要将“抛弃最旧的”饱和策略和优先队列放在一块儿使用。)

(5)“调用者运行(Caller-Runs)”策略实现了一种调度机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而下降新任务的流量。它不会在线程池的某个线程中执行新提交的任务,而是在一个调用了execute的线程中执行该任务。若是采用有界队列和“调用者运行”饱和策略,当线程池中的全部线程都被占用,而且工做队列被填满后,下一个任务会在调用execute时在主线程中执行。因为执行任务须要必定的时间,所以主线程至少在一段时间内不能提交任何任务,从而使得工做者线程有时间来处理完正在执行的任务。在此期间,主线程不会调用accept,所以到达的请求将被保存在TCP层的队列中而不是在应用程序的队列中。若是持续过载,那么TCP层将最终发现它的请求队列被填满,所以一样会开始抛弃请求。当服务器过载时,这种过载状况会逐渐向外蔓延开来——从线程池到工做队列到应用程序再到TCP层,最终达到客户端,致使服务器在高负载下实现一种平缓的性能下降。

    可经过以下方式设置饱和策略:

          executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

 

六:线程工厂

(1)每当线程池须要建立一个线程时,都是经过线程工厂方法来完成的。默认的线程工厂方法将建立一个新的、非守护的线程,而且不包含特殊的配置信息。经过指定一个线程工厂方法,能够定制线程池的配置信息。在ThreadFactory中只定义了一个方法newThread,每当线程池须要建立一个新线程时都会调用这个方法。

(2)许多状况下都须要使用定制的线程工厂方法。例如,你但愿为线程池中的线程指定一个UncaughtExceptionHandler,或者实例化一个定制的Thread类用于执行调试信息的记录。你还可能但愿修改线程的优先级(这一般并非一个好主意)或者守护状态(一样,这也不是一个好主意)。或许你只是但愿给线程取一个更有意义的名称,用来解释线程的转储信息和错误日志。

public interface ThreadFactory {

    /**
     * Constructs a new {@code Thread}.  Implementations may also initialize
     * priority, name, daemon status, {@code ThreadGroup}, etc.
     *
     * @param r a runnable to be executed by new thread instance
     * @return constructed thread, or {@code null} if the request to
     *         create a thread is rejected
     */
    Thread newThread(Runnable r);
}

 

经过实现该接口,能够定制本身的线程池工厂方法。

 

七:调用构造函数后再定制ThreadPoolExecutor

(1)在调用完ThreadPoolExecutor的构造函数后,仍然能够经过设置函数(setter)来修改大多数传递给它的构造函数的参数(例如线程池的基本大小、最大大小、存活时间、线程工厂以及拒绝执行处理器)。若是Executor是经过Executors中的某个(newSingleThreadExecutor除外)工厂方法建立的,那么能够将结果的类型转换为ThreadPoolExecutor以访问设置器,以下:

    ExecutorService exec = Executors.newCachedThreadPool();

       if ( exec instanceof ThreadPoolExecutor)

              ((ThreadPoolExecutor) exec).setCorePoolSize(10);

      else

              throw new AssertionError("Oops, bad assumption");

 

八:扩展ThreadPoolExecutor

(1)ThreadPoolExecutor是可扩展的,它提供了几个能够在子类化中改写的方法:beforeExecute、afterExecute和 terminated,这些方法能够用于扩展ThreadPoolExecutor的行为。

(2)在执行任务的线程中将调用beforeExecute 和 afterExecute等方法,在这些方法中还能够添加日志、计时、监视或统计信息收集的功能。不管任务是从run中正常返回仍是抛出一个异常而返回,afterExecute都会被调用。(若是任何在调用完成后带有一个Error,那么就不会调用afterExecute。)若是beforeExecute抛出一个RuntimeException,那么任务将不被执行,而且afterExecute也不会被调用。

(3)在线程池完成关闭操做时调用terminated,也就是在全部任务都已经完成而且全部工做者线程也已经关闭后。terminated能够用来释放Executor在其生命周期里分配的各类资源,此外还能够执行发送通知、记录日志或者收集finalize统计信息等操做。

相关文章
相关标签/搜索