深刻理解 Java 线程池

1、简介

什么是线程池

线程池是一种多线程处理形式,处理过程当中将任务添加到队列,而后在建立线程后自动启动这些任务。html

为何要用线程池

若是并发请求数量不少,但每一个线程执行的时间很短,就会出现频繁的建立和销毁线程。如此一来,会大大下降系统的效率,可能频繁建立和销毁线程的时间、资源开销要大于实际工做的所需。java

正是因为这个问题,因此有必要引入线程池。使用 线程池的好处 有如下几点:编程

  • 下降资源消耗 - 经过重复利用已建立的线程下降线程建立和销毁形成的消耗。
  • 提升响应速度 - 当任务到达时,任务能够不须要等到线程建立就能当即执行。
  • 提升线程的可管理性 - 线程是稀缺资源,若是无限制的建立,不只会消耗系统资源,还会下降系统的稳定性,使用线程池能够进行统一的分配,调优和监控。可是要作到合理的利用线程池,必须对其原理了如指掌。

2、Executor 框架

Executor 框架是一个根据一组执行策略调用,调度,执行和控制的异步任务的框架,目的是提供一种将”任务提交”与”任务如何运行”分离开来的机制。数组

核心 API 概述

Executor 框架核心 API 以下:缓存

  • Executor - 运行任务的简单接口。
  • ExecutorService - 扩展了 Executor 接口。扩展能力:
    • 支持有返回值的线程;
    • 支持管理线程的生命周期。
  • ScheduledExecutorService - 扩展了 ExecutorService 接口。扩展能力:支持按期执行任务。
  • AbstractExecutorService - ExecutorService 接口的默认实现。
  • ThreadPoolExecutor - Executor 框架最核心的类,它继承了 AbstractExecutorService 类。
  • ScheduledThreadPoolExecutor - ScheduledExecutorService 接口的实现,一个可定时调度任务的线程池。
  • Executors - 能够经过调用 Executors 的静态工厂方法来建立线程池并返回一个 ExecutorService 对象。

Executor

Executor 接口中只定义了一个 execute 方法,用于接收一个 Runnable 对象。markdown

public interface Executor {
    void execute(Runnable command);
}
复制代码

ExecutorService

ExecutorService 接口继承了 Executor 接口,它还提供了 invokeAllinvokeAnyshutdownsubmit 等方法。多线程

public interface ExecutorService extends Executor {

    void shutdown();

    List<Runnable> shutdownNow();

    boolean isShutdown();

    boolean isTerminated();

    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

    <T> Future<T> submit(Callable<T> task);

    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
复制代码

从其支持的方法定义,不难看出:相比于 Executor 接口,ExecutorService 接口主要的扩展是:并发

  • 支持有返回值的线程 - sumbitinvokeAllinvokeAny 方法中都支持传入Callable 对象。
  • 支持管理线程生命周期 - shutdownshutdownNowisShutdown 等方法。

ScheduledExecutorService

ScheduledExecutorService 接口扩展了 ExecutorService 接口。框架

它除了支持前面两个接口的全部能力之外,还支持定时调度线程。异步

public interface ScheduledExecutorService extends ExecutorService {

    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);

    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

}
复制代码

其扩展的接口提供如下能力:

  • schedule 方法能够在指定的延时后执行一个 Runnable 或者 Callable 任务。
  • scheduleAtFixedRate 方法和 scheduleWithFixedDelay 方法能够按照指定时间间隔,按期执行任务。

3、ThreadPoolExecutor

java.uitl.concurrent.ThreadPoolExecutor 类是 Executor 框架中最核心的类。因此,本文将着重讲述一下这个类。

重要字段

ThreadPoolExecutor 有如下重要字段:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
复制代码

参数说明:

  • ctl - 用于控制线程池的运行状态和线程池中的有效线程数量。它包含两部分的信息:
    • 线程池的运行状态 (runState)
    • 线程池内有效线程的数量 (workerCount)
    • 能够看到,ctl 使用了 Integer 类型来保存,高 3 位保存 runState,低 29 位保存 workerCountCOUNT_BITS 就是 29,CAPACITY 就是 1 左移 29 位减 1(29 个 1),这个常量表示 workerCount 的上限值,大约是 5 亿。
  • 运行状态 - 线程池一共有五种运行状态:
    • RUNNING - 运行状态。接受新任务,而且也能处理阻塞队列中的任务。
    • SHUTDOWN - 关闭状态。不接受新任务,但能够处理阻塞队列中的任务。
      • 在线程池处于 RUNNING 状态时,调用 shutdown 方法会使线程池进入到该状态。
      • finalize 方法在执行过程当中也会调用 shutdown 方法进入该状态。
    • STOP - 中止状态。不接受新任务,也不处理队列中的任务。会中断正在处理任务的线程。在线程池处于 RUNNINGSHUTDOWN 状态时,调用 shutdownNow 方法会使线程池进入到该状态。
    • TIDYING - 整理状态。若是全部的任务都已终止了,workerCount (有效线程数) 为 0,线程池进入该状态后会调用 terminated 方法进入 TERMINATED 状态。
    • TERMINATED - 已终止状态。在 terminated 方法执行完后进入该状态。默认 terminated 方法中什么也没有作。进入 TERMINATED 的条件以下:
      • 线程池不是 RUNNING 状态;
      • 线程池状态不是 TIDYING 状态或 TERMINATED 状态;
      • 若是线程池状态是 SHUTDOWN 而且 workerQueue 为空;
      • workerCount 为 0;
      • 设置 TIDYING 状态成功。

构造方法

ThreadPoolExecutor 有四个构造方法,前三个都是基于第四个实现。第四个构造方法定义以下:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
复制代码

参数说明:

  • corePoolSize - 核心线程数量。当有新任务经过 execute 方法提交时 ,线程池会执行如下判断:
    • 若是运行的线程数少于 corePoolSize,则建立新线程来处理任务,即便线程池中的其余线程是空闲的。
    • 若是线程池中的线程数量大于等于 corePoolSize 且小于 maximumPoolSize,则只有当 workQueue 满时才建立新的线程去处理任务;
    • 若是设置的 corePoolSizemaximumPoolSize 相同,则建立的线程池的大小是固定的。这时若是有新任务提交,若 workQueue 未满,则将请求放入 workQueue 中,等待有空闲的线程去从 workQueue 中取任务并处理;
    • 若是运行的线程数量大于等于 maximumPoolSize,这时若是 workQueue 已经满了,则使用 handler 所指定的策略来处理任务;
    • 因此,任务提交时,判断的顺序为 corePoolSize => workQueue => maximumPoolSize
  • maximumPoolSize - 最大线程数量
    • 若是队列满了,而且已建立的线程数小于最大线程数,则线程池会再建立新的线程执行任务。
    • 值得注意的是:若是使用了无界的任务队列这个参数就没什么效果。
  • keepAliveTime线程保持活动的时间
    • 当线程池中的线程数量大于 corePoolSize 的时候,若是这时没有新的任务提交,核心线程外的线程不会当即销毁,而是会等待,直到等待的时间超过了 keepAliveTime
    • 因此,若是任务不少,而且每一个任务执行的时间比较短,能够调大这个时间,提升线程的利用率。
  • unit - keepAliveTime 的时间单位。有 7 种取值。可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。
  • workQueue - 等待执行的任务队列。用于保存等待执行的任务的阻塞队列。 能够选择如下几个阻塞队列。
    • ArrayBlockingQueue - 有界阻塞队列
      • 此队列是基于数组的先进先出队列(FIFO)
      • 此队列建立时必须指定大小。
    • LinkedBlockingQueue - 无界阻塞队列
      • 此队列是基于链表的先进先出队列(FIFO)
      • 若是建立时没有指定此队列大小,则默认为 Integer.MAX_VALUE
      • 吞吐量一般要高于 ArrayBlockingQueue
      • 使用 LinkedBlockingQueue 意味着: maximumPoolSize 将不起做用,线程池能建立的最大线程数为 corePoolSize,由于任务等待队列是无界队列。
      • Executors.newFixedThreadPool 使用了这个队列。
    • SynchronousQueue - 不会保存提交的任务,而是将直接新建一个线程来执行新来的任务
      • 每一个插入操做必须等到另外一个线程调用移除操做,不然插入操做一直处于阻塞状态。
      • 吞吐量一般要高于 LinkedBlockingQueue
      • Executors.newCachedThreadPool 使用了这个队列。
    • PriorityBlockingQueue - 具备优先级的无界阻塞队列
  • threadFactory - 线程工厂。能够经过线程工厂给每一个建立出来的线程设置更有意义的名字。
  • handler - 饱和策略。它是 RejectedExecutionHandler 类型的变量。当队列和线程池都满了,说明线程池处于饱和状态,那么必须采起一种策略处理提交的新任务。线程池支持如下策略:
    • AbortPolicy - 丢弃任务并抛出异常。这也是默认策略。
    • DiscardPolicy - 丢弃任务,但不抛出异常。
    • DiscardOldestPolicy - 丢弃队列最前面的任务,而后从新尝试执行任务(重复此过程)。
    • CallerRunsPolicy - 只用调用者所在的线程来运行任务。
    • 若是以上策略都不能知足须要,也能够经过实现 RejectedExecutionHandler 接口来定制处理策略。如记录日志或持久化不能处理的任务。

execute 方法

默认状况下,建立线程池以后,线程池中是没有线程的,须要提交任务以后才会建立线程。

提交任务可使用 execute 方法,它是 ThreadPoolExecutor 的核心方法,经过这个方法能够向线程池提交一个任务,交由线程池去执行

execute 方法工做流程以下:

  1. 若是 workerCount < corePoolSize,则建立并启动一个线程来执行新提交的任务;
  2. 若是 workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中;
  3. 若是 workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则建立并启动一个线程来执行新提交的任务;
  4. 若是workerCount >= maximumPoolSize,而且线程池内的阻塞队列已满,则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

其余重要方法

ThreadPoolExecutor 类中还有一些重要的方法:

  • submit - 相似于 execute,可是针对的是有返回值的线程。submit 方法是在 ExecutorService 中声明的方法,在 AbstractExecutorService 就已经有了具体的实现。ThreadPoolExecutor 直接复用 AbstractExecutorServicesubmit 方法。
  • shutdown - 不会当即终止线程池,而是要等全部任务缓存队列中的任务都执行完后才终止,但不再会接受新的任务。
    • 将线程池切换到 SHUTDOWN 状态;
    • 并调用 interruptIdleWorkers 方法请求中断全部空闲的 worker;
    • 最后调用 tryTerminate 尝试结束线程池。
  • shutdownNow - 当即终止线程池,并尝试打断正在执行的任务,而且清空任务缓存队列,返回还没有执行的任务。与 shutdown 方法相似,不一样的地方在于:
    • 设置状态为 STOP
    • 中断全部工做线程,不管是不是空闲的;
    • 取出阻塞队列中没有被执行的任务并返回。
  • isShutdown - 调用了 shutdownshutdownNow 方法后,isShutdown 方法就会返回 true。
  • isTerminaed - 当全部的任务都已关闭后,才表示线程池关闭成功,这时调用 isTerminaed 方法会返回 true。
  • setCorePoolSize - 设置核心线程数大小。
  • setMaximumPoolSize - 设置最大线程数大小。
  • getTaskCount - 线程池已经执行的和未执行的任务总数;
  • getCompletedTaskCount - 线程池已完成的任务数量,该值小于等于 taskCount
  • getLargestPoolSize - 线程池曾经建立过的最大线程数量。经过这个数据能够知道线程池是否满过,也就是达到了 maximumPoolSize
  • getPoolSize - 线程池当前的线程数量;
  • getActiveCount - 当前线程池中正在执行任务的线程数量。

使用示例

public class ThreadPoolExecutorDemo {

    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 500, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());

        for (int i = 0; i < 100; i++) {
            threadPoolExecutor.execute(new MyThread());
            String info = String.format("线程池中线程数目:%s,队列中等待执行的任务数目:%s,已执行玩别的任务数目:%s",
                threadPoolExecutor.getPoolSize(),
                threadPoolExecutor.getQueue().size(),
                threadPoolExecutor.getCompletedTaskCount());
            System.out.println(info);
        }
        threadPoolExecutor.shutdown();
    }

    static class MyThread implements Runnable {

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " 执行");
        }

    }

}
复制代码

4、Executors

JDK 的 Executors 类中提供了几种具备表明性的线程池,这些线程池 都是基于 ThreadPoolExecutor 的定制化实现

在实际使用线程池的场景中,咱们每每不是直接使用 ThreadPoolExecutor ,而是使用 JDK 中提供的具备表明性的线程池实例。

newSingleThreadExecutor

建立一个单线程的线程池

只会建立惟一的工做线程来执行任务,保证全部任务按照指定顺序(FIFO, LIFO, 优先级)执行。 若是这个惟一的线程由于异常结束,那么会有一个新的线程来替代它

单工做线程最大的特色是:可保证顺序地执行各个任务

示例:

public class SingleThreadExecutorDemo {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 100; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " 执行");
                }
            });
        }
        executorService.shutdown();
    }

}
复制代码

newFixedThreadPool

建立一个固定大小的线程池

每次提交一个任务就会新建立一个工做线程,若是工做线程数量达到线程池最大线程数,则将提交的任务存入到阻塞队列中

FixedThreadPool 是一个典型且优秀的线程池,它具备线程池提升程序效率和节省建立线程时所耗的开销的优势。可是,在线程池空闲时,即线程池中没有可运行任务时,它不会释放工做线程,还会占用必定的系统资源。

示例:

public class FixedThreadPoolDemo {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 100; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " 执行");
                }
            });
        }
        executorService.shutdown();
    }

}
复制代码

newCachedThreadPool

建立一个可缓存的线程池

  • 若是线程池长度超过处理任务所须要的线程数,就会回收部分空闲的线程;
  • 若是长时间没有往线程池中提交任务,即若是工做线程空闲了指定的时间(默认为 1 分钟),则该工做线程将自动终止。终止后,若是你又提交了新的任务,则线程池从新建立一个工做线程。
  • 此线程池不会对线程池大小作限制,线程池大小彻底依赖于操做系统(或者说 JVM)可以建立的最大线程大小。 所以,使用 CachedThreadPool 时,必定要注意控制任务的数量,不然,因为大量线程同时运行,颇有会形成系统瘫痪。

示例:

public class CachedThreadPoolDemo {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 100; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " 执行");
                }
            });
        }
        executorService.shutdown();
    }

}
复制代码

newScheduleThreadPool

建立一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。

public class ScheduledThreadPoolDemo {

    public static void main(String[] args) {
        schedule();
        scheduleAtFixedRate();
    }

    private static void schedule() {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
        for (int i = 0; i < 100; i++) {
            executorService.schedule(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " 执行");
                }
            }, 1, TimeUnit.SECONDS);
        }
        executorService.shutdown();
    }

    private static void scheduleAtFixedRate() {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
        for (int i = 0; i < 100; i++) {
            executorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + " 执行");
                }
            }, 1, 1, TimeUnit.SECONDS);
        }
        executorService.shutdown();
    }

}
复制代码

参考资料

相关文章
相关标签/搜索