(手机横屏看源码更方便)ide
(1)本身动手写的线程池如何支持带返回值的任务呢?函数
(2)若是任务执行的过程当中抛出异常了该怎么处理呢?源码分析
上一章咱们本身动手写了一个线程池,可是它是不支持带返回值的任务的,那么,咱们本身可否实现呢?必须能够,今天咱们就一块儿来实现带返回值任务的线程池。测试
首先,让咱们先回顾一下上一章写的线程池:this
(1)它包含四个要素:核心线程数、最大线程数、任务队列、拒绝策略;spa
(2)它具备执行无返回值任务的能力;线程
(3)它没法处理有返回值的任务;设计
(4)它没法处理任务执行的异常(线程中的异常不会抛出到线程外);code
那么,咱们能不能在现有的基础上实现其下面两项能力呢?让咱们一块儿来试一试吧!cdn
答案很明显,就是一个有返回值,一个无返回值,用伪代码来表示就是下面这样:
// 无返回值
threadPool.execute(()->{
System.out.println(1);
});
// 有返回值,分两步走
// 1. 提交任务到线程池中
SomeClass result = threadPool.execute(()->{
System.out.println(1);
return 1;
});
// 2. 等待任务的结果返回
Object value = result.get();
复制代码
无返回值的任务提交了就完事,主线程并不Care它到底有没有执行完,并不关心它是否是抛出异常,主线程Just提交线程到线程池中,其他什么都无论。
有返回值的任务就不同了,主线程首先要提交任务到线程池中,它须要使用到任务执行的结果,因此它必须等待任务执行完毕才能拿到任务执行的结果。
那么,为何不直接在execute的时候就等待任务执行完毕呢?这样的话那不就跟串行没啥区别了,还不如直接在主线程执行任务呢,还少了线程切换的资源消耗。
之因此要分红两步,是由于主线程并不必定须要当即获取返回值,在须要用到返回值的时候才去get,这样就能够在提交任务和获取返回值之间干些其它的事情,提升效率。
因此,提交任务的时候不须要阻塞,get返回值的时候才可能须要阻塞,若是get的时候任务已经执行完毕了,这时候也不须要阻塞,若是get的时候任务还未执行完毕,那就要阻塞等待任务执行完毕才能获取到返回值。
首先,无返回值的任务咱们直接使用的Runnable函数式接口,有返回值的任务有没有现成的接口呢?还真有,那就是Callable接口,它有个返回值。
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}复制代码
其次,提交任务的时候须要有个返回值,它是在未来用来获取任务执行结果的,实际上它也是新任务的一种能力,可使用它对任务进行包装,使其具备返回值的能力。
public interface Future<T> {
T get();
}复制代码
再次,咱们须要给现有的线程池增长一种新的能力,根据单一职责原则,咱们定义一个新的接口来承载这种能力。
public interface FutureExecutor extends Executor {
<T> Future<T> submit(Callable<T> command);
}复制代码
而后,咱们须要一种新的任务,它既具备旧任务的执行能力(run()方法),又具备新任务的返回值能力(get()方法),因此咱们造一个“未来的任务”对提交的任务进行包装,使其具备返回值的能力。
public class FutureTask<T> implements Runnable, Future<T> {
/**
* 真正的任务
*/
private Callable<T> task;
public FutureTask(Callable<T> task) {
this.task = task;
}
@Override
public void run() {
// 具体实现...
}
@Override
public T get() {
// 具体实现...
}
}复制代码
最后,咱们只要对原有的线程池进行扩展,将提交的任务包装成“未来获取返回值的任务”,仍是使用原来的方法去执行,而后返回这个未来的任务便可。
根据开闭原则,【本篇文章由公众号“彤哥读源码”原创】原来的代码咱们不作任何修改,扩展新的子类来实现新的能力。
public class MyThreadPoolFutureExecutor extends MyThreadPoolExecutor implements FutureExecutor, Executor {
public MyThreadPoolFutureExecutor(String name, int coreSize, int maxSize, BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy) {
super(name, coreSize, maxSize, taskQueue, rejectPolicy);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
// 包装成未来获取返回值的任务
FutureTask<T> futureTask = new FutureTask<>(task);
// 仍是使用原来的执行能力
execute(futureTask);
// 返回未来的任务,只须要返回其get返回值的能力便可
// 因此这里返回的是Future而不是FutureTask类型
return futureTask;
}
}复制代码
好了,到这里总体的逻辑咱们就已经比较清晰地实现完了,还剩下最关键的部分,这个“未来的任务”的两个能力要如何实现。
未来的任务,具备两个能力:一是执行真正任务的能力,二是未来获取返回值的能力。
public class FutureTask<T> implements Runnable, Future<T> {
@Override
public void run() {
// 具体实现...
}
@Override
public T get() {
// 具体实现...
}
}复制代码
首先,咱们要明确一件事,任务的执行是线程池中,获取返回值是在主线程中,它们是在两个线程中执行的,并且谁先谁后咱们没法肯定。
其次,若是run()在get()以前执行,咱们须要告诉get()任务已经执行完毕了,因此须要一个状态来通知这个事,还须要一个变量来承载任务执行的返回值。
/**
* 任务执行的状态,0未开始,1正常完成,2异常完成
* 也可使用volatile+Unsafe实现CAS操做
*/
private AtomicInteger state = new AtomicInteger(NEW);
private static final int NEW = 0;
private static final int FINISHED = 1;
private static final int EXCEPTION = 2;
/**
* 任务执行的结果【本篇文章由公众号“彤哥读源码”原创】
* 若是执行正常,返回结果为T
* 若是执行异常,返回结果为Exception
*/
private Object result;复制代码
再次,若是get()在run()以前执行,那就须要阻塞等待run()执行完毕才能拿到返回值,因此须要保存调用者(主线程),get()的时候park阻塞住,run()完成了unpark唤醒它来拿返回值。
/**
* 调用者线程
* 也可使用volatile+Unsafe实现CAS操做
*/
private AtomicReference<Thread> caller = new AtomicReference<>();复制代码
而后,咱们先来看看run()方法的逻辑,它其实就是先执行真正的任务,而后修改状态为完成,并保存任务的返回值,若是保存了主线程,还要唤醒它。
@Override
public void run() {
// 若是状态不是NEW,说明执行过了,直接返回
if (state.get() != NEW) {
return;
}
try {
// 执行任务【本篇文章由公众号“彤哥读源码”原创】
T r = task.call();
// CAS更新state的值为FINISHED
// 若是更新成功,就把r赋值给result
// 若是更新失败,说明state的值不为NEW了,也就是任务已经执行过了
if (state.compareAndSet(NEW, FINISHED)) {
this.result = r;
// finish()必须放在修改state里面,见下面的分析
finish();
}
} catch (Exception e) {
// 若是CAS更新state的值为EXCEPTION成功,就把e赋值给result
// 若是CAS更新失败,说明state的值不为NEW了,也就是任务已经执行过了
if (state.compareAndSet(NEW, EXCEPTION)) {
this.result = e;
// finish()必须放在修改state里面,见下面的分析
finish();
}
}
}
private void finish() {
// 检查调用者是否为空,若是不为空,唤醒它
// 调用者在调用get()方法的进入阻塞状态
for (Thread c; (c = caller.get()) != null;) {
if (caller.compareAndSet(c, null)) {
LockSupport.unpark(c);
}
}
}复制代码
最后,咱们再看看get()方法,若是任务还未执行,就阻塞等待任务的执行;若是任务已经执行完毕了,直接拿返回值便可;可是,还有一种状况,get()方法执行的过程当中run()方法也在执行,因此get()方法中的每一步都要检查状态的值有没有变化。
@Override
public T get() {
int s = state.get();
// 若是任务还未执行完成,判断当前线程是否要进入阻塞状态
if (s == NEW) {
// 标识调用者线程是否被标记过
boolean marked = false;
for (;;) {
// 从新获取state的值
s = state.get();
// 若是state大于NEW说明完成了,跳出循环
if (s > NEW) {
break;
// 此处必须把caller的CAS更新和park()方法分红两步处理,不能把park()放在CAS里面
} else if (!marked) {
// 尝试更新调用者线程
// 试想断点停在此处【本篇文章由公众号“彤哥读源码”原创】
// 此时state为NEW,让run()方法执行到底,它不会执行finish()中的unpark()方法
// 这时打开断点,这里会更新caller成功,可是循环从头再执行一遍发现state已经变了,
// 直接在上面的if(s>NEW)处跳出循环了,由于finish()在修改state内部
marked = caller.compareAndSet(null, Thread.currentThread());
} else {
// 调用者线程更新以后park当前线程
// 试想断点停在此处
// 此时state为NEW,让run()方法执行到底,由于上面的caller已经设置值了,
// 因此会执行finish()方法中的unpark()方法,
// 这时再打开断点,这里不会park信
// 见unpark()方法的注释,上面写得很清楚:
// 若是线程执行了park()方法,那么执行unpark()方法会唤醒那个线程
// 若是先执行了unpark()方法,那么线程下一次执行park()方法将不会阻塞
LockSupport.park();
}
}
}
if (s == FINISHED) {
return (T) result;
}
throw new RuntimeException((Throwable) result);
}复制代码
在咱们的实现中,若是任务执行的过程抛出异常了,也是经过result返回给主线程,这样主线程就拿到了这个异常,它就能够作相应的处理了。
好了,完整的实现到此结束,不知道你领悟了没有。
最后奉上测试代码:
public class MyThreadPoolFutureExecutorTest {
public static void main(String[] args) {
FutureExecutor threadPool = new MyThreadPoolFutureExecutor("test", 2, 4, new ArrayBlockingQueue<>(6), new DiscardRejectPolicy());
List<Future<Integer>> list = new ArrayList<>();
for (int i = 0; i < 100; i++) {
int num = i;
Future<Integer> future = threadPool.submit(() -> {
Thread.sleep(1000);
System.out.println("running: " + num);
return num;
});
list.add(future);
}
for (Future<Integer> future : list) {
System.out.println("runned: " + future.get());
}
}
}复制代码
运行结果:
thread name: core_test2
thread name: test4
thread name: test3
discard one task
thread name: core_test1
discard one task
...省略被拒绝的任务
【本篇文章由公众号“彤哥读源码”原创】
discard one task
running: 0
running: 1
running: 8
running: 9
runned: 0
runned: 1
running: 4
running: 2
running: 3
running: 5
runned: 2
runned: 3
runned: 4
runned: 5
running: 6
running: 7
runned: 6
runned: 7
runned: 8
runned: 9
复制代码
(1)有返回值的任务是经过包装成未来的任务来实现的,这个任务既具备基本的执行能力,又具备未来获取返回值的能力;
(2)任务执行的异常跟任务正常的返回值是经过同一个返回值返回到主线程的,主线程根据状态判断是异常仍是正常值;
(3)咱们的实现中运用了单一职责原则、开闭原则等设计原则,对原有代码没有形成任何的入侵;
手写线程池目前只打算写这两章,后面开始进入jdk原生线程池的源码分析,敬请期待。
另外,须要手写线程池完整源码的同窗请关注个人公众号“彤哥读源码”,在后台回复“MyThreadPool”(不带引号)便可领取手写线程池完整源码,注意大小写不要弄错哦,不然彤哥是不会给你的哈。
欢迎关注个人公众号“彤哥读源码”,查看更多源码系列文章, 与彤哥一块儿畅游源码的海洋。