承上启下:上一篇文章小豹子讲了线程池的实例化过程,粗略介绍了线程池的状态转换;这篇文章主要讲了我运行线程池时遇到的小问题,以及
execute
方法的源码理解。java
按照咱们的规划,下一步就应该提交任务,探究线程池执行任务时的内部行为,但首先,我要提交一个任务嘛。因而,接着上一篇文章的代码,我提交了一个任务:git
@Test
public void submitTest() {
// 建立线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread();
}
}, new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("拒绝服务");
}
});
// 提交任务,该任务为睡眠 1 秒后打印 Hello
threadPoolExecutor.submit(new Callable<String>() {
@Override
public String call() throws InterruptedException {
Thread.sleep(1000L);
System.out.println("Hello");
return null;
}
});
}
复制代码
而我并无看到任何输出,程序也并无睡眠一秒,而是立刻结束了。哦对,我想起来,咱们建立的线程默认是守护线程,当全部用户线程结束以后,程序就会结束了,并不会理会是否还有守护线程在运行。那么咱们用一个简单易行的办法来解决这个问题 —— 不让用户线程结束,让它多睡一会:github
@Test
public void submitTest() throws InterruptedException {
// 建立线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread();
}
}, new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("拒绝服务");
}
});
// 提交任务,该任务为睡眠 1 秒后打印 Hello
threadPoolExecutor.submit(new Callable<String>() {
@Override
public String call() throws InterruptedException {
Thread.sleep(1000L);
System.out.println("Hello");
return null;
}
});
// 使主线程休眠 5 秒,防止守护线程意外退出
Thread.sleep(5000L);
}
复制代码
然而,程序等待 5 秒以后,依旧没有输出。个人第一个反应是,我对于线程池的用法不对。是否是还须要调用某个方法来“激活”或者“启动”线程池?而不管在文档中,仍是各博客的例子中,我都没有找到相似的方法。咱们仔细思考一下这个 Bug,产生这样问题的可能缘由有三:安全
ThreadPoolExecutor
内部代码有问题ThreadPoolExecutor
的使用方法不对ThreadFactory
或 RejectedExecutionHandler
有问题缘由 1,可能性过小,几乎没有。那么缘由二、3,咱们如今无法排除,因而我尝试构建一个最小可重现错误,将 ThreadPoolExecutor
剥离出来,看 Bug 是否重现:bash
最小可重现(minimal reproducible)这个思想是我在翻译《使用 Rust 开发一个简单的 Web 应用,第 4 部分 —— CLI 选项解析》时,做者用到的思想。就是在咱们没法定位 Bug 时,剥离出当前代码中咱们认为无关的部分,剥离后观察 Bug 是否重现,一步步缩小 Bug 的范围。通俗的说,就是排除法。并发
private class MyThreadFactory implements ThreadFactory{
@Override
public Thread newThread(Runnable r) {
return new Thread();
}
}
@Test
public void reproducibleTest() throws InterruptedException {
new MyThreadFactory().newThread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Hello");
}
}).start();
Thread.sleep(5000L);
}
复制代码
仍是没有任何输出,不过这是一个好消息,这意味着咱们定位了问题所在:如今问题只可能出如今 MyThreadFactory
中,短短 6 行代码会有什么问题呢?哎呦(拍大腿),我没有把 Runnable r
传给 new Thread()
啊,我一直在执行一个空线程啊,怎么可能有任何输出!因而:return new Thread(r);
这样一改就行了。ide
上面的问题看似简单,但能出现这么低级的错误,值得我思考。我由于产生该错误的缘由有二:post
ThreadPoolExecutor
的原理,从语法上看 ThreadFactory
的实现类只须要传出一个 Thread
实例就好了,殊不知 Runnable r
不可或缺。因而,我决定对测试代码进行重构。此次重构,一要使线程工厂产生非守护线程,防止由于主进程的退出致使线程池中线程所有意外退出;二要对每一个操做打日志,咱们要能直观的观察到线程池在作什么,值得一提的是,对于阻塞队列的日志操做,我使用了动态代理的方式对每个方法打日志,不熟悉动态代理的童鞋能够戳我以前写的小豹子带你看源码:JDK 动态代理。测试
// import...
public class ThreadPoolExecutorTest {
/**
* 记录启动时间
*/
private final static long START_TIME = System.currentTimeMillis();
/**
* 自定义线程工厂,产生非守护线程,并打印日志
*/
private class MyThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(false);
debug("建立线程 - %s", thread.getName());
return thread;
}
}
/**
* 自定义拒绝服务异常处理器,打印拒绝服务信息
*/
private class MyRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
debug("拒绝请求,Runnable:%s,ThreadPoolExecutor:%s", r, executor);
}
}
/**
* 自定义任务,休眠 1 秒后打印当前线程名,并返回线程名
*/
private class MyTask implements Callable<String> {
@Override
public String call() throws InterruptedException {
Thread.sleep(1000L);
String threadName = Thread.currentThread().getName();
debug("MyTask - %s", threadName);
return threadName;
}
}
/**
* 对 BlockingQueue 的动态代理,实现对 BlockingQueue 的全部方法调用打 Log
*/
private class PrintInvocationHandler implements InvocationHandler {
private final BlockingQueue<?> blockingQueue;
private PrintInvocationHandler(BlockingQueue<?> blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
debug("BlockingQueue - %s,参数为:%s", method.getName(), Arrays.toString(args));
Object result = method.invoke(blockingQueue, args);
debug("BlockingQueue - %s 执行完毕,返回值为:%s", method.getName(), String.valueOf(result));
return result;
}
}
/**
* 产生 BlockingQueue 代理类
* @param blockingQueue 原 BlockingQueue
* @param <E> 任意类型
* @return 动态代理 BlockingQueue,执行任何方法时会打 Log
*/
@SuppressWarnings("unchecked")
private <E> BlockingQueue<E> debugQueue(BlockingQueue<E> blockingQueue) {
return (BlockingQueue<E>) Proxy.newProxyInstance(this.getClass().getClassLoader(),
new Class<?>[]{BlockingQueue.class},
new PrintInvocationHandler(blockingQueue));
}
/**
* 实例化一个 核心池为 3,最大池为 5,存活时间为 20s,利用上述阻塞队列、线程工厂、拒绝服务处理器的线程池实例
* @return 返回 ThreadPoolExecutor 实例
*/
private ThreadPoolExecutor newTestPoolInstance() {
return new ThreadPoolExecutor(3, 5, 20,
TimeUnit.SECONDS, debugQueue(new LinkedBlockingQueue<>()),
new MyThreadFactory(), new MyRejectedExecutionHandler());
}
/**
* 向控制台打印日志,自动输出时间,线程等信息
* @param info
* @param arg
*/
private void debug(String info, Object... arg) {
long time = System.currentTimeMillis() - START_TIME;
System.out.println(String.format(((double) time / 1000) + "-" + Thread.currentThread().getName() + "-" + info, arg));
}
/**
* 测试实例化操做
*/
private void newInstanceTest() {
newTestPoolInstance();
}
/**
* 测试提交操做,提交 10 次任务
*/
private void submitTest() {
ThreadPoolExecutor threadPool = newTestPoolInstance();
for (int i = 0; i < 10; i++) {
threadPool.submit(new MyTask());
}
}
public static void main(String[] args) {
ThreadPoolExecutorTest test = new ThreadPoolExecutorTest();
test.submitTest();
}
}
复制代码
编译,运行 =>ui
0.047-main-建立线程 - Thread-0
0.064-main-建立线程 - Thread-1
0.064-main-建立线程 - Thread-2
0.064-main-BlockingQueue - offer,参数为:[java.util.concurrent.FutureTask@4d7e1886]
0.064-main-BlockingQueue - offer 执行完毕,返回值为:true
0.064-main-BlockingQueue - offer,参数为:[java.util.concurrent.FutureTask@3cd1a2f1]
0.065-main-BlockingQueue - offer 执行完毕,返回值为:true
0.065-main-BlockingQueue - offer,参数为:[java.util.concurrent.FutureTask@2f0e140b]
0.065-main-BlockingQueue - offer 执行完毕,返回值为:true
0.065-main-BlockingQueue - offer,参数为:[java.util.concurrent.FutureTask@7440e464]
0.065-main-BlockingQueue - offer 执行完毕,返回值为:true
0.065-main-BlockingQueue - offer,参数为:[java.util.concurrent.FutureTask@49476842]
0.065-main-BlockingQueue - offer 执行完毕,返回值为:true
0.065-main-BlockingQueue - offer,参数为:[java.util.concurrent.FutureTask@78308db1]
0.065-main-BlockingQueue - offer 执行完毕,返回值为:true
0.065-main-BlockingQueue - offer,参数为:[java.util.concurrent.FutureTask@27c170f0]
0.065-main-BlockingQueue - offer 执行完毕,返回值为:true
1.065-Thread-1-MyTask - Thread-1
1.065-Thread-0-MyTask - Thread-0
1.065-Thread-2-MyTask - Thread-2
1.065-Thread-1-BlockingQueue - take,参数为:null
1.065-Thread-0-BlockingQueue - take,参数为:null
1.065-Thread-2-BlockingQueue - take,参数为:null
1.065-Thread-0-BlockingQueue - take 执行完毕,返回值为:java.util.concurrent.FutureTask@3cd1a2f1
1.065-Thread-2-BlockingQueue - take 执行完毕,返回值为:java.util.concurrent.FutureTask@2f0e140b
1.065-Thread-1-BlockingQueue - take 执行完毕,返回值为:java.util.concurrent.FutureTask@4d7e1886
2.065-Thread-1-MyTask - Thread-1
2.065-Thread-2-MyTask - Thread-2
2.065-Thread-0-MyTask - Thread-0
2.065-Thread-1-BlockingQueue - take,参数为:null
2.065-Thread-2-BlockingQueue - take,参数为:null
2.065-Thread-0-BlockingQueue - take,参数为:null
2.065-Thread-1-BlockingQueue - take 执行完毕,返回值为:java.util.concurrent.FutureTask@7440e464
2.065-Thread-2-BlockingQueue - take 执行完毕,返回值为:java.util.concurrent.FutureTask@49476842
2.065-Thread-0-BlockingQueue - take 执行完毕,返回值为:java.util.concurrent.FutureTask@78308db1
3.066-Thread-1-MyTask - Thread-1
3.066-Thread-2-MyTask - Thread-2
3.066-Thread-0-MyTask - Thread-0
3.066-Thread-2-BlockingQueue - take,参数为:null
3.066-Thread-1-BlockingQueue - take,参数为:null
3.066-Thread-0-BlockingQueue - take,参数为:null
3.066-Thread-2-BlockingQueue - take 执行完毕,返回值为:java.util.concurrent.FutureTask@27c170f0
4.067-Thread-2-MyTask - Thread-2
4.067-Thread-2-BlockingQueue - take,参数为:null
复制代码
日志的格式是:时间(秒)-线程名-信息
从日志输出中,咱们能够获知:
take
方法阻塞(这一点由日志后几行能够看出,只有调用日志,没有调用完成的日志)由此,我产生一个疑问:为何始终只有三个线程?个人设置不是“核心池为 3,最大池为 5”吗?为何只有三个线程在工做呢?
终于开始看源码了,咱们以 submit
为切入点,探寻咱们提交任务时,线程池作了什么,submit
方法自己很简单,就是将传入参数封装为 RunnableFuture
实例,而后调用 execute
方法,如下给出 submit
多个重载方法其中之一:
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
复制代码
那么,咱们继续看 execute
的代码:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
复制代码
咱们首先解释一下 addWorker
方法,暂时咱们只须要了解几件事情就能够理解 execute
代码了:
true
,不然返回 false
那么咱们回过头来理解 execute
代码:
为了帮助理解,我根据代码逻辑画了一个流程图:
如今我明白了,只有等待队列插入失败(如达到容量上限等)状况下,才会建立非核心线程来处理任务,也就是说,咱们使用的 LinkedBlockingQueue
队列来做为等待队列,那是看不到非核心线程被建立的现象的。
有心的读者可能注意到了,整个过程没有加锁啊,怎样保证并发安全呢?咱们观察这段代码,其实不必所有加锁,只须要保证 addWorker
、remove
和 workQueue.offer
三个方法的线程安全,该方法就不必加锁。事实上,在 addWorker
中是有对线程池状态的 recheck 的,若是建立失败会返回 false。
小豹子仍是一个大三的学生,小豹子但愿你能“批判性的”阅读本文,对本文内容中不正确、不稳当之处进行严厉的批评,小豹子感激涕零。