如何优雅的使用线程池

线程池不只在项目中是很是经常使用的一项技术并且在面试中基本上也是必问的知识点,接下来跟着我一块儿来巩固一下线程池的相关知识。在了解线程池以前咱们先了解一下什么是进程什么是线程java

进程

  • 程序:通常是一组CPU指令的集合构成的文件,静态存储在诸如硬盘之类的存储设备上
  • 进程:当一个程序要被计算机运行时,就是在内存中产生该程序的一个运行时实例,咱们就把这个实例叫作进程

用户下达运行程序的命令之后,就会产生一个进程,同一个程序能够产生多个进程(一对多的关系),以容许同时有多个用户运行同一个程序,却不会相冲突。nginx

进程须要一些资源才能工做,如CPU的使用时间、存储器、文件、以及I/O设备,且为依序逐一执行,也就是每一个CPU核心任什么时候间内仅能运行一项进程。可是在一个应用程序中通常不会是只有一个任务单条线执行下去,确定会有多个任务,而建立进程又是耗费时间和资源的,称之为重量级操做。git

  1. 建立进程占用资源太多
  2. 进程之间的通讯须要数据在不一样的内存空间传来传去,因此进程间通讯会更加耗费时间和资源

线程

线程是操做系统可以进行运算调度的最小单位,大部分状况下它被包含在进程之中,是进程中实际的运做单位。一个进程能够并发多个线程,每一个线程执行不一样的任务。同一个进程中的多条线程共享该进程中的所有虚拟资源,例如虚拟地址空间、文件描述符、信号处理等等。可是同一个进程中的多个线程各自有各自的调用栈。github

一个进程能够有不少线程,每条线程并行执行不一样的任务。web

线程中的数据

  1. 线程栈上的本地数据:好比函数执行过程的局部变量,咱们知道在Java中线程模型是使用栈的模型。每一个线程都有本身的栈空间。
  2. 在整个进程里共享的全局数据:咱们知道在Java程序中,Java就是一个进程,咱们能够经过ps -ef | grep java能够看到在程序中运行了多少个Java进程,例如咱们Java中的全局变量,在不一样进程之间是隔离的,可是在线程之间是共享的。
  3. 线程的私有数据:在Java中咱们能够经过ThreadLocal来建立线程间私有的数据变量。

线程栈上的本地数据只能在本方法内有效,而线程的私有数据是在线程间多个函数共享的。面试

CPU密集型和IO密集型

理解是服务器是CPU密集型仍是IO密集型可以帮助咱们更好的设置线程池中的参数。具体如何设置咱们在后面讲到线程池的时候再分析,这里你们先知道这两个概念。数据库

  • IO密集型:大部分时间CPU闲着,在等待磁盘的IO操做
  • CPU(计算)密集型:大部分时间磁盘IO闲着,等着CPU的计算操做

线程池

线程池实际上是池化技术的应用一种,常见的池化技术还有不少,例如数据库的链接池、Java中的内存池、常量池等等。而为何会有池化技术呢?程序的运行本质,就是经过使用系统资源(CPU、内存、网络、磁盘等等)来完成信息的处理,好比在JVM中建立一个对象实例须要消耗CPU的和内存资源,若是你的程序须要频繁建立大量的对象,而且这些对象的存活时间短就意味着须要进行频繁销毁,那么颇有可能这段代码就成为了性能的瓶颈。总结下来其实就如下几点。编程

  • 复用相同的资源,减小浪费,减小新建和销毁的成本;
  • 减小单独管理的成本,统一交由"池";
  • 集中管理,减小"碎片";
  • 提升系统响应速度,由于池中有现成的资源,不用从新去建立;

因此池化技术就是为了解决咱们这些问题的,简单来讲,线程池就是将用过的对象保存起来,等下一次须要这个对象的时候,直接从对象池中拿出来重复使用,避免频繁的建立和销毁。在Java中万物皆对象,那么线程也是一个对象,Java线程是对于操做系统线程的封装,建立Java线程也须要消耗操做系统的资源,所以就有了线程池。可是咱们该如何建立呢?缓存

Java提供的四种线程池

Java为咱们提供了四种建立线程池的方法。服务器

  • Executors.newCachedThreadPool:建立可缓存无限制数量的线程池,若是线程中没有空闲线程池的话此时再来任务会新建线程,若是超过60秒此线程无用,那么就会将此线程销毁。简单来讲就是忙不来的时候无限制建立临时线程,闲下来再回收

    1public static ExecutorService newCachedThreadPool() {
    2    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    3                                  60L, TimeUnit.SECONDS,
    4                                  new SynchronousQueue<Runnable>());
    5}
    复制代码
  • Executors.newFixedThreadPool:建立固定大小的线程池,可控制线程最大的并发数,超出的线程会在队列中等待。简单来讲就是忙不来的时候会将任务放到无限长度的队列里。

    1   public static ExecutorService newFixedThreadPool(int nThreads) {
    2    return new ThreadPoolExecutor(nThreads, nThreads,
    3                                  0L, TimeUnit.MILLISECONDS,
    4                                  new LinkedBlockingQueue<Runnable>());
    5}
    复制代码
  • Executors.newSingleThreadExecutor:建立线程池中线程数量为1的线程池,用惟一线程来执行任务,保证任务是按照指定顺序执行

    1public static ExecutorService newSingleThreadExecutor() {
    2    return new FinalizableDelegatedExecutorService
    3        (new ThreadPoolExecutor(11,
    4                                0L, TimeUnit.MILLISECONDS,
    5                                new LinkedBlockingQueue<Runnable>()));
    6}
    复制代码
  • Executors.newScheduledThreadPool:建立固定大小的线程池,支持定时及周期性的任务执行

    1public ScheduledThreadPoolExecutor(int corePoolSize) {
    2    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
    3          new DelayedWorkQueue());
    4}
    复制代码

线程池的建立原理

咱们点击去这四种实现方式的源码中咱们能够看到其实它们的底层建立原理都是同样的,只不过是所传的参数不一样组成的四个不一样类型的线程池。都是使用了ThreadPoolExecutor来建立的。咱们能够看一下ThreadPoolExecutor建立所传的参数。

1public ThreadPoolExecutor(int corePoolSize,
2                              int maximumPoolSize,
3                              long keepAliveTime,
4                              TimeUnit unit,
5                              BlockingQueue<Runnable> workQueue,
6                              ThreadFactory threadFactory,
7                              RejectedExecutionHandler handler)

8

复制代码

那么这些参数都具体表明什么意思呢?

  • corePoolSize:线程池中核心线程数的数量
  • maximumPoolSize:在线程池中容许存在的最大线程数
  • keepAliveTime:当存在的线程数大于corePoolSize,那么会找到空闲线程去销毁,此参数是设置空闲多久的线程才被销毁。
  • unit:时间单位
  • workQueue:工做队列,线程池中的当前线程数大于核心线程的话,那么接下来的任务会放入到队列中
  • threadFactory:在建立线程的时候,经过工厂模式来生产线程。这个参数就是设置咱们自定义的线程建立工厂。
  • handler:若是超过了最大线程数,那么就会执行咱们设置的拒绝策略

接下来咱们将这些参数合起来看一下他们的处理逻辑是什么。

  1. corePoolSize个任务时,来一个任务就建立一个线程
  2. 若是当前线程池的线程数大于了corePoolSize那么接下来再来的任务就会放入到咱们上面设置的workQueue队列中
  3. 若是此时workQueue也满了,那么再来任务时,就会新建临时线程,那么此时若是咱们设置了keepAliveTime或者设置了allowCoreThreadTimeOut,那么系统就会进行线程的活性检查,一旦超时便销毁线程
  4. 若是此时线程池中的当前线程大于了maximumPoolSize最大线程数,那么就会执行咱们刚才设置的handler拒绝策略

为何建议不用Java提供的线程池建立方法

理解了上面设置的几个参数之后,咱们再来看一下为何在《阿里巴巴Java手册》中有一条这样规定。

相信你们看到上面提供四种建立线程池的实现原理,应该知道为何阿里巴巴会有这么规定了。

  • FixedThreadPoolSingleThreadExecutor:这两个线程池的实现方式,咱们能够看到它设置的工做队列都是LinkedBlockingQueue,咱们知道此队列是一个链表形式的队列,此队列是没有长度限制的,是一个无界队列,那么此时若是有大量请求,就有可能形成OOM
  • CachedThreadPoolScheduledThreadPool:这两个线程池的实现方式,咱们能够看到它设置的最大线程数都是Integer.MAX_VALUE,那么就至关于容许建立的线程数量为Integer.MAX_VALUE。此时若是有大量请求来的时候也有可能形成OOM

如何设置参数

因此咱们在项目中若是要使用线程池的话,那么就推荐根据本身项目和机器的状况进行个性化建立线程池。那么这些参数如何设置呢?为了正确的定制线程池的长度,须要理解你的计算机配置、所需资源的状况以及任务的特性。好比部署的计算机安装了多少个CPU?多少的内存?任务主要执行是IO密集型仍是CPU密集型?所执行任务是否须要数据库链接这样的稀缺资源?

若是你有多个不一样类别的任务,它们的行为有很大的差异,那么应该考虑使用多个线程池。这样也能根据每一个任务不一样定制不一样的线程池,也不至于由于一种类型的任务失败而托垮另外一个任务。

  • CPU密集型任务:说明包含了大量的运算操做,好比有N个CPU,那么就配置线程池的容量大小为N+1,这样能得到最优的利用率。由于CPU密集型的线程刚好在某时由于发生一个页错误或者由于其余的缘由而暂停,恰好有一个额外的线程,能够确保在这种状况下CPU周期不会中断工做。

  • IO密集任务:说明CPU大部分时间都是在等待IO的阻塞操做,那么此时就能够将线程池的容量大小配置的大一些。此时能够根据一些参数进行计算大概你的线程池的数量多少合适。

    • N:CPU的数量
    • U:目标CPU的使用率,0<=U<=1
    • W/C:等待时间与计算时间的比率
    • 那么最优的池的大小就是NU(1+W/C)

页缺失(英语:Page fault,又名硬错误、硬中断、分页错误、寻页缺失、缺页中断、页故障等)指的是当软件试图访问已映射在虚拟地址空间中,可是当前并未被加载在物理内存中的一个分页时,由中央处理器的内存管理单元所发出的中断

其实线程池大小的设置仍是要根据本身业务类型来设置,好比当前任务须要池化的资源的时候,好比数据库的链接池,俺么线程池的长度和资源池的长度会相互的影响。若是每个任务都须要一个数据库链接,那么链接池的大小就会限制了线程池的有效大小,相似的,当线程池中的任务是链接池的惟一消费者时,那么线程池的大小反而又会限制了链接池的有效大小。

线程池中的线程销毁

线程池的核心线程数(corePoolSize)、最大线程数(maximumPoolSize)、线程的存活时间(keepAliveTime)共同管理的线程的建立与销毁。接下来咱们再复习一下线程池是如何建立和销毁线程的

  • 当前线程数 < 核心线程数:来一个任务建立一个线程
  • 当前线程数 = 核心线程数:来一个任务就会将其加入到队列中
  • 当前线程数 > 核心线程数:此时有一个前提条件就是队列已满,才会新建线程,此时就会开启线程的活性检查,对于设置为keepAliveTime时间没有活动的线程将会被回收

那么这里可能有人会想到将corePoolSize核心线程数设置为0(若是你们还记得上面讲的CachedThreadPool的话应该还会记得它的核心线程数就是0),由于这样设置的话线程就会动态的进行建立了,闲的时候没有线程,忙的时候再在线程池中建立线程。这样想法当然是好,可是若是咱们自定义参数设置了此参数为0,而正好又设置了等待队列不是SynchronousQueue,那么其实就会有问题,由于只有在队列满的状况下才会新建线程。下面代码我使用了无界队列LinkedBlockingQueue,其实你们看一下输出

 1ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0,Integer.MAX_VALUE,1, TimeUnit.SECONDS,new LinkedBlockingQueue<>());
2for (int i = 0; i < 10; i++) {
3    threadPoolExecutor.execute(new Runnable() {
4        @Override
5        public void run(
{
6            try {
7                Thread.sleep(1000);
8            } catch (InterruptedException e) {
9                e.printStackTrace();
10            }
11            System.out.printf("1");
12        }
13    });
14}
复制代码

你们能够看一下演示的效果,其实1是每隔一秒打印一次,其实这就和咱们使用线程池初衷相悖了,由于咱们这个至关因而单线程在运行。

可是若是咱们将工做队列换成SynchronousQueue呢,咱们发现这些1是一块输出出来的。

SynchronousQueue并非一个真正的队列,而是一种管理直接在线程间移交信息的机制,这里能够简单将其想象成一个生产者生产消息交给SynchronousQueue,而消费者这边若是有线程来接收,那么此消息就会直接交给消费者,反之会阻塞。

因此咱们在设置线程池中某些参数的时候应该想一想其建立和销毁线程流程,否则咱们自定义的线程池还不如使用Java提供的四种线程池了。

线程池中的拒绝策略

ThreadPoolExecutor为咱们提供了四种拒绝策略,咱们能够看下Java提供的四种线程池建立所提供的拒绝策略都是其定义的默认的拒绝策略。那么除了这个拒绝策略其余的拒绝策略都是什么呢?

1private static final RejectedExecutionHandler defaultHandler =
2    new AbortPolicy();
复制代码

咱们能够到拒绝策略是一个接口RejectedExecutionHandler,这也就意味我着咱们能够本身订本身的拒绝策略,咱们先看一下Java提供四种拒绝策略是什么。

 1public interface RejectedExecutionHandler {
2
3    /**
4     * Method that may be invoked by a {@link ThreadPoolExecutor} when
5     * {@link ThreadPoolExecutor#execute execute} cannot accept a
6     * task.  This may occur when no more threads or queue slots are
7     * available because their bounds would be exceeded, or upon
8     * shutdown of the Executor.
9     *
10     * <p>In the absence of other alternatives, the method may throw
11     * an unchecked {@link RejectedExecutionException}, which will be
12     * propagated to the caller of {@code execute}.
13     *
14     * @param r the runnable task requested to be executed
15     * @param executor the executor attempting to execute this task
16     * @throws RejectedExecutionException if there is no remedy
17     */

18    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
19}
复制代码

AbortPolicy

这个拒绝策略就是Java提供的四种线程池建立方法提供的默认拒绝策略。咱们能够看下它的实现。

 1public static class AbortPolicy implements RejectedExecutionHandler {
2
3    public AbortPolicy() { }
4
5    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
6        throw new RejectedExecutionException("Task " + r.toString() +
7                                             " rejected from " +
8                                             e.toString());
9    }
10}
复制代码

因此此拒绝策略就是抛RejectedExecutionException异常

CallerRunsPolicy

此拒绝策略简单来讲就是将此任务交给调用者直接执行。

 1public static class CallerRunsPolicy implements RejectedExecutionHandler {
2
3    public CallerRunsPolicy() { }
4
5    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
6        if (!e.isShutdown()) {
7            r.run();
8        }
9    }
10}
复制代码

这里为何是交给了调用者来执行呢?咱们能够看到它是调用了run()方法,而不是start()方法。

DiscardOldestPolicy

从源码中应该能看出来,此拒绝策略是丢弃队列中最老的任务,而后再执行。

 1public static class DiscardOldestPolicy implements RejectedExecutionHandler {
2
3        public DiscardOldestPolicy() { }
4
5        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
6            if (!e.isShutdown()) {
7                e.getQueue().poll();
8                e.execute(r);
9            }
10        }
11    }
复制代码

DiscardPolicy

从源码中应该能看出来,此拒绝策略是对于当前任务不作任何操做,简单来讲就是直接丢弃了当前任务不执行。

1public static class DiscardPolicy implements RejectedExecutionHandler {
2
3    public DiscardPolicy() { }
4
5    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
6    }
7}
复制代码

线程池的拒绝策略给咱们默认提供了这四种的实现方式,固然咱们也可以自定义拒绝策略使线程池更加符合咱们当前的业务,在后面讲解Tomcat自定义本身的线程池时也会讲解它本身实现的拒绝策略。

线程饥饿死锁

线程池为“死锁”这一律念带来了一种新的可能:线程饥饿死锁。在线程池中,若是一个任务另外一个任务提交到同一个Executor,那么一般会引起死锁。第二个线程停留在工做队列中等待第一个提交的任务执行完成,可是第一个任务又没法执行完成,由于它在等待第二个任务执行完成。用图表示以下

用代码表示的话以下,这里注意咱们这里定义的线程池是SingleThreadExecutor,线程池中只有一个线程,这样好模拟出这样的状况,若是在更大的线程池中,若是全部线程都在等待其余仍处于工做队列的任务而阻塞,那么这种状况被称为线程饥饿死锁。因此尽可能避免在同一个线程池中处理两种不一样类型的任务。

 1public class AboutThread {
2    ExecutorService executorService = Executors.newSingleThreadExecutor();
3    public static void main(String[] args) {
4        AboutThread aboutThread = new AboutThread();
5        aboutThread.threadDeadLock();
6    }
7
8    public void threadDeadLock(){
9        Future<String> taskOne  = executorService.submit(new TaskOne());
10        try {
11            System.out.printf(taskOne.get());
12        } catch (InterruptedException e) {
13            e.printStackTrace();
14        } catch (ExecutionException e) {
15            e.printStackTrace();
16        }
17    }
18
19    public class TaskOne implements Callable{
20
21        @Override
22        public Object call() throws Exception {
23            Future<String> taskTow = executorService.submit(new TaskTwo());
24            return "TaskOne" + taskTow.get();
25        }
26    }
27
28    public class TaskTwo implements Callable{
29
30        @Override
31        public Object call() throws Exception {
32            return "TaskTwo";
33        }
34    }
35}
复制代码

拓展ThreadPoolExecutor

若是咱们想要对线程池进行一些扩展,那么可使用ThreadPoolExecutor给我预留的一些接口可使咱们进行更深层次话的定制线程池。

线程工厂

若是咱们想要给咱们的线程池中的每一个线程自定义一些名称,那么咱们就可使用线程工厂来实现一些自定义化的一些操做。只要咱们将咱们自定义的工厂传给ThreadPoolExecutor,那么不管什么时候线程池须要建立一个线程,都要经过咱们定义的工厂来进行建立。接下来咱们看一下接口ThreadFactory,只要咱们实现了此接口就能自定义本身线程独有的信息。

 1public interface ThreadFactory {
2
3    /**
4     * Constructs a new {@code Thread}.  Implementations may also initialize
5     * priority, name, daemon status, {@code ThreadGroup}, etc.
6     *
7     * @param r a runnable to be executed by new thread instance
8     * @return constructed thread, or {@code null} if the request to
9     *         create a thread is rejected
10     */

11    Thread newThread(Runnable r);
12}
复制代码

接下来咱们能够看咱们本身写的线程池工厂类

 1class CustomerThreadFactory implements ThreadFactory{
2
3    private String name;
4    private final AtomicInteger threadNumber = new AtomicInteger(1);
5    CustomerThreadFactory(String name){
6        this.name = name;
7    }
8
9    @Override
10    public Thread newThread(Runnable r) {
11        Thread thread = new Thread(r,name+threadNumber.getAndIncrement());
12        return thread;
13    }
14}
复制代码

只须要在进行线程池实例化的时候将此工厂类加上去便可

 1   public static void customerThread(){
2        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(0,Integer.MAX_VALUE,1, TimeUnit.SECONDS,new SynchronousQueue<>(),
3                new CustomerThreadFactory("customerThread"));
4
5        for (int i = 0; i < 10; i++) {
6            threadPoolExecutor.execute(new Runnable() {
7                @Override
8                public void run(
{
9                    System.out.printf(Thread.currentThread().getName());
10                    System.out.printf("\n");
11                }
12            });
13        }
14    }
复制代码

接下来咱们执行此语句,发现每一个线程的名字已经变了

 1customerThread1
2customerThread10
3customerThread9
4customerThread8
5customerThread7
6customerThread6
7customerThread5
8customerThread4
9customerThread3
10customerThread2
复制代码

经过继承ThreadPoolExecutor扩展

咱们查看ThreadPoolExecutor源码能够发现源码中有三个方法都是protected

1protected void beforeExecute(Thread t, Runnable r) { }
2protected void afterExecute(Runnable r, Throwable t) { }
3protected void terminated() { }
复制代码

被protected修饰的成员对于本包和其子类可见

咱们能够经过继承来覆写这些方法,那么就能够进行咱们独有的扩展了。执行任务的线程会调用beforeExecuteafterExecute方法,能够经过它们添加日志、时序、监视器或者同级信息收集的功能。不管任务是正常从run中返回,仍是抛出一个异常,afterExecute都会被调用(若是任务完成后抛出一个Error,则afterExecute不会被调用)。若是beforeExecute抛出一个RuntimeException,任务将不会被执行,afterExecute也不会被调用。

在线程池完成关闭时调用terminated,也就是在全部任务都已经完成而且全部工做者线程也已经关闭后,terminated能够用来释放Executor在其生命周期里分配的各类资源,此外还能够执行发送通知、记录日志或者手机finalize统计等操做。

本篇文章代码地址

有感兴趣的能够关注一下我新建的公众号,搜索[程序猿的百宝袋]。或者直接扫下面的码也行。

参考

相关文章
相关标签/搜索