1.上文介绍了Future的使用,Future通常搭配Callable来使用,通常咱们使用Thread或者ExecutorService来执行,并返回执行结果Future;java
2.在JDK中,FutureTask实现了Future,而且封装了Runnable和Callable两种形式的任务;node
3.该源码的环境是一个FutureTask被Thread A执行,一个Thread List 等待调用它的执行结果,因此它的方法分为2部分,一部分是线程A执行的run(),另外一部分是供Thread List执行的get()、cancel等,其中get()是经过循环等待、yield等阻塞等待线程的执行结果的;并发
4.下面将从成员变量、构造方法、get()、get(long,TimeUnit)、run()、cancel(boolean)方法来剖析它的运行机制;函数
1.FutureTask的成员变量包括:state执行状态、callable、outcomethis
/**volatile变量,用来表示当前Futuretask的状态,初始状态为null,状态切换到终结状态,仅仅能经过 set、setException和cancel方法,在完成的期间,状态多是completing或interrupting,这些 转换使用延迟写,由于这些值是独特的不能被更远的修改,下面是可能的状态转换:*/ private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; /** The underlying callable; nulled out after running ,运行之后变为null*/ private Callable<V> callable; //运行结果或者异常,non-volatile,可是它是受state的值来保护的 /** The result to return or exception to throw from get() */ private Object outcome; // non-volatile, protected by state reads/writes //运行该FutureTask的线程 /** The thread running the callable; CASed during run() */ private volatile Thread runner; //等待该FutureTask结果的线程s /** Treiber stack of waiting threads */ private volatile WaitNode waiters;
2.FutureTask提供了2种构造方法来支持,其中支持Runnable是使用RunnableAdapter封装了一层:线程
//支持Callable public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } //支持Runnable public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable } //使用RunnableAdapter来封装Runnable,来代替Callable public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<T>(task, result); } //继承了Callable,用来执行Runnable,并返回结果 static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
3.内部类WaitNode,使用链表 的方式,来记录等待Future结果的线程:rest
static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }
在这里,通常认为线程A中经过Thread或者线程池,执行了FutureTask.run(),而后一个List来调用FutureTask.get(),来获取执行结果,固然List中可能包含线程A,也可能不包含;code
在FutureTask中最重要的几个方法为:执行函数run、获取结果get,取消cancel,下面将会重点介绍这三个方法;orm
a)run方法分三部分:Callable.call()、set()、setException()和等待Interrupt完成的handlePossibleCancellationInterrupt()组成,set()、setException()中又包含了对waiters队列操做的finishCompletion()方法:继承
//这是线程A的执行 public void run() { //若当前Task状态不为NEW,或者CAS设置runner失败,则退出 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; //当可执行任务callable不为null,且状态为NEW,继续执行 if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { //异常状况下,设置result result = null; ran = false; setException(ex); } //正常返回的状况下,设置result if (ran) set(result); } } finally { // runner必须设置为非null直到状态被设置,防止并发执行 runner = null; // state必须重写读取,防止泄露的中断 int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
b)set()、setException()方法主要分为2部分,一部分是经过UnSafe进行CAS赋值,另外一部分是操做waiters队列:
protected void set(V v) { //CAS设置Completing以后,再赋值为Normal结束 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } } protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }
c)finishCompletion是设置waiter队列为null,并设置每一个node的thread为null:
private void finishCompletion() { // assert state > COMPLETING; //循环,直至waiters为null for (WaitNode q; (q = waiters) != null;) { //CAS设置waiters为null if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; //若是当前Node不为空,设置thread为null,并解除阻塞 if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; //若是到队列末尾,则跳出循环 if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }
d)特别说下done()方法,是为了子类能够自定义完成之后的逻辑,而定义的方法:
/**当task的状态转换成Done时,该Protected方法被调用,它的默认实现没有作任何事情。子类能够重载这个方法去调用完成后的Callback或记录 */ protected void done() { }
e)handlePossibleCancellationInterrupt是为了防止一种特殊场景,等待Interrupting变为Interrupted:
private void handlePossibleCancellationInterrupt(int s) { // It is possible for our interrupter to stall before getting a // chance to interrupt us. Let's spin-wait patiently. if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); // wait out pending interrupt }
a)get方法主要由report和awaitDone方法组成,先介绍get和report方法,report方法是用来根据task状态返回结果或者抛出异常的方法:
public V get() throws InterruptedException, ExecutionException { int s = state; //若当前task的状态为Completing,表示正在执行中,则等待task执行完成 if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } private V report(int s) throws ExecutionException { Object x = outcome; //正常返回,则返回result if (s == NORMAL) return (V)x; //若被cancel、Interrupt相关,则跑步cancel异常 if (s >= CANCELLED) throw new CancellationException(); //若为Exceptional,则抛出execution异常 throw new ExecutionException((Throwable)x); }
b)get(long timeout, @NotNull TimeUnit unit)方法,这个方法是为了防止线程内部发生了异常、死循环等无限等待的状况,而引入的超时获取方法:
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { //若是时间单元为null,则抛出异常 if (unit == null) throw new NullPointerException(); int s = state; //当为New、Completing时,继续去等待获取结果,若超时,仍未获取结果,则抛出异常 if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); }
c)awaitDone方法,该方法是用来等待,并获取结果,采用的是无限循环的方法:
/**等待完成、中断,或者超时*/ private int awaitDone(boolean timed, long nanos) throws InterruptedException { //若须要超时机制,timed为true,deadline为当前时间加上超时时间 final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { //若调用future.get()方法的线程被中断了,则从waiter列表中移除该线程,并抛出异常 if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; //若为normal、exceptional、cancelled、interrupting、interrupted,则返回,交由report处理 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } //若为completing,则放弃时间片,等待下次时间片 else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) //若须要等待,总会有一次进来,设置当前线程的waiterNode q = new WaitNode(); //有且仅有一次设置成功,将当前线程的waiterNode加到waiters队列中 else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); //用于超时机制,若超时,则一处waiterNode,并返回 else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } //若未超时,则阻塞当前线程 LockSupport.parkNanos(this, nanos); } else //阻塞当前线程 LockSupport.park(this); } }
d)removewaiter方法,此方法写的至关精妙,我自认熟写链表,感受仍是差很多距离:
private void removeWaiter(WaitNode node) { if (node != null) { //将须要删除node的thread设置为null,作标志使用 node.thread = null; retry: for (;;) { // restart on removeWaiter race //循环waiters队列,找到被删除的node for (WaitNode pred = null, q = waiters, s; q != null; q = s) { s = q.next; //q.thread不为null,表示不是当前node if (q.thread != null) pred = q; else if (pred != null) { pred.next = s; //若是上一个thread为null,防止并发状况下的操做 if (pred.thread == null) // check for race continue retry; } //thread为null,表示要删除当前node,pred为null,表示node为首节点 else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) continue retry; } break; } } }
a)cancel方法主要分为设置状态、Interrupt执行线程、finishCompletion三部分,参数MayInterruptIfRunning表示运行时是否能够Interrupt:
public boolean cancel(boolean mayInterruptIfRunning) { //若是状态为New,且当前状态指望值为New,设置为Interrupting或Cancelled成功,则不返回false if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception //若是容许终端线程,则中断线程 if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true; }