Runnable封装一个异步运行的任务,能够把它想象成为一个没有参数和返回值的异步方法。Callable与Runnable相似,可是有返回值。Callable接口是一个参数化的类型,只有一个方法call。并发
public interface Callable<V> { V call() throws Exception; }
类型参数是返回值的类型。例如,异步
Callable<Integer>表示一个最终返回Integer对象的异步计算。
Future保存异步计算的结果。能够启动一个计算,将Future对象交给某个线程,而后忘掉它。Future对象的全部者在结果计算好以后就能够得到它。
Future接口具备下面的方法:高并发
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
第一个get方法的调用被阻塞,知道计算完成。若是在计算完成以前,第二个get方法的调用超时,抛出一个TimeoutException异常。若是运行该计算的线程被中断,两个方法都将抛出InterruptedException。若是计算已经完成,那么get方法当即返回。源码分析
若是计算还在进行,isDone方法返回false;若是完成了,则返回true。this
能够用cancel方法取消该计算。若是计算尚未开始,它被取消且再也不开始。若是计算处于运行之中,那么若是mayInterrupt参数为true,它就被中断。线程
FutureTask包装器是一种很是便利的机制,同时实现了Future和Runnable接口。code
类图以下:对象
FutureTask的状态转换过程:blog
* NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED
建立一个futureTask对象task 提交task到调度器executor等待调度或者在另一个线程中执行task 等待调度中... 若是此时currentThread调取执行结果task.get(),会有几种状况 if task 尚未被executor调度或正在执行中 阻塞当前线程,并加入到一个阻塞链表中waitNode else if task被其它Thread取消,并取消成功 或task处于中断状态 throw exception else if task执行完毕,返回执行结果,或执行存在异常,返回异常信息 若是此时有另一个线程调用task.get() 执行过程同上
1. Future用于异步获取执行结果或者取消任务。 2. 在高并发场景下确保任务只执行一次。
Callable<Integer> myComputation = ...; FutureTask<Integer> task = new FutureTask<Integer>(myComputation); Thread t = new Thread(task); t.start(); ... Integer result = task.get(); //获取结果
/** * The run state of this task, initially NEW. The run state * transitions to a terminal state only in methods set, * setException, and cancel. During completion, state may take on * transient values of COMPLETING (while outcome is being set) or * INTERRUPTING (only while interrupting the runner to satisfy a * cancel(true)). Transitions from these intermediate to final * states use cheaper ordered/lazy writes because values are unique * and cannot be further modified. * * Possible state transitions: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */ private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); }
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } } protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }
public class Memoizer<A, V> implements Computable<A, V> { private final ConcurrentMap<A, Future<V>> cache = new ConcurrentMap<A, Future>>(); private final Computable<A, V> c; public Memoizer(Computable<A, V> c) { this.c = c; } public C computer(final A arg) throws InterruptedException { while(true) { Future<V> f = cache.get(arg); if(f == null) { Callable<V> eval = new Callable<V>() { public V call() throws InterruptedException { return c.compute(arg); } }; FutureTask<V> ft = new FutureTask<V>(eval); f = cache.putIfAbsent(arg, ft); if(f == null) { f = ft; ft.run(); } } try { return f.get(); } catch(CancellationException e) { cache.remove(arg, f); } catch(ExecutionException e) { throw launderThrowable(e.getCause()); } } } }