Java 线程池的认识和使用

多线程编程很难,难点在于多线程代码的执行不是按照咱们直觉上的执行顺序。因此多线程编程必需要创建起一个宏观的认识。java

线程池是多线程编程中的一个重要概念。为了可以更好地使用多线程,学习好线程池固然是必须的。git

为何要使用线程池?

平时咱们在使用多线程的时候,一般都是架构师配置好了线程池的 Bean,咱们须要使用的时候,提交一个线程便可,不须要过多关注其内部原理。github

在学习一门新的技术以前,咱们仍是先了解下为何要使用它,使用它可以解决什么问题:面试

  1. 建立/销毁线程伴随着系统开销,过于频繁的建立/销毁线程,会很大程度上影响处理效率

    例如:编程

    记建立线程消耗时间T1,执行任务消耗时间T2,销毁线程消耗时间T3缓存

    若是T1+T3>T2,那么是否是说开启一个线程来执行这个任务太不划算了!多线程

    正好,线程池缓存线程,可用已有的闲置线程来执行新任务,避免了T1+T3带来的系统开销架构

  2. 线程并发数量过多,抢占系统资源从而致使阻塞

    咱们知道线程能共享系统资源,若是同时执行的线程过多,就有可能致使系统资源不足而产生阻塞的状况并发

    运用线程池能有效的控制线程最大并发数,避免以上的问题异步

  3. 对线程进行一些简单的管理

    好比:延时执行、定时循环执行的策略等

    运用线程池都能进行很好的实现

建立一个线程池

在 Java 中,新建一个线程池对象很是简单,Java 自己提供了工具类java.util.concurrent.Executors,可使用以下代码建立一个固定数量线程的线程池:

ExecutorService service = Executors.newFixedThreadPool(10);

注意:以上代码用来测试还能够,实际使用中最好可以显示地指定相关参数。

咱们能够看下其内部源码实现:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

在阿里巴巴代码规范中,建议咱们本身指定线程池的相关参数,为的是让开发人员可以自行理解线程池建立中的每一个参数,根据实际状况,建立出合理的线程池。接下来,咱们来剖析下java.util.concurrent.ThreadPoolExecutor的构造方法参数。

ThreadPoolExecutor 浅析

java.util.concurrent.ThreadPoolExecutor有多个构造方法,咱们拿参数最多的构造方法来举例,如下是阿里巴巴代码规范中给出的建立线程池的范例:

ThreadPoolExecutor service = new ThreadPoolExecutor(5, 200,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024), 
                new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build(), 
                new ThreadPoolExecutor.AbortPolicy());

贴一张IDEA中的图更方便看:

clipboard.png

首先最重要的几个参数,可能就是:corePoolSizemaximumPoolSizeworkQueue了,先看下这几个参数的解释:

  • corePoolSize
    用于设定 thread pool 须要时刻保持的最小 core threads 的数量,即使这些 core threads 处于空闲状态啥事都不作也不会将它们回收掉,固然前提是你没有设置 allowCoreThreadTimeOut 为 true。至于 pool 是如何作到保持这些个 threads 不死的,咱们稍后再说。
  • maximumPoolSize
    用于限定 pool 中线程数的最大值。若是你本身构造了 pool 且传入了一个 Unbounded 的 queue 且没有设置它的 capacity,那么很差意思,最大线程数会永远 <= corePoolSize,maximumPoolSize 变成了无效的。
  • workQueue
    该线程池中的任务队列:维护着等待执行的 Runnable 对象。当全部的核心线程都在干活时,新添加的任务会被添加到这个队列中等待处理,若是队列满了,则新建非核心线程执行任务

因为本文是初步了解线程池,因此先理解这几个参数,上文对于这三个参数的解释,基本上跟JDK源码中的注释一致(java.util.concurrent.ThreadPoolExecutor#execute里的代码)。

咱们编写个程序来方便理解:

// 建立线程池
ThreadPoolExecutor service = new ThreadPoolExecutor(5, 200,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(1024),
            new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build(),
            new ThreadPoolExecutor.AbortPolicy());
// 等待执行的runnable   
Runnable runnable = () -> {
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
};

// 启动的任务数量
int counts = 1224;
for (int i = 0; i < counts; i++) {
    service.execute(runnable);
}

// 监控线程池执行状况的代码 
ThreadPoolExecutor tpe = ((ThreadPoolExecutor) service);
while (true) {
    System.out.println();

    int queueSize = tpe.getQueue().size();
    System.out.println("当前排队线程数:" + queueSize);

    int activeCount = tpe.getActiveCount();
    System.out.println("当前活动线程数:" + activeCount);

    long completedTaskCount = tpe.getCompletedTaskCount();
    System.out.println("执行完成线程数:" + completedTaskCount);

    long taskCount = tpe.getTaskCount();
    System.out.println("总线程数:" + taskCount);

    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

线程池的容量与咱们启动的任务数量息息相关。

已知:

  • corePoolSize = 5
  • maximumPoolSize = 200
  • workQueue.size() = 1024

咱们修改同时 execute 添加到线程池的 Runnable 数量 counts:

  • counts <= corePoolSize:全部的任务均为核心线程执行,没有任何 Runnable 被添加到 workQueue中
当前排队线程数:0
当前活动线程数:3
执行完成线程数:0
总线程数:3
  • corePoolSize < counts <= corePoolSize + workQueue.size():全部任务均为核心线程执行,当核心线程处于繁忙状态,则将任务添加到 workQueue 中等待
当前排队线程数:15
当前活动线程数:5
执行完成线程数:0
总线程数:20
  • corePoolSize + workQueue.size() < counts <= maximumPoolSize + workQueue.size():corePoolSize 个线程由核心线程执行,超出队列长度 workQueue.size() 的任务,将另启动非核心线程执行
当前排队线程数:1024
当前活动线程数:105
执行完成线程数:0
总线程数:1129
  • counts > maximumPoolSize + workQueue.size():将会报异常java.util.concurrent.RejectedExecutionException
java.util.concurrent.RejectedExecutionException: Task com.bwjava.util.ExecutorServiceUtilTest$$Lambda$1/314265080@725bef66 rejected from java.util.concurrent.ThreadPoolExecutor@2aaf7cc2[Running, pool size = 200, active threads = 200, queued tasks = 1024, completed tasks = 0]

线程池踩坑:线程嵌套致使阻塞

此次的踩坑才是我写这篇文章的初衷,借此机会好好了解下线程池的各个概念。自己这段时间在研究爬虫,为了尽可能提升爬虫的效率,用到了多线程处理。因为代码写得比较随性,因此遇到了一个阻塞的问题,研究了一下才搞明白,模拟的代码以下:

ThreadPoolExecutor service = new ThreadPoolExecutor(5, 200,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<>(1024),
        new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build(),
        new ThreadPoolExecutor.AbortPolicy());

@Test
public void testBlock() {
    Runnable runnableOuter = () -> {
        try {
            Runnable runnableInner1 = () -> {
                try {
                    TimeUnit.SECONDS.sleep(3); // 模拟比较耗时的爬虫操做
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
            Future<?> submit = service.submit(runnableInner1);

            submit.get(); // 实际业务中,runnableInner2须要用到此处返回的参数,因此必须get

            Runnable runnableInner2 = () -> {
                try {
                    TimeUnit.SECONDS.sleep(5); // 模拟比较耗时的爬虫操做
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            };
            Future<?> submit2 = service.submit(runnableInner2);
            submit2.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    };

    for (int i = 0; i < 20; i++) {
        service.execute(runnableOuter);
    }

    ThreadPoolExecutor tpe = ((ThreadPoolExecutor) service);

    while (true) {
        System.out.println();

        int queueSize = tpe.getQueue().size();
        System.out.println("当前排队线程数:" + queueSize);

        int activeCount = tpe.getActiveCount();
        System.out.println("当前活动线程数:" + activeCount);

        long completedTaskCount = tpe.getCompletedTaskCount();
        System.out.println("执行完成线程数:" + completedTaskCount);

        long taskCount = tpe.getTaskCount();
        System.out.println("总线程数:" + taskCount);

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

线程池是前文的线程池,参数彻底不变。线程的监控代码也一致。当咱们运行这个单元测试的时候,会发现打印出来的结果一直是以下:

当前排队线程数:15
当前活动线程数:5
执行完成线程数:0
总线程数:20

当前排队线程数:20
当前活动线程数:5
执行完成线程数:0
总线程数:25

当前排队线程数:20
当前活动线程数:5
执行完成线程数:0
总线程数:25

……略

根本问题是 Runnable 内部还嵌套了 Runnable ,且他们都提交到了一个线程池。下面分步骤说明问题:

  1. runnableOuter 被提交到了线程池
  2. runnableOuter 开始执行,runnableInner1 被提交到线程池,对 runnableInner1 的结果进行 get,致使runnableOuter 被阻塞

    1. 于此同时,更多的 runnableOuter 被提交到线程池,核心线程被 runnableOuter 和 runnableInner1 占满,多余的线程 runnableInner2 被加入 workQueue 中等待执行
  3. runnableInner2 被提交到线程池,可是由于核心线程已满,被提交到了 workQueue ,也处于阻塞状态,此时对 runnableInner2 的结果进行 get,致使 runnableOuter 被阻塞
  4. runnableOuter 被阻塞,没法释放核心线程资源,而 runnableInner2 又由于没法获得核心线程资源,只能呆在 workQueue 里,致使整个程序卡死,没法返回。(有点相似死锁,互相占有了资源,对方不释放,我也不释放)

用图表示大概为:

clipboard.png

既然明白了出错的缘由,那么解决起来就简单了。这个案例告诉咱们,设计一个多线程程序,必定要自顶向下有一个良好的设计,而后再开始编码,不可以盲目地使用多线程、线程池,这样只会致使程序出现莫名其妙的错误。

动态修改 corePoolSize & maximumPoolSize

其实这个我没怎么关注过,曾经在一次面试中被问到过。很简单,java.util.concurrent.ThreadPoolExecutor提供了Setter方法,能够直接设置相关参数。按我目前的实践经验,几乎没有用到过,可是知道这个聊胜于无吧。特定的复杂场景下应该颇有用。

线程池和消息队列

笔者在实际工程应用中,使用过多线程和消息队列处理过异步任务。不少新手工程师每每弄不清楚这二者的区别。按笔者的浅见:

多线程是用来充分利用多核 CPU 以提升程序性能的一种开发技术,线程池能够维持一个队列保存等待处理的多线程任务,可是因为此队列是内存控制的,因此断电或系统故障后未执行的任务会丢失。

消息队列是为消息处理而生的一门技术。其根据消费者的自身消费能力进行消费的特性使其普遍用于削峰的高并发任务处理。此外利用其去耦合的特性也能够实现代码上的解耦。消息队列大多能够对其消息进行持久化,即便断电也可以恢复未被消费的任务并继续处理。

以上是笔者在学习实践以后对于多线程和消息队列的粗浅认识,初学者切莫混淆二者的做用。

参考文献:

  1. Deep thinking in Java thread pool
  2. 线程池,这一篇或许就够了
相关文章
相关标签/搜索