上面就是我结合本身的理解分析的future机制的设计思想,可能说的不够全,但愿有人能够补充。下面会讲解java future的具体实现java
execute方式:咱们知道一个类若是实现了runnable接口,它就可以被线程来执行,由于实现了runnable接口就拥有了run方法,因此可以被执行。因此最简单的异步线程执行方式以下:利用Executors框架来建立一个线程池,而后调用execute方法来提交异步任务,注意这里的execute方法是没有返回的,也就是说咱们无法知道提交的任务的执行结果。node
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(()->System.out.println("异步执行!"));复制代码
submit方式:前面提到的java给咱们提供的线程池接口ExecutorService提供了两种提交异步任务的方式,一种就是没有返回值的execute方法(因为ExecutorService接口是extends了Executor接口的,因此拥有了execute方法),还有一种是带有返回值的submit方法。在submit方法中,提供了三个重载方法:多线程
<T> Future<T> submit(Callable<T> task);
Future<?> submit(Runnable task);
<T> Future<T> submit(Runnable task, T result);复制代码
能够看到,submit方法支持实现了callable和runnable的task,不一样于runnable只有没有返回值的run方法,callable提供了一个带返回值的call方法,能够有返回值。正是由于runnable没有返回值,因此第二个重载方法返回值为null,第三个重载方法里面能够从外部设置一个返回值,这个返回值将会做为runnable的返回值。具体代码以下:并发
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}复制代码
两个方法都调用newTaskFor方法来建立了一个RunnableFuture的对象,而后调用execute方法来执行这个对象,说明咱们线程池真正执行的对象就是这个RunnableFuture对象。框架
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}复制代码
由上面代码看出就是建立了一个futureTask对象,这个对象封装了咱们提供的runnable和callable对象。futuretask实现了runnablefuture接口,这就是说明futuretask具有了runnable的功能(能被线程执行)和future功能(可以获取自身执行的结果和状态)。能被线程执行功能是咱们本身经过实现runnable接口或者callable接口来完成的。future功能前面咱们提过是很通用的功能,因此java给咱们实现了。下面就进入futuretask查看。异步
futuretask对象:futuretask是真正的future功能实现的地方。前面说过这个一个RunnableFuture对象,因此咱们看看它的run方法this
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
/** 封装的callable对象 */
private Callable<V> callable;
/** task的执行结果 */
private Object outcome;
/** 当前线程池的哪一个线程正在执行这个task */
private volatile Thread runner;
/** 等待的线程列表 */
private volatile WaitNode waiters;
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;// 1. 内部包装的一个callable对象
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();// 2. 调用包装的call方法
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);//3. 设置返回值
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}复制代码
前面提到futuretask是封装了runnable和callable的,但是为何内部只有一个callable呢,其实是由于futuretask本身调用适配器转换了一下:代码以下,采用了java的适配器模式。spa
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}复制代码
futuretask的run方法调用了内部封装的callable对象的call方法,获取返回值,而且设置到本身outcome中,state表明执行的状态,这样就经过代理的方式代理了咱们的callable的call方法,帮助咱们获取执行的结果和状态,因此咱们本身编写业务逻辑的时候就不用去管这层通用的逻辑了。这里面还有一个waitnode咱们单独讲线程
WaitNode: 经过前面的分析咱们知道,实际上咱们submit任务以后返回的future对象就是线程池为咱们建立的runnablefuture对象,也就是futuretask这个对象。future接口为咱们提供了一系列的方法,以下设计
V get() throws InterruptedException, ExecutionException;
boolean cancel(boolean mayInterruptIfRunning);复制代码
上面是主要的两个方法,get和cancel,cancel的时候调用runner的interrupt方法便可
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}复制代码
其中unsafe是用于cas操做的,在java并发包中大量用到,后续会讲解。
get方法的设计是阻塞的,也就是说若是结果没有返回时须要等待的,因此才会有waitnode这个对象的产生,当多个线程都调用futuretask的get方法的时候,若是结果还没产生,就都须要等待,这时候全部等待的线程就会造成一个链表,因此waitnode实际上就是线程的链表。
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}复制代码
再看get方法:若是任务没有完成就调用awaitDone进入阻塞,若是完成了直接调用report返回结果
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}复制代码
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {//1. 若是等待过程当中,被中断过了,那么就移除本身
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)//2. cas更新链表节点
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);//3. locksupport原语让线程进入休眠
}
else
LockSupport.park(this);
}
}复制代码
仍是比较好看懂,其中LockSupport是原语,让线程进行休眠。若是线程在休眠中醒来了,有多是多种状况,好比get的时间到了,也就是从3中醒来了,这样的话下一次循环就会判断时间到了,从而remove掉节点退出。还有可能等待的线程被interrupt了,这时候就会走到1的逻辑,经过判断中断标记将其remove掉。
既然有了waitnode这个等待链表,那么确定会有相应的唤醒机制,当执行完毕以后就会将waitnode链表上的线程一次唤醒,以下。
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}复制代码