最近在看并发包的源码把本身的理解分享给你们啦,有不正确的地方欢迎你们指正。 FutureTask类中的waiters成员变量保存着调用get方法获取FutureTask计算结果的线程构成的一个栈。 当FutureTask类run方法没有执行完时,调用get方法的线程会造成一个阻塞的栈,即waiters。并发
static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }
一旦FutureTask类run方法执行完、执行中出现异常或者是调用cancel方法取消执行(能够通知正在执行 FutureTask类run方法的线程响应中断,经过设置cancel方法参数mayInterruptIfRunning为true达到) 时,就必须让阻塞在waiters栈的全部线程退出阻塞。这个是经过FutureTask类的finishCompletion方法 完成的。源码以下:this
private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }
因此,在FutureTask类run方法中的执行完成员变量的callable的call方法时,正常执行或是执行出现异常 调用set设置执行后的结果或是setException设置执行返回出现的异常时,其内部都调用finishCompletion 方法,同理cancel方法内部也调用了finishCompletion方法。FutureTask类run方法源码以下:.net
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; /*执行callable的call方法出现异常,设置异常,setException内部调用 finishCompletion让阻塞在waiters栈上的全部要获取run方法执行结 果的线程所有中止阻塞,获得执行时的异常 */ setException(ex); } if (ran) /*正常执行完callable的call方法,设置返回结果,set内部调用finishCompletion 让阻塞在waiters栈上的全部要获取run方法执行结果的线程所有中止阻塞获得 执行的结果 */ set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
如今反过来分析一下,waiters栈是如何构成的。先看一下FutureTask类get方法,就是获得Callable 的执行结果,咱们前说的全部要获取run方法执行结果的线程,就是调用 get方法。源码以下:线程
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) /*FutureTask类run方法执行callable的call方法没完成,要获取run方法执行结果的线程阻塞构成 waiters栈 */ s = awaitDone(false, 0L); return report(s); }
FutureTask类有个get变体用于在指定时间没有获取结果抛出异常,同get相似这里不作分析,来看下 awaitDone源码:code
/** * Awaits completion or aborts on interrupt or timeout. * * [@param](https://my.oschina.net/u/2303379) timed true if use timed waits * [@param](https://my.oschina.net/u/2303379) nanos time to wait, if timed * [@return](https://my.oschina.net/u/556800) state upon completion */ private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; //自旋 for (;;) { //调用get的线程被中断,将其对应waiterNode移除,同时抛出异常,interrupted会重置中断状态 if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; //FutureTask中run方法执行,即callable任务执行完,返回执行的状态 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } //FutureTask执行处于中间状态,获取结果的线程将cup执行机会让给真正要执行FutureTask类run方法的线程 else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) //将要获取结果的线程封装成对应的WaitNode,用于后面构建阻塞waiters栈 q = new WaitNode(); else if (!queued) //构建阻塞waiters栈 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else //阻塞当前获取FutureTask类执行结果的线程 LockSupport.park(this); } }
上面注解已详细分析了awaitDone方法。有问题欢迎指正。rem