上一篇内容写了Java
中线程池的实现原理及源码分析,说好的是实实在在的大知足,想经过一篇文章让你们对线程池有个透彻的了解,可是文章写完总以为还缺点什么?java
上篇文章只提到线程提交的execute()
方法,并无讲解线程提交的submit()
方法,submit()
有一个返回值,能够获取线程执行的结果Future<T>
,这一讲就那深刻学习下submit()
和FutureTask
实现原理。数据结构
我能想到的使用场景就是在并行计算的时候,例如一个方法中调用methodA()、methodB()
,咱们能够经过线程池异步去提交方法A、B,而后在主线程中获取组装方法A、B计算后的结果,可以大大提高方法的吞吐量。多线程
/** * @author wangmeng * @date 2020/5/28 15:30 */ public class FutureTaskTest { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService threadPool = Executors.newCachedThreadPool(); System.out.println("====执行FutureTask线程任务===="); Future<String> futureTask = threadPool.submit(new Callable<String>() { @Override public String call() throws Exception { System.out.println("FutureTask执行业务逻辑"); Thread.sleep(2000); System.out.println("FutureTask业务逻辑执行完毕!"); return "欢迎关注: 一枝花算不算浪漫!"; } }); System.out.println("====执行主线程任务===="); Thread.sleep(1000); boolean flag = true; while(flag){ if(futureTask.isDone() && !futureTask.isCancelled()){ System.out.println("FutureTask异步任务执行结果:" + futureTask.get()); flag = false; } } threadPool.shutdown(); } }
上面的使用很简单,submit()
内部传递的其实是个Callable
接口,咱们本身实现其中的call()
方法,咱们经过futureTask
既能够获取到具体的返回值。异步
submit()
是也是提交任务到线程池,只是它能够获取任务返回结果,返回结果是经过FutureTask
来实现的,先看下ThreadPoolExecutor
中代码实现:ide
public class ThreadPoolExecutor extends AbstractExecutorService { public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } } public abstract class AbstractExecutorService implements ExecutorService { protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); } }
提交任务仍是执行execute()
方法,只是task
被包装成了FutureTask
,也就是在excute()
中启动线程后会执行FutureTask.run()
方法。源码分析
再来具体看下它执行的完整链路图:学习
上图能够看到,执行任务并返回执行结果的核心逻辑实在FutureTask
中,咱们以FutureTask.run/get
两个方法为突破口,一点点剖析FutureTask
的实现原理。this
先看下FutureTask
中部分属性:线程
public class FutureTask<V> implements RunnableFuture<V> { private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; private Callable<V> callable; private Object outcome; private volatile Thread runner; private volatile WaitNode waiters; }
当前task状态,共有7中类型。
NEW: 当前任务还没有执行
COMPLETING: 当前任务正在结束,还没有彻底结束,一种临界状态
NORMAL:当前任务正常结束
EXCEPTIONAL: 当前任务执行过程当中发生了异常。
CANCELLED: 当前任务被取消
INTERRUPTING: 当前任务中断中..
INTERRUPTED: 当前任务已中断3d
用户提交任务传递的Callable,自定义call方法,实现业务逻辑
任务结束时,outcome保存执行结果或者异常信息。
当前任务被线程执行期间,保存当前任务的线程对象引用
由于会有不少线程去get当前任务的结果,因此这里使用了一种stack数据结构来保存
咱们已经知道在线程池runWorker()
中最终会调用到FutureTask.run()
方法中,咱们就来看下它的执行原理吧:
具体代码以下:
public class FutureTask<V> implements RunnableFuture<V> { public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { runner = null; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } }
首先是判断FutureTask
中state
状态,必须是NEW
才能够继续执行。
而后经过CAS
修改runner
引用为当前线程。
接着执行用户自定义的call()
方法,将返回结果设置到result中,result可能为正常返回也可能为异常信息。这里主要是调用set()/setException()
set()
方法的实现很简单,直接看下代码:
public class FutureTask<V> implements RunnableFuture<V> { protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); finishCompletion(); } } }
将call()
返回的数据赋值给全局变量outcome
上,而后修改state
状态为NORMAL
,最后调用finishCompletion()
来作挂起线程的唤醒操做,这个方法等到get()
后面再来说解。
接着看下代码:
public class FutureTask<V> implements RunnableFuture<V> { public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } }
若是FutureTask
中state
为NORMAL
或者COMPLETING
,说明当前任务并无执行完成,调用get()
方法会被阻塞,具体的阻塞逻辑在awaitDone()
方法:
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } }
这个方法能够说是FutureTask
中最核心的方法了,一步步来分析:
若是timed
不为空,这说明指定nanos
时间还未返回结果,线程就会退出。
q
是一个WaitNode
对象,是将当前引用线程封装在一个stack
数据结构中,WaitNode
对象属性以下:
static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }
接着判断当前线程是否中断,若是中断则抛出中断异常。
下面就进入一轮轮的if... else if...
判断逻辑,咱们仍是采用分支的方式去分析。
分支一:if (s > COMPLETING) {
此时get()
方法已经有结果了,不管是正常返回的结果,仍是异常、中断、取消等,此时直接返回state
状态,而后执行report()
方法。
分支二:else if (s == COMPLETING)
条件成立,说明当前任务接近完成状态,这里让当前线程再释放cpu
,进行下一轮抢占cpu
。
分支三:else if (q == null)
第一次自旋执行,WaitNode
尚未初始化,初始化q=new WaitNode();
分支四:else if (!queued){
queued
表明当前线程是否入栈,若是没有入栈则进行入栈操做,顺便将全局变量waiters
指向栈顶元素。
分支五/六:LockSupport.park
若是设置了超时时间,则使用parkNanos
来挂起当前线程,不然使用park()
通过这么一轮自旋循环后,若是执行call()
尚未返回结果,那么调用get()
方法的线程都会被挂起。
被挂起的线程会等待run()
返回结果后依次唤醒,具体的执行逻辑在finishCompletion()
中。
最终stack
结构中数据以下:
具体实现代码以下:
private void finishCompletion() { for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; q = next; } break; } } done(); callable = null; }
代码实现很简单,看过get()
方法后,咱们知道全部调用get()
方法的线程,在run()
尚未返回结果前,都会保存到一个有WaitNode
构成的statck
数据结构中,并且每一个线程都会被挂起。
这里是遍历waiters
栈顶元素,而后依次查询起next
节点进行唤醒,唤醒后的节点接着会日后调用report()
方法。
具体代码以下:
private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
这个方法很简单,由于执行到了这里,说明当前state
状态确定大于COMPLETING
,判断若是是正常返回,那么返回outcome
数据。
若是state
是取消状态,抛出CancellationException
异常。
若是状态都不知足,则说明执行中出现了差错,直接抛出ExecutionException
public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true; }
cancel()
方法的逻辑很简单,就是修改state
状态为CANCELLED
,而后调用finishCompletion()
来唤醒等待的线程。
这里若是mayInterruptIfRunning
,就会先中断当前线程,而后再去唤醒等待的线程。
FutureTask
的实现原理其实很简单,每一个方法基本上都画了一个简单的流程图来方便当即。
后面还打算分享一个BlockingQueue
相关的源码解读,这样线程池也能够算是完结了。
在这以前可能会先分享一个SpringCloud
常见配置代码分析、最佳实践等手册,方便工做中使用,也是对以前看过的源码一种总结。敬请期待!
欢迎关注: