线程池没你想的那么简单

前言

原觉得线程池还挺简单的(平时经常使用,也分析过原理),此次是想本身动手写一个线程池来更加深刻的了解它;但在动手写的过程当中落地到细节时发现并没想的那么容易。结合源码对比后确实不得不佩服 Doug Leajava

我以为大部分人直接去看 java.util.concurrent.ThreadPoolExecutor 的源码时都是看一个大概,由于其中涉及到了许多细节处理,还有部分 AQS 的内容,因此想要理清楚具体细节并非那么容易。git

与其挨个分析源码不如本身实现一个简版,固然简版并不意味着功能缺失,须要保证核心逻辑一致。github

因此也是本篇文章的目的:安全

本身动手写一个五脏俱全的线程池,同时会了解到线程池的工做原理,以及如何在工做中合理的利用线程池。多线程

再开始以前建议对线程池不是很熟悉的朋友看看这几篇:并发

这里我截取了部份内容,也许能够埋个伏笔(坑)。函数


具体请看这两个连接。测试

因为篇幅限制,本次可能会分为上下两篇。this

建立线程池

如今进入正题,新建了一个 CustomThreadPool 类,它的工做原理以下:spa

简单来讲就是往线程池里边丢任务,丢的任务会缓冲到队列里;线程池里存储的其实就是一个个的 Thread ,他们会一直不停的从刚才缓冲的队列里获取任务执行。

流程仍是挺简单。

先来看看咱们这个自创的线程池的效果如何吧:

初始化了一个核心为三、最大线程数为五、队列大小为 4 的线程池。

先往其中丢了 10 个任务,因为阻塞队列的大小为 4 ,最大线程数为 5 ,因此因为队列里缓冲不了最终会建立 5 个线程(上限)。

过段时间没有任务提交后(sleep)则会自动缩容到三个线程(保证不会小于核心线程数)。

构造函数

来看看具体是如何实现的。

下面则是这个线程池的构造函数:

会有如下几个核心参数:

  • miniSize 最小线程数,等效于 ThreadPool 中的核心线程数。
  • maxSize 最大线程数。
  • keepAliveTime 线程保活时间。
  • workQueue 阻塞队列。
  • notify 通知接口。

大体上都和 ThreadPool 中的参数相同,而且做用也是相似的。

须要注意的是其中初始化了一个 workers 成员变量:

/** * 存放线程池 */
    private volatile Set<Worker> workers;
    
    public CustomThreadPool(int miniSize, int maxSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, Notify notify) {
       
        workers = new ConcurrentHashSet<>();
    }
复制代码

workers 是最终存放线程池中运行的线程,在 j.u.c 源码中是一个 HashSet 因此对他全部的操做都是须要加锁。

我这里为了简便起见就本身定义了一个线程安全的 Set 称为 ConcurrentHashSet

其实原理也很是简单,和 HashSet 相似也是借助于 HashMap 来存放数据,利用其 key 不可重复的特性来实现 set ,只是这里的 HashMap 是用并发安全的 ConcurrentHashMap 来实现的。

这样就能保证对它的写入、删除都是线程安全的。

不过因为 ConcurrentHashMapsize() 函数并不许确,因此我这里单独利用了一个 AtomicInteger 来统计容器大小。

建立核心线程

往线程池中丢一个任务的时候其实要作的事情还蛮多的,最重要的事情莫过于建立线程存放到线程池中了。

固然咱们不能无限制的建立线程,否则拿线程池来就没任何意义了。因而 miniSize maxSize 这两个参数就有了它的意义。

但这两个参数再哪一步的时候才起到做用呢?这就是首先须要明确的。

从这个流程图能够看出第一步是须要判断是否大于核心线程数,若是没有则建立。

结合代码能够发如今执行任务的时候会判断是否大于核心线程数,从而建立线程。

worker.startTask() 执行任务部分放到后面分析。

这里的 miniSize 因为会在多线程场景下使用,因此也用 volatile 关键字来保证可见性。

队列缓冲

结合上面的流程图,第二步天然是要判断队列是否能够存听任务(是否已满)。

优先会往队列里存放。

上至封顶

一旦写入失败则会判断当前线程池的大小是否大于最大线程数,若是没有则继续建立线程执行。

否则则执行会尝试阻塞写入队列(j.u.c 会在这里执行拒绝策略)

以上的步骤和刚才那张流程图是同样的,这样你们是否有看出什么坑嘛?

时刻当心

从上面流程图的这两步能够看出会直接建立新的线程

这个过程相对于中间直接写入阻塞队列的开销是很是大的,主要有如下两个缘由:

  • 建立线程会加锁,虽然说最终用的是 ConcurrentHashMap 的写入函数,但依然存在加锁的可能。
  • 会建立新的线程,建立线程还须要调用操做系统的 API 开销较大。

因此理想状况下咱们应该避免这两步,尽可能让丢入线程池中的任务进入阻塞队列中。

执行任务

任务是添加进来了,那是如何执行的?

在建立任务的时候提到过 worker.startTask() 函数:

/** * 添加任务,须要加锁 * @param runnable 任务 */
    private void addWorker(Runnable runnable) {
        Worker worker = new Worker(runnable, true);
        worker.startTask();
        workers.add(worker);
    }
复制代码

也就是在建立线程执行任务的时候会建立 Worker 对象,利用它的 startTask() 方法来执行任务。

因此先来看看 Worker 对象是长啥样的:

其实他自己也是一个线程,将接收到须要执行的任务存放到成员变量 task 处。

而其中最为关键的则是执行任务 worker.startTask() 这一步骤。

public void startTask() {
        thread.start();
    }
复制代码

其实就是运行了 worker 线程本身,下面来看 run 方法。

  • 第一步是将建立线程时传过来的任务执行(task.run),接着会一直不停的从队列里获取任务执行,直到获取不到新任务了。
  • 任务执行完毕后将内置的计数器 -1 ,方便后面任务所有执行完毕进行通知。
  • worker 线程获取不到任务后退出,须要将本身从线程池中释放掉(workers.remove(this))。

从队列里获取任务

其实 getTask 也是很是关键的一个方法,它封装了从队列中获取任务,同时对不须要保活的线程进行回收。

很明显,核心做用就是从队列里获取任务;但有两个地方须要注意:

  • 当线程数超过核心线程数时,在获取任务的时候须要经过保活时间从队列里获取任务;一旦获取不到任务则队列确定是空的,这样返回 null 以后在上文的 run() 中就会退出这个线程;从而达到了回收线程的目的,也就是咱们以前演示的效果
  • 这里须要加锁,加锁的缘由是这里确定会出现并发状况,不加锁会致使 workers.size() > miniSize 条件屡次执行,从而致使线程被所有回收完毕。

关闭线程池

最后来谈谈线程关闭的事;

仍是以刚才那段测试代码为例,若是提交任务后咱们没有关闭线程,会发现即使是任务执行完毕后程序也不会退出。

从刚才的源码里其实也很容易看出来,不退出的缘由是 Worker 线程必定还会一直阻塞在 task = workQueue.take(); 处,即使是线程缩容了也不会小于核心线程数。

经过堆栈也能证实:

刚好剩下三个线程阻塞于此处。

而关闭线程一般又有如下两种:

  • 当即关闭:执行关闭方法后无论如今线程池的运行情况,直接一刀切所有停掉,这样会致使任务丢失。
  • 不接受新的任务,同时等待现有任务执行完毕后退出线程池。

当即关闭

咱们先来看第一种当即关闭

/** * 当即关闭线程池,会形成任务丢失 */
    public void shutDownNow() {
        isShutDown.set(true);
        tryClose(false);
    }
    
    /** * 关闭线程池 * * @param isTry true 尝试关闭 --> 会等待全部任务执行完毕 * false 当即关闭线程池--> 任务有丢失的可能 */
    private void tryClose(boolean isTry) {
        if (!isTry) {
            closeAllTask();
        } else {
            if (isShutDown.get() && totalTask.get() == 0) {
                closeAllTask();
            }
        }

    }

    /** * 关闭全部任务 */
    private void closeAllTask() {
        for (Worker worker : workers) {
            //LOGGER.info("开始关闭");
            worker.close();
        }
    }
    
    public void close() {
        thread.interrupt();
    }
复制代码

很容易看出,最终就是遍历线程池里全部的 worker 线程挨个执行他们的中断函数。

咱们来测试一下:

能够发现后面丢进去的三个任务实际上是没有被执行的。

完过后关闭

正常关闭则不同:

/** * 任务执行完毕后关闭线程池 */
    public void shutdown() {
        isShutDown.set(true);
        tryClose(true);
    }
复制代码

他会在这里多了一个判断,须要全部任务都执行完毕以后才会去中断线程。

同时在线程须要回收时都会尝试关闭线程:


来看看实际效果:

回收线程

上文或多或少提到了线程回收的事情,其实总结就是如下两点:

  • 一旦执行了 shutdown/shutdownNow 方法都会将线程池的状态置为关闭状态,这样只要 worker 线程尝试从队列里获取任务时就会直接返回空,致使 worker 线程被回收。
  • 一旦线程池大小超过了核心线程数就会使用保活时间来从队列里获取任务,因此一旦获取不到返回 null 时就会触发回收。

但若是咱们的队列足够大,致使线程数都不会超过核心线程数,这样是不会触发回收的。

好比这里我将队列大小调为 10 ,这样任务就会累计在队列里,不会建立五个 worker 线程。

因此一直都是 Thread-1~3 这三个线程在反复调度任务。

总结

本次实现了线程池里大部分核心功能,我相信只要看完并动手敲一遍必定会对线程池有不同的理解。

结合目前的内容来总结下:

  • 线程池、队列大小要设计的合理,尽可能的让任务从队列中获取执行。
  • 慎用 shutdownNow() 方法关闭线程池,会致使任务丢失(除非业务容许)。
  • 若是任务多,线程执行时间短能够调大 keepalive 值,使得线程尽可能不被回收从而能够复用线程。

同时下次会分享一些线程池的新特性,如:

  • 执行带有返回值的线程。
  • 异常处理怎么办?
  • 全部任务执行完怎么通知我?

本文全部源码:

github.com/crossoverJi…

你的点赞与分享是对我最大的支持

相关文章
相关标签/搜索