1. 疑惑java
Future f = executor.submit(callable);异步
线程池执行任务的时候, 为了高效,使用的都是异步执行方式;上面这种形式是异步执行的,可是使用 f.get()方法获取结果时又是阻塞的, 这是怎么实现的呢?ide
2. 分析this
从使用方式上咱们能够推测出几点:线程
进入线程池中的任务(callable), 应该被放到Future中执行了;code
Future执行任务是在新的线程中执行(这样才不会阻塞原来的线程);orm
get方法调用时,阻塞主线程, 当任务执行完成后, 才取消主线程的阻塞;get
3. 实现it
/** * future有几种状态:new(新建立), completing(正在执行), normal(执行完毕) */ public class MyFuture<T> implements Runnable { private int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private Callable<T> callable; private Object result; private WaitNode waiters; public MyFuture(Callable<T> callable) { this.callable = callable; this.state = NEW; } public T get() throws InterruptedException, ExecutionException{ int s = state; //若不是‘完成’状态, 则阻塞当前线程 if (s <= COMPLETING) { s = await(); } return report(s); } private int await() { WaitNode q = null; boolean queued = false; for(;;) { if (state > COMPLETING) { //若是是完成状态, 则直接返回 return state; } else if (q == null) { q = new WaitNode(); } else if (!queued) { queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); } else { //阻塞当前线程 LockSupport.park(this); } } } private T report(int s) throws ExecutionException{ Object x = result; if (s == NORMAL) { return (T)x; } throw new ExecutionException((Throwable)x); } @Override public void run() { //此方法是有线程池中线程调用, 主线程阻塞, 不会影响这个方法的执行 try { if (callable != null && state == NEW) { //任务的执行 T v = callable.call(); setResult(v); } } catch (Exception e) { e.printStackTrace(); } } private void setResult(T result) { if (UNSAFE.compareAndSwapLong(this, stateOffset, state, COMPLETING)) { this.result = result; UNSAFE.compareAndSwapLong(this, stateOffset, state, NORMAL); finishComplete(); } } private void finishComplete() { for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; //取消阻塞 LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; q = next; } break; } } callable = null; } static final class WaitNode { Thread thread; WaitNode next; WaitNode() { this.thread = Thread.currentThread(); } } private static sun.misc.Unsafe UNSAFE; private static long stateOffset; private static long waitersOffset; static { try { Field f = sun.misc.Unsafe.class.getDeclaredField("theUnsafe"); f.setAccessible(true); UNSAFE = (Unsafe)f.get(null); Class myFutureClass = MyFuture.class; stateOffset = UNSAFE.objectFieldOffset(myFutureClass.getDeclaredField("state")); waitersOffset = UNSAFE.objectFieldOffset(myFutureClass.getDeclaredField("waiters")); } catch (Exception e) { e.printStackTrace(); } } }