原觉得线程池还挺简单的(平时经常使用,也分析过原理),此次是想本身动手写一个线程池来更加深刻的了解它;但在动手写的过程当中落地到细节时发现并没想的那么容易。结合源码对比后确实不得不佩服 Doug Lea
。java
我以为大部分人直接去看 java.util.concurrent.ThreadPoolExecutor
的源码时都是看一个大概,由于其中涉及到了许多细节处理,还有部分 AQS
的内容,因此想要理清楚具体细节并非那么容易。git
与其挨个分析源码不如本身实现一个简版,固然简版并不意味着功能缺失,须要保证核心逻辑一致。github
因此也是本篇文章的目的:安全
本身动手写一个五脏俱全的线程池,同时会了解到线程池的工做原理,以及如何在工做中合理的利用线程池。多线程
再开始以前建议对线程池不是很熟悉的朋友看看这几篇:并发
这里我截取了部份内容,也许能够埋个伏笔(坑)。函数
具体请看这两个连接。测试
因为篇幅限制,本次可能会分为上下两篇。this
如今进入正题,新建了一个 CustomThreadPool
类,它的工做原理以下:spa
简单来讲就是往线程池里边丢任务,丢的任务会缓冲到队列里;线程池里存储的其实就是一个个的 Thread
,他们会一直不停的从刚才缓冲的队列里获取任务执行。
流程仍是挺简单。
先来看看咱们这个自创的线程池的效果如何吧:
初始化了一个核心为三、最大线程数为五、队列大小为 4 的线程池。
先往其中丢了 10 个任务,因为阻塞队列的大小为 4 ,最大线程数为 5 ,因此因为队列里缓冲不了最终会建立 5 个线程(上限)。
过段时间没有任务提交后(sleep
)则会自动缩容到三个线程(保证不会小于核心线程数)。
来看看具体是如何实现的。
下面则是这个线程池的构造函数:
会有如下几个核心参数:
miniSize
最小线程数,等效于 ThreadPool
中的核心线程数。maxSize
最大线程数。keepAliveTime
线程保活时间。workQueue
阻塞队列。notify
通知接口。大体上都和 ThreadPool
中的参数相同,而且做用也是相似的。
须要注意的是其中初始化了一个 workers
成员变量:
/** * 存放线程池 */
private volatile Set<Worker> workers;
public CustomThreadPool(int miniSize, int maxSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, Notify notify) {
workers = new ConcurrentHashSet<>();
}
复制代码
workers
是最终存放线程池中运行的线程,在 j.u.c
源码中是一个 HashSet
因此对他全部的操做都是须要加锁。
我这里为了简便起见就本身定义了一个线程安全的 Set
称为 ConcurrentHashSet
。
其实原理也很是简单,和 HashSet
相似也是借助于 HashMap
来存放数据,利用其 key
不可重复的特性来实现 set
,只是这里的 HashMap
是用并发安全的 ConcurrentHashMap
来实现的。
这样就能保证对它的写入、删除都是线程安全的。
不过因为 ConcurrentHashMap
的 size()
函数并不许确,因此我这里单独利用了一个 AtomicInteger
来统计容器大小。
往线程池中丢一个任务的时候其实要作的事情还蛮多的,最重要的事情莫过于建立线程存放到线程池中了。
固然咱们不能无限制的建立线程,否则拿线程池来就没任何意义了。因而 miniSize maxSize
这两个参数就有了它的意义。
但这两个参数再哪一步的时候才起到做用呢?这就是首先须要明确的。
从这个流程图能够看出第一步是须要判断是否大于核心线程数,若是没有则建立。
结合代码能够发如今执行任务的时候会判断是否大于核心线程数,从而建立线程。
worker.startTask()
执行任务部分放到后面分析。
这里的 miniSize
因为会在多线程场景下使用,因此也用 volatile
关键字来保证可见性。
结合上面的流程图,第二步天然是要判断队列是否能够存听任务(是否已满)。
优先会往队列里存放。
一旦写入失败则会判断当前线程池的大小是否大于最大线程数,若是没有则继续建立线程执行。
否则则执行会尝试阻塞写入队列(j.u.c
会在这里执行拒绝策略)
以上的步骤和刚才那张流程图是同样的,这样你们是否有看出什么坑嘛?
从上面流程图的这两步能够看出会直接建立新的线程。
这个过程相对于中间直接写入阻塞队列的开销是很是大的,主要有如下两个缘由:
因此理想状况下咱们应该避免这两步,尽可能让丢入线程池中的任务进入阻塞队列中。
任务是添加进来了,那是如何执行的?
在建立任务的时候提到过 worker.startTask()
函数:
/** * 添加任务,须要加锁 * @param runnable 任务 */
private void addWorker(Runnable runnable) {
Worker worker = new Worker(runnable, true);
worker.startTask();
workers.add(worker);
}
复制代码
也就是在建立线程执行任务的时候会建立 Worker
对象,利用它的 startTask()
方法来执行任务。
因此先来看看 Worker
对象是长啥样的:
其实他自己也是一个线程,将接收到须要执行的任务存放到成员变量 task
处。
而其中最为关键的则是执行任务 worker.startTask()
这一步骤。
public void startTask() {
thread.start();
}
复制代码
其实就是运行了 worker
线程本身,下面来看 run
方法。
task.run
),接着会一直不停的从队列里获取任务执行,直到获取不到新任务了。workers.remove(this)
)。其实 getTask
也是很是关键的一个方法,它封装了从队列中获取任务,同时对不须要保活的线程进行回收。
很明显,核心做用就是从队列里获取任务;但有两个地方须要注意:
null
以后在上文的 run()
中就会退出这个线程;从而达到了回收线程的目的,也就是咱们以前演示的效果
workers.size() > miniSize
条件屡次执行,从而致使线程被所有回收完毕。最后来谈谈线程关闭的事;
仍是以刚才那段测试代码为例,若是提交任务后咱们没有关闭线程,会发现即使是任务执行完毕后程序也不会退出。
从刚才的源码里其实也很容易看出来,不退出的缘由是 Worker
线程必定还会一直阻塞在 task = workQueue.take();
处,即使是线程缩容了也不会小于核心线程数。
经过堆栈也能证实:
刚好剩下三个线程阻塞于此处。
而关闭线程一般又有如下两种:
咱们先来看第一种当即关闭
:
/** * 当即关闭线程池,会形成任务丢失 */
public void shutDownNow() {
isShutDown.set(true);
tryClose(false);
}
/** * 关闭线程池 * * @param isTry true 尝试关闭 --> 会等待全部任务执行完毕 * false 当即关闭线程池--> 任务有丢失的可能 */
private void tryClose(boolean isTry) {
if (!isTry) {
closeAllTask();
} else {
if (isShutDown.get() && totalTask.get() == 0) {
closeAllTask();
}
}
}
/** * 关闭全部任务 */
private void closeAllTask() {
for (Worker worker : workers) {
//LOGGER.info("开始关闭");
worker.close();
}
}
public void close() {
thread.interrupt();
}
复制代码
很容易看出,最终就是遍历线程池里全部的 worker
线程挨个执行他们的中断函数。
咱们来测试一下:
能够发现后面丢进去的三个任务实际上是没有被执行的。
而正常关闭则不同:
/** * 任务执行完毕后关闭线程池 */
public void shutdown() {
isShutDown.set(true);
tryClose(true);
}
复制代码
他会在这里多了一个判断,须要全部任务都执行完毕以后才会去中断线程。
同时在线程须要回收时都会尝试关闭线程:
来看看实际效果:
上文或多或少提到了线程回收的事情,其实总结就是如下两点:
shutdown/shutdownNow
方法都会将线程池的状态置为关闭状态,这样只要 worker
线程尝试从队列里获取任务时就会直接返回空,致使 worker
线程被回收。
null
时就会触发回收。
但若是咱们的队列足够大,致使线程数都不会超过核心线程数,这样是不会触发回收的。
好比这里我将队列大小调为 10 ,这样任务就会累计在队列里,不会建立五个 worker
线程。
因此一直都是 Thread-1~3
这三个线程在反复调度任务。
本次实现了线程池里大部分核心功能,我相信只要看完并动手敲一遍必定会对线程池有不同的理解。
结合目前的内容来总结下:
shutdownNow()
方法关闭线程池,会致使任务丢失(除非业务容许)。keepalive
值,使得线程尽可能不被回收从而能够复用线程。同时下次会分享一些线程池的新特性,如:
本文全部源码:
你的点赞与分享是对我最大的支持