FutureTask是一种可取消的异步计算任务。它实现了Future接口,表明了异步任务的返回结果。从而FutureTask能够启动和取消异步计算任务、查询异步计算任务是否完成和获取异步计算任务的返回结果。只有到异步计算任务结束时才能获取返回结果,当异步计算任务还未结束时调用get方法会使线程阻塞。一旦异步计算任务完成,计算任务不能从新启动或者取消,除非调用了runAndReset。node
FutureTask实现了RunnableFuture,RunnableFuture结合了Future和Runnable。安全
在ThreadPoolExecutor分析中咱们没有看它的父类AbstractExecutorService,其中有一个方法submit,返回一个Future,说明该方法能够获取异步任务的返回结果。该方法有三个重载,能够接收Runnable和Callable,Callable是能够返回结果的一个Runnable,而Callable就是FutureTask的一个重要的变量。异步
@FunctionalInterface public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; }
/** * The run state of this task, initially NEW. The run state * transitions to a terminal state only in methods set, * setException, and cancel. During completion, state may take on * transient values of COMPLETING (while outcome is being set) or * INTERRUPTING (only while interrupting the runner to satisfy a * cancel(true)). Transitions from these intermediate to final * states use cheaper ordered/lazy writes because values are unique * and cannot be further modified. * * Possible state transitions: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */ private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; /** The underlying callable; nulled out after running */ //一个能够返回结果的任务 private Callable<V> callable; /** The result to return or exception to throw from get() */ //包装返回结果或者异常,没有被volatile修饰,状态保护读写安全 private Object outcome; // non-volatile, protected by state reads/writes /** The thread running the callable; CASed during run() */ //运行线程 private volatile Thread runner; /** Treiber stack of waiting threads */ //单链表,是一个线程的栈的结构 private volatile WaitNode waiters;
FutureTask有7中状态,介绍一下状态之间的转换:
NEW -> COMPLETING -> NORMAL:任务正常执行;
NEW -> COMPLETING -> EXCEPTIONAL:任务发生异常;
NEW -> CANCELLED:任务被取消;
NEW -> INTERRUPTING -> INTERRUPTED:任务被中断;oop
public void run() { //若是state不为NEW,说明任务已经在执行或者取消 //若是设置运行线程失败,说明任务已经有运行线程抢在前面 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; //NEW状态才能够执行 if (c != null && state == NEW) { V result; boolean ran; try { //执行任务 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; //设置异常信息 setException(ex); } if (ran) //设置任务运行结果 set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() //将运行线程清空,在state被更改以前要保证runner非空,这样能包装run方法不被屡次执行 runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; //中断处理 if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }
当执行时发生异常,调用setException,首先将state设置为COMPLETING,设置成功后将outcome设置为异常,而后将state设置为EXCEPTIONAL。this
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
当callable执行成功并返回,调用set,首先将state设置为COMPLETING,设置成功后将结果设置为outcome,而后设置state为NORMAL。spa
finally中若是state为中断,调用handlePossibleCancellationInterrupt:线程
private void handlePossibleCancellationInterrupt(int s) { // It is possible for our interrupter to stall before getting a // chance to interrupt us. Let's spin-wait patiently. if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); // wait out pending interrupt // assert state == INTERRUPTED; // We want to clear any interrupt we may have received from // cancel(true). However, it is permissible to use interrupts // as an independent mechanism for a task to communicate with // its caller, and there is no way to clear only the // cancellation interrupt. // // Thread.interrupted(); }
若是状态一直是INTERRUPTING,稍稍等待。rest
在上面set和setException中最后都调用了finishCompletion方法:code
private void finishCompletion() { // assert state > COMPLETING; //该方法必须在state > COMPLETING时调用 //从头至尾唤醒WaitNode中阻塞的线程 for (WaitNode q; (q = waiters) != null;) { //设置栈顶为空 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; //唤醒线程 if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; //若是next为空,break if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }
在调用get方法时,若是任务还在执行,线程会阻塞,FutureTask会将阻塞的线程放入waiters单链表。等待任务结束时被唤醒,咱们继续看get方法:接口
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) //若是任务还在执行,阻塞当前线程,放入waiters单链表 s = awaitDone(false, 0L); return report(s); }
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { //若是线程被中断,移除当前node,抛出异常 if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; //若是任务完成或者被取消,直接返回 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } //若是任务正在执行,线程等待一下 else if (s == COMPLETING) // cannot time out yet Thread.yield(); //若是q为空,新建一个node else if (q == null) q = new WaitNode(); //若是还未入列,尝试将新建的node放入链表 else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); //若是设置了超时且超时了 else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { //超时,移除node removeWaiter(q); return state; } //阻塞线程 LockSupport.parkNanos(this, nanos); } //阻塞当前线程 else LockSupport.park(this); } }
private void removeWaiter(WaitNode node) { if (node != null) { //设置节点的线程为空,作删除标记 node.thread = null; retry: for (;;) { // restart on removeWaiter race for (WaitNode pred = null, q = waiters, s; q != null; q = s) { s = q.next; //thread不为空,continue if (q.thread != null) pred = q; //thread为空且pred不为空 else if (pred != null) { //删除q pred.next = s; //检查一下pred的thread,若是被其余线程修改,retry outer loop if (pred.thread == null) // check for race continue retry; } //thread为空且pred为空说明q为栈顶,将q.next设置为栈顶,失败则retry else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) continue retry; } break; } } }
get方法最后调用了report方法:
private V report(int s) throws ExecutionException { Object x = outcome; //NORMAL表示任务执行正常,返回结果 if (s == NORMAL) return (V)x; //任务被取消,抛出异常 if (s >= CANCELLED) throw new CancellationException(); //其余状况只有可能发生异常,抛出该异常 throw new ExecutionException((Throwable)x); }
最后看一下cancel方法:
public boolean cancel(boolean mayInterruptIfRunning) { //当state不为NEW说明任务已经开始,不能被取消,返回false //当设置state失败时,返回false if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; //中断线程 if (t != null) t.interrupt(); } finally { // final state //设置任务为INTERRUPTED UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true; }