前段时间写过一篇《线程池没你想的那么简单》,和你们一块儿撸了一个基本的线程池,具有:java
这些功能,最后也留下了三个待实现的 features
。git
此次就实现这三个特性来看看 j.u.c
中的线程池是如何实现这些需求的。github
再看本文以前,强烈建议先查看上文《线程池没你想的那么简单》api
你们在用线程池的时候或多或少都会有这样的需求:缓存
线程池中的任务执行完毕后再通知主线程作其余事情,好比一批任务都执行完毕后再执行下一波任务等等。安全
以咱们以前的代码为例:多线程
总共往线程池中提交了 13 个任务,直到他们都执行完毕后再打印 “任务执行完毕” 这个日志。并发
执行结果以下:异步
为了简单的达到这个效果,咱们能够在初始化线程池的时候传入一个接口的实现,这个接口就是用于任务完成以后的回调。函数
public interface Notify { /** * 回调 */ void notifyListen() ; }
以上就是线程池的构造函数以及接口的定义。
因此想要实现这个功能的关键是在什么时候回调这个接口?
仔细想一想其实也简单:只要咱们记录提交到线程池中的任务及完成的数量,他们二者的差为 0 时就认为线程池中的任务已执行完毕;这时即可回调这个接口。
因此在往线程池中写入任务时咱们须要记录任务数量:
为了并发安全的考虑,这里的计数器采用了原子的 AtomicInteger
。
而在任务执行完毕后就将计数器 -1 ,一旦为 0 时则任务任务所有执行完毕;这时即可回调咱们自定义的接口完成通知。
这样的需求在 jdk 中的 ThreadPoolExecutor
中也有相关的 API
,只是用法不太同样,但本质原理都大同小异。
咱们使用 ThreadPoolExecutor
的常规关闭流程以下:
executorService.shutdown(); while (!executorService.awaitTermination(100, TimeUnit.MILLISECONDS)) { logger.info("thread running"); }
线程提交完毕后执行 shutdown()
关闭线程池,接着循环调用 awaitTermination()
方法,一旦任务所有执行完毕后则会返回 true
从而退出循环。
这两个方法的目的和原理以下:
shutdown()
后会将线程池的状态置为关闭状态,这时将会中止接收新的任务同时会等待队列中的任务所有执行完毕后才真正关闭线程池。awaitTermination
会阻塞直到线程池全部任务执行完毕或者超时时间已到。为何要两个 api
结合一块儿使用呢?
主要还在最终的目的是:全部线程执行完毕后再作某件事情,也就是在线程执行完毕以前其实主线程是须要被阻塞的。
shutdown()
执行后并不会阻塞,会当即返回,全部才须要后续用循环不停的调用 awaitTermination()
,由于这个 api 才会阻塞线程。
其实咱们查看源码会发现,ThreadPoolExecutor
中的阻塞依然也是等待通知机制的运用,只不过用的是 LockSupport
的 API
而已。
接下来是带有返回值的线程,这个需求也很是常见;好比须要线程异步计算某些数据而后获得结果最终汇总使用。
先来看看如何使用(和 jdk 的相似):
首先任务是不能实现 Runnable
接口了,毕竟他的 run()
函数是没有返回值的;因此咱们改实现一个 Callable
的接口:
这个接口有一个返回值。
同时在提交任务时也稍做改动:
首先是执行任务的函数由 execute()
换为了 submit()
,同时他会返回一个返回值 Future
,经过它即可拿到线程执行的结果。
最后经过第二步将全部执行结果打印出来:
再看具体实现以前先来思考下这样的功能如何实现?
jdk
的线程 api
的规范,要执行一个线程不论是实现接口仍是继承类,最终都是执行的 run()
函数。run()
函数时去调用一个有返回值的方法,再将这个返回值存放起来用于后续使用。好比咱们这里新建了一个 Callable<T>
的接口:
public interface Callable<T> { /** * 执行任务 * @return 执行结果 */ T call() ; }
它的 call
函数就是刚才提到的有返回值的方法,因此咱们应当在线程的 run() 函数中去调用它。
接着还会有一个 Future
的接口,他的主要做用是获取线程的返回值,也就是 再将这个返回值存放起来用于后续使用
这里提到的后续使用。
既然有了接口那天然就得有它的实现 FutureTask
,它实现了 Future
接口用于后续获取返回值。
同时实现了 Runnable
接口会把本身变为一个线程。
因此在它的 run()
函数中会调用刚才提到的具备返回值的 call()
函数。
再次结合 submit()
提交任务和 get()
获取返回值的源码来看会更加理解这其中的门道。
/** * 有返回值 * * @param callable * @param <T> * @return */ public <T> Future<T> submit(Callable<T> callable) { FutureTask<T> future = new FutureTask(callable); execute(future); return future; }
submit()
很是简单,将咱们丢进来的 Callable
对象转换为一个 FutureTask
对象,而后再调用以前的 execute()
来丢进线程池(后续的流程就和一个普通的线程进入线程池的流程同样)。
FutureTask 自己也是线程,因此能够直接使用
execute()
函数。
而 future.get()
函数中 future
对象因为在 submit()
中返回的真正对象是 FutureTask
,因此咱们直接看其中的源码就好。
因为 get()
在线程没有返回以前是一个阻塞函数,最终也是经过 notify.wait()
使线程进入阻塞状态来实现的。
而使其从 wait()
中返回的条件必然是在线程执行完毕拿到返回值的时候才进行唤醒。
也就是图中的第二部分;一旦线程执行完毕(callable.call()
)就会唤醒 notify
对象,这样 get
方法也就能返回了。
一样的道理,ThreadPoolExecutor
中的原理也是相似,只不过它考虑的细节更多因此看起来很复杂,但精简代码后核心也就是这些。
甚至最终使用的 api 看起来都是相似的:
最后一个是一些新手使用线程池很容易踩坑的一个地方:那就是异常处理。
好比相似于这样的场景:
建立了只有一个线程的线程池,这个线程只作一件事,就是一直不停的 while 循环。
可是循环的过程当中不当心抛出了一个异常,巧的是这个异常又没有被捕获。你以为后续会发生什么事情呢?
是线程继续运行?仍是线程池会退出?
经过现象来看其实哪一种都不是,线程既没有继续运行同时线程池也没有退出,会一直卡在这里。
当咱们 dump
线程快照会发现:
这时线程池中还有一个线程在运行,经过线程名称会发现这是新建立的一个线程(以前是Thread-0
,如今是 Thread-1
)。
它的线程状态为 WAITING
,经过堆栈发现是卡在了 CustomThreadPool.java:272
处。
就是卡在了从队列里获取任务的地方,因为此时的任务队列是空的,因此他会一直阻塞在这里。
看到这里,以前关注的朋友有没有似曾相识的感受。
没错,我以前写过两篇:
线程池相关的问题,当时的讨论也很是“激烈”
,其实最终的缘由和这里是如出一辙的。
因此就此次简版的代码来看看其中的问题:
如今又简化了一版代码我以为以前还有疑问的朋友此次应该会更加明白。
其实在线程池内部会对线程的运行捕获异常,但它并不会处理,只是用于标记是否执行成功;
一旦执行失败则会回收掉当前异常的线程,而后从新建立一个新的 Worker
线程继续从队列里取任务而后执行。
因此最终才会卡在从队列中取任务
处。
其实 ThreadPoolExecutor
的异常处理也是相似的,具体的源码就很少分析了,在上面两篇文章中已经说过几回。
因此咱们在使用线程池时,其中的任务必定要作好异常处理。
这一波下来我以为线程池搞清楚没啥问题了,总的来看它内部运用了很是多的多线程解决方案,好比:
ReentrantLock
重入锁来保证线程写入的并发安全。最后也学会了:
最后本文全部源码(结合其中的测试代码使用):
你的点赞与分享是对我最大的支持