java 经常使用线程池讲解

1.线程池的好处

  1. 重用线程池中的线程,避免由于线程的建立和销毁所带来的性能开销。
  2. 能有效控制线程池的最大并发数,避免大量的线程之间由于互相抢夺系统资源而致使的阻塞现象。
  3. 能对线程进行简单管理,并提供定时执行以及指定间隔循环执行等功能。

相关连接

2. 原理分析

系统组织结构图

2.1 Executor

只有一个excute方法java

/** * Executes the given command at some time in the future. The command * may execute in a new thread, in a pooled thread, or in the calling * thread, at the discretion of the {@code Executor} implementation. * * @param command the runnable task * @throws RejectedExecutionException if this task cannot be * accepted for execution * @throws NullPointerException if command is null */
    void execute(Runnable command);
复制代码

2.2 public interface ExecutorService extends Executor

ExecutorService是Executor的子接口,增长了一些经常使用的对线程的控制方法,以后使用线程池主要也是使用这些方法。缓存

2.3 AbstractExecutorService

AbstractExecutorService是一个抽象类。安全

2.4 ThreadPoolExecutor

是AbstractExcutor的具体实现类。并发

先来看看它的构造器

*
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue} is null
     */
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
复制代码

这是第一种方式,能够传入blockingQueue.

接下来是第二种方式

/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters and default rejected execution handler. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} is null */
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }


复制代码

第三种方式

/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters and default thread factory. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code handler} is null */
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }
复制代码

第四种方式

/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

复制代码

构造方法参数说明

corePoolSize

核心线程数,默认状况下核心线程会一直存活,即便处于闲置状态也不会受存keepAliveTime限制。除非将allowCoreThreadTimeOut设置为trueless

maximumPoolSize

线程池所能容纳的最大线程数。超过这个数的线程将被阻塞。当任务队列为没有设置大小的LinkedBlockingDeque时,这个值无效。post

keepAliveTime

非核心线程的闲置超时时间,超过这个时间就会被回收。性能

unit

指定keepAliveTime的单位,如TimeUnit.SECONDS。当将allowCoreThreadTimeOut设置为true时对corePoolSize生效。this

workQueue

线程池中的任务队列. 经常使用的有三种队列,SynchronousQueue,LinkedBlockingDeque,ArrayBlockingQueue。spa

threadFactory

线程工厂,提供建立新线程的功能。ThreadFactory是一个接口,只有一个方法.net

public interface ThreadFactory {
  Thread newThread(Runnable r);
}
复制代码

RejectedExecutionHandler

RejectedExecutionHandler也是一个接口,只有一个方法

public interface RejectedExecutionHandler {
  void rejectedExecution(Runnable var1, ThreadPoolExecutor var2);
}
复制代码

线程池规则

线程池使用的队列

BlockingQueue

先进先出队列

synchronousQueue

线程安全的队列,里面是没有固定的缓存的。 ==SynchronousQueue没有数量限制==。由于他根本不保持这些任务,而是直接交给线程池去执行。当任务数量超过最大线程数时会直接抛异常
也是okhttp使用的

PriorityBlokingQueue

无序的,能够根据优先级进行排序。 执行的对象须要实现compareable

okhttp使用的是SynchronousQueue。

下面都假设任务队列没有大小限制:

  1. 若是线程数量<=核心线程数量,那么直接启动一个核心线程来执行任务,不会放入队列中。
  2. 若是线程数量>核心线程数,但<=最大线程数,而且任务队列是LinkedBlockingDeque的时候,超过核心线程数量的任务会放在任务队列中排队。
  3. 若是线程数量>核心线程数,但<=最大线程数,而且任务队列是SynchronousQueue的时候,线程池会建立新线程执行任务,这些任务也不会被放在任务队列中。这些线程属于非核心线程,在任务完成后,闲置时间达到了超时时间就会被清除。
  4. 若是线程数量>核心线程数,而且>最大线程数,当任务队列是LinkedBlockingDeque,会将超过核心线程的任务放在任务队列中排队。也就是当任务队列是LinkedBlockingDeque而且没有大小限制时,线程池的最大线程数设置是无效的,他的线程数最多不会超过核心线程数。
  5. 若是线程数量>核心线程数,而且>最大线程数,当任务队列是SynchronousQueue的时候,会由于线程池拒绝添加任务而抛出异常

任务队列大小有限时

  1. ==当LinkedBlockingDeque塞满时==,新增的任务会直接建立新线程来执行,当建立的线程数量超过最大线程数量时会抛异常。
  2. ==SynchronousQueue没有数量限制==。由于他根本不保持这些任务,而是直接交给线程池去执行。当任务数量超过最大线程数时会直接抛异常。

线程池的分类

1. FixedThreadPool

经过Executors的newFIxedThreadPool方法来建立,他是一种线程数量固定的线程池,当线程处于空闲状态时,她们并不会回收,除非线程关闭了。当全部的线程都处于活动状态时,新任务都会处于等待状态,直到有线程空闲出来。因为FixedThreadPool只有核心线程而且这些核心线程都不会回收,这意味着它能更加快速地响应外界的请求。
FixedTHreadPool中只有核心线程,而且这些核心线程没有超时机制,另外任务队列是没有大小限制的

public static ExecutorService newFixedThreadPool(int nThreads){
     return new ThreadPoolExecutor(nThreads,nThreads,0L,TimeUnit.MILLISENDS,new LinkedBlockingQueue<Runnable>());
 }
复制代码

2. CachedTHreadPool

  • 这是一种线程数量不定的线程池,它只有非核心线程,而且其最大线程数为Integer.MAX_VALUE。因为Integer.MAX_VALUE是一个最大的数,实际上就至关于最大线程数能够任意大,当线程池中的线程都处于活动状态时,线程池会建立新的线程来处理任务。不然就会利用空闲的线程来处理新任务。
  • 线程池中的空闲线程都有超时机制,这个超时时长为60秒,超过60秒闲置线程就会被回收。和FixedTHreadPool不一样的是,CacheThreadPool的任务队列其实至关于一个空集合,这将致使任何任务都会当即被执行。
  • 这类线程池比较适合执行大量的耗时较少的任务。当整个线程池都处于闲置状态时,线程池中的线程都会超时而被中止这个时候CachethreadPool之中其实是没有任何线程的。它几乎不占任何系统资源。
public static ExecutorService newCachedThreadPool(){
    return new ThreadPoolExecutor(0,INterger.MAX_VALUE,60L,TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}
复制代码

3. ScheduledThreadPool

  • 他的核心线程数量是固定的,而非核心线程数是没有限制的,而且当非核心线程闲置时会被当即回收,ScheduledThreadPool这类线程池主要用于执行定时任务和具备固定周期的重复任务。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize){
    return new ScheduledThreadPoolExecutor(corePoolSize);
    
}
public ScheduledThreadPoolExecutor(int corePoolSize){
    super(corePoolSize,Integer.MAX_VALUE,0,NANOSECONDS,new DelayedWorkQueue());
}
复制代码

4 SingleThreadExecutor

  • 经过Executor的newSingleThreadExecutor方法来建立。这类线程池内部只有一个核心线程,他确保全部的任务都在同一个线程中按顺序执行。SingleThreadExector的

参考文献

csdn

相关文章
相关标签/搜索