A Future计算的结果。 提供方法来检查计算是否完成,等待其完成,并检索计算结果。 结果只能在计算完成后使用方法get进行检索,若有必要,阻塞,直到准备就绪。 取消由cancel方法执行。 提供其余方法来肯定任务是否正常完成或被取消。 计算完成后,不能取消计算。 若是您想使用Future ,以便不可撤销,但不提供可用的结果,则能够声明Future<?>表格的类型,并返回null做为基础任务的结果。 node
public interface Future<V> { //尝试取消执行此任务。若是任务已经完成,已经被取消或因为某些其余缘由而没法取消,则此尝试将失败。 //若是成功,而且在调用 cancel 时此任务还没有开始,则该任务永远没法运行。 //若是任务已经开始,则 mayInterruptIfRunning 参数肯定是否应中断执行该任务的线程以尝试中止该任务。 //mayInterruptIfRunning == true, 表示中断执行中的线程,false 表示让线程正常完成 boolean cancel(boolean mayInterruptIfRunning); //若是此任务在正常完成以前被取消,则返回true。 boolean isCancelled(); //若是此任务完成,则返回true。完成多是因为正常终止,异常或取消引发的,在全部这些状况下,此方法都将返回true。 boolean isDone(); //必要时等待计算完成,而后检索其结果 V get() throws InterruptedException, ExecutionException; //必要时最多等待给定时间以完成计算,而后检索其结果(若是有)。 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
Future是一个接口,提供了方法来检测当前的任务是否已经结束,还能够等待任务结束而且拿到一个结果,经过调用Future的get()方法能够当任务结束后返回一个结果值,若是工做没有结束,则会阻塞当前线程,直到任务执行完毕;能够经过调用cancel()方法来中止一个任务,若是任务已经中止,则cancel()方法会返回true;若是任务已经完成或者已经中止了或者这个任务没法中止,则cancel()会返回一个false。当一个任务被成功中止后,他没法再次执行。isDone()和isCancel()方法能够判断当前工做是否完成和是否取消。 多线程
一个场景,咱们要学习作饭,那么咱们须要准备厨具和食材,厨具经过电子商务网购,食材去菜市场挑选。那么可使用多线程来并发进行,即咱们能够先网购下单,在等待快递员送货过来的这段时间去菜市场买食材,节省时间,提升效率。并发
public class FutureTest { public static void main(String[] args) throws InterruptedException { long startTime = System.currentTimeMillis(); OnlineShopping shopping = new OnlineShopping(); shopping.start(); Thread.sleep(2000);//等待送货执行完 System.out.println("第二步:食材到位"); shopping.join();//阻塞订单直到快递送到获得厨具 System.out.println("第三步:开始厨艺"); System.out.println("总共用时:" + (System.currentTimeMillis() - startTime) + "ms"); } static class OnlineShopping extends Thread { @Override public void run() { System.out.println("第一步:下单"); System.out.println("第一步:等待送货"); try { Thread.sleep(5000);//送货中 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("第一步:快递送到"); } } } //======结果====== 第一步:下单 第一步:等待送货 第二步:食材到位 第一步:快递送到 第三步:开始厨艺 总共用时:5003ms
public class FutureTest { public static void main(String[] args) throws Exception { long startTime = System.currentTimeMillis(); Callable<String> shopping = () ->{ System.out.println("第一步:下单"); System.out.println("第一步:等待送货"); Thread.sleep(5000);//快递员送货中 System.out.println("第一步:快递送到"); return "厨具到达"; }; FutureTask<String> task = new FutureTask<>(shopping); new Thread(task).start(); Thread.sleep(2000);//保证下单操做执行到“等待送货”中 System.out.println("第二步:食材到位"); if (!task.isDone()) { // 联系快递员,询问是否到货 System.out.println("第三步:厨具还没到,心情好就等着(心情很差就调用cancel方法取消订单)"); } String chuju = task.get();//获得厨具 System.out.println("第三步:开始厨艺"); System.out.println("总共用时:" + (System.currentTimeMillis() - startTime) + "ms"); } } //======结果====== 第一步:下单 第一步:等待送货 第二步:食材到位 第三步:厨具还没到,心情好就等着(心情很差就调用cancel方法取消订单) 第一步:快递送到 第三步:开始厨艺 总共用时:5048ms
使用Future模式的三部曲:app
public interface Callable<V> { V call() throws Exception; }
public class FutureTask<V> implements RunnableFuture<V>{ public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // 确保callable的可见性 } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // 确保callable的可见性 } } public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); }
public void run() { // 1. 若是 state != NEW 说明 run 方法已经运行过,直接 return // 2. 若是 state == NEW && CAS 竞争 设置 runner 失败,说明已经有别的线程在运行,直接 return // NEW 的状态由构造方法初始化,runner 是运行该 Callable 的线程 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable;// 这里的callable是从构造方法里面传人的 if (c != null && state == NEW) { V result; boolean ran;// 标识符 try { result = c.call();//得到结果 ran = true; } catch (Throwable ex) {//异常 result = null; ran = false; setException(ex); } if (ran)//成功没有异常,设置返回值 set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() //在状态设置以前,runner必须是非空的,以防止对run()的并发调用 runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts //为防止泄漏中断,必须在空runner以后将状态设置为重复读 int s = state; // 若是最终状态 >= INTERRUPTING,则处理中断 // cancel 方法会经过参数 mayInterruptIfRunning 来设置 state 的值 if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
private volatile int state;//状态,volatile让状态可见性 private static final int NEW = 0;//构造方法建立时的状态 private static final int COMPLETING = 1;//这是一个中间态,完成时和出现异常时有使用到 private static final int NORMAL = 2;//完成运行时的最终状态 private static final int EXCEPTIONAL = 3;//异常时的最终状态 private static final int CANCELLED = 4;//已取消 private static final int INTERRUPTING = 5;//中断中 private static final int INTERRUPTED = 6;//已中断 可能的 state 转换: NEW -> COMPLETING -> NORMAL NEW -> COMPLETING -> EXCEPTIONAL NEW -> CANCELLED NEW -> INTERRUPTING -> INTERRUPTED
private Object outcome;//经过get方法得到的返回值 //设置返回值,状态NEW -> COMPLETING -> NORMAL protected void set(V v) { // 这里为何要用 CAS 由于可能会和 cancel 方法产生竞争。 // 若是竞争失败,说明取消竞争成功,在 cancel 方法承担唤醒的工做,因此直接跳过。 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//NEW -> COMPLETING // 竞争成功 outcome = v;//outcome为返回结果 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 最终状态为NORMAL,COMPLETING -> NORMAL finishCompletion(); } }
//状态:NEW -> COMPLETING -> EXCEPTIONAL protected void setException(Throwable t) { // 这里为何要用 CAS 由于可能会和 cancel 方法产生竞争。 // 若是竞争失败,说明取消竞争成功,在 cancel 方法承担唤醒的工做,因此直接跳过。 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//NEW -> COMPLETING // 竞争成功 outcome = t; // outcome 为一个 Throwable // 把最终状态改成 EXCEPTIONAL UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state,COMPLETING -> EXCEPTIONAL finishCompletion(); } }
//删除当前线程并唤醒全部等待线程,调用done(),并取消进行中的方法 private void finishCompletion() { // assert state > COMPLETING; //从 waiters 末尾开始遍历,for 自旋直到 CAS 成功。 for (WaitNode q; (q = waiters) != null;) { // 使用 CAS 把 waiters 设置为 null,和 awaitDone 和 removeWatier 方法竞争 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { // 自旋唤醒全部线程 for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t);// 唤醒线程 } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } // 这个一个空方法,主要用于继承重写,这也是模板模式的体现 done(); callable = null; // to reduce footprint } protected void done() { } //===========例子============== //ExecutorCompletionService 的做用就是把线程池的执行结果放到一个已完成队列中,方便获取执行结果,其内部主要经过一个 FutureTask 的实现类 QueueingFuture 来实现这个功能: private class QueueingFuture extends FutureTask<Void> { QueueingFuture(RunnableFuture<V> task) { super(task, null); this.task = task; } protected void done() { completionQueue.add(task); }//done方法是FutureTask方法的重写。FutureTask在完成时会执行done方法,把task放入已完成队列completionQueue。 private final Future<V> task; }
public V get() throws InterruptedException, ExecutionException { int s = state;//获得状态 if (s <= COMPLETING)//状态未完成,把获取结果的线程放入等待链表,而后阻塞,直至被中断、完成或出现异常。 s = awaitDone(false, 0L); return report(s);//返回结果 } private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L;//用于计时 WaitNode q = null; boolean queued = false; for (;;) {//自旋 //若是已经被中断,则removeWaiter,抛出中断异常 if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) {//task已经结束 if (q != null) q.thread = null; return s; } //第二个线程进来,正在运行,发现前面有等待节点,则让出cpu else if (s == COMPLETING) // cannot time out yet Thread.yield(); // 第一次遍历,初始化 WaitNode,第一个节点进来时进行,第一个线程状态是new else if (q == null) q = new WaitNode(); // 是否已入队,没有则把WaitNode接到末尾,第一个线程第二次遍历时运行下面代码 else if (!queued) // 和 finishCompletion 和 removeWaiter 竞争 // 1. finishCompletion竞争成功,说明state已经 > COMPLETING则下次循环就会退出 // 2. removeWaiter竞争成功,说明waiters变化了,下一次循环再次竞争 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 若是使用了计时,则判断是否超时,若是超时则移出WaitNode并当即返回无需等待结果,不然阻塞 nanos else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else //阻塞,直到被唤醒(正常完成 || 异常 || 中断) LockSupport.park(this); } } //根据awaitDone返回状态返回结果或抛出异常 private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL)//正常 return (V)x; if (s >= CANCELLED)//取消 throw new CancellationException(); // task 执行过程当中出现异常 throw new ExecutionException((Throwable)x); }
/** * Tries to unlink a timed-out or interrupted wait node to avoid * accumulating garbage. Internal nodes are simply unspliced * without CAS since it is harmless if they are traversed anyway * by releasers. To avoid effects of unsplicing from already * removed nodes, the list is retraversed in case of an apparent * race. This is slow when there are a lot of nodes, but we don't * expect lists to be long enough to outweigh higher-overhead * schemes. *尝试取消连接超时或中断的等待节点以免堆积垃圾。内部节点的拼接没有CAS, *由于这对释放者不管如何遍历都没有影响。 为了不已删除节点节点未拼接的影响, *若是出现明显的竞争,则从新遍历列表。 当节点不少时会很慢,可是咱们不 *指望列表足够长以抵消较高的开销计划。 */ private void removeWaiter(WaitNode node) { if (node != null) { node.thread = null; retry: for (;;) { // restart on removeWaiter race //遍历整个链表 for (WaitNode pred = null, q = waiters, s; q != null; q = s) { s = q.next; //把q看成前一个节点,遍历下一个节点 if (q.thread != null) pred = q; // q.thread == null && pred != null,表示当前节点不是第一个节点,是一个中间节点 // 这里没有使用 CAS,若是出现多个线程同时遍历,前一个节点变为null,则从新从头遍历 // 为何没有使用 CAS 由于做者的想法是这个链表不会太长,因此咱们使用时不该该使这个链表太长 // 操做:把下一个节点链接到前一个节点的后面 else if (pred != null) { pred.next = s;//把s链接到pred后面 if (pred.thread == null) // check for race continue retry; } // q.thread == null && pred == null,表示第一个节点的 thread == null, // 这里使用 CAS,由于可能多个线程在操做 // 操做:把下一个节点设置为末尾节点,若是竞争失败则从新从头遍历 else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) continue retry; } break; } } }
public boolean isDone() { return state != NEW; }
建立Thread,把FutureTask实例放入构造方法中,start开启线程less
参考:https://www.jianshu.com/p/414cc2f0002c异步