1:使用java
package com.huiguan.facade.mobile; import java.util.concurrent.*; /** * @author chengwei * @since $$Revision:1.0.0, $$Date: 2018/4/26 11:39 $$ */ public class FutureDemo { public static void main(String[] args) { try { //建立线程池,此线程池建立为非推荐方式 ExecutorService threadPool = Executors.newCachedThreadPool(); //task是线程 Task task = new Task(); FutureTask<String> futureTask = new FutureTask<String>(task); //submit运行有返回值的线程,execute运行无返回值的线程 threadPool.submit(futureTask); //get方法会阻塞,直到task线程运行结束,才会获取返回值 String s = futureTask.get(); System.out.println(s); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } } package com.huiguan.facade.mobile; import java.util.concurrent.Callable; /** * @author chengwei * @since $$Revision:1.0.0, $$Date: 2018/4/26 11:39 $$ */ public class Task implements Callable { @Override public String call() throws Exception { //线程休眠1s,方便观察FutureTask.get的阻塞现象 Thread.sleep(1000); return "qqqqqqq"; } }
2. 解析FutureTask.get()源码算法
2.1 原理安全
当futuretask任务线程未运行结束时,调用get方法时会把futuretask放入链表中,使用LockSupport.park将当前主线程置为休眠状态;当futuretask任务线程运行结束,会遍历链表,使用LockSupport.unpark唤醒主线程,主线程获取futuretask线程返回的值。ide
2.2 源码ui
Futureta各类状态this
* Possible state transitions: //Futuretask运行时各类状态转化 * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */ private volatile int state; //建立状态 private static final int NEW = 0; //运行中 private static final int COMPLETING = 1; //正常结束 private static final int NORMAL = 2; //异常结束 private static final int EXCEPTIONAL = 3; //取消 private static final int CANCELLED = 4; //中断 private static final int INTERRUPTING = 5; //中断结束 private static final int INTERRUPTED = 6;
java.util.concurrent.FutureTask#get()spa
public V get() throws InterruptedException, ExecutionException { int s = state; //当是建立或者运行中状态时,休眠,等待task运行结束 if (s <= COMPLETING) s = awaitDone(false, 0L); //运行结束,获取返回值 return report(s); }
java.util.concurrent.FutureTask#awaitDone线程
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { //线程中断,抛出异常 if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; //非建立和运行中状态,直接返回状态值 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } //当是运行中状态时,让出cpu,由本身和其余线程从新竞争 else if (s == COMPLETING) // cannot time out yet Thread.yield(); //新建一个等待链表,等待链表是任务线程处于运行中,把主线程放入等待链表,等待任务线程运行结束 else if (q == null) q = new WaitNode(); //用CAS算法,将当前主线程放入等待链表waiters中,CAS算法:compareAndSwapObject(this,vOffset,A,B)内存值V,旧的预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改成B,不然什么都不作。能够保证线程安全。 else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } //LockSupport.park相似于wait,使当前线程等待 else LockSupport.park(this); } }
java.util.concurrent.FutureTask#report 获取返回值code
/** The result to return or exception to throw from get() */ private Object outcome; // non-volatile, protected by state reads/writes private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) //正常结束,获取返回值 return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
java.util.concurrent.FutureTask#run内存
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) //运行结束,返回返回值,唤醒等待的线程 set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
java.util.concurrent.FutureTask#set
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { //设置返回值 outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state //唤醒等待线程 finishCompletion(); } }
java.util.concurrent.FutureTask#finishCompletion
private void finishCompletion() { // assert state > COMPLETING; //遍历waiters链表 for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; //唤醒等待线程 LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }