FutureTask源码解析(2)——深刻理解FutureTask

前言

系列文章目录java

有了上一篇对预备知识的了解以后,分析源码就容易多了,本篇咱们就直接来看看FutureTask的源码。node

本文的源码基于JDK1.8。编程

Future和Task

在深刻分析源码以前,咱们再来拎一下FutureTask究竟是干吗的。人如其名,FutureTask包含了FutureTask两部分。segmentfault

咱们上一篇说过,FutureTask实现了RunnableFuture接口,即Runnable接口和Future接口。
其中Runnable接口对应了FutureTask名字中的Task,表明FutureTask本质上也是表征了一个任务。而Future接口就对应了FutureTask名字中的Future,表示了咱们对于这个任务能够执行某些操做,例如,判断任务是否执行完毕,获取任务的执行结果,取消任务的执行等。安全

因此简单来讲,FutureTask本质上就是一个“Task”,咱们能够把它当作简单的Runnable对象来使用。可是它又同时实现了Future接口,所以咱们能够对它所表明的“Task”进行额外的控制操做。数据结构

Java并发工具类的三板斧

关于Java并发工具类的三板斧,咱们在分析AQS源码的时候已经说过了,即:多线程

状态,队列,CAS

以这三个方面为切入点来看源码,有助于咱们快速的看清FutureTask的概貌:并发

状态

首先是找状态。框架

在FutureTask中,状态是由state属性来表示的,不出所料,它是volatile类型的,确保了不一样线程对它修改的可见性:异步

private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

state属性是贯穿整个FutureTask的最核心的属性,该属性的值表明了任务在运行过程当中的状态,随着任务的执行,状态将不断地进行转变,从上面的定义中能够看出,总共有7种状态:包括了1个初始态,2个中间态和4个终止态。

虽然说状态有这么多,可是状态的转换路径却只有四种:

state of FutureTask

  • 任务的初始状态都是NEW, 这一点是构造函数保证的,咱们后面分析构造函数的时候再讲;
  • 任务的终止状态有4种:

    • NORMAL:任务正常执行完毕
    • EXCEPTIONAL:任务执行过程当中发生异常
    • CANCELLED:任务被取消
    • INTERRUPTED:任务被中断
  • 任务的中间状态有2种:

    • COMPLETING 正在设置任务结果
    • INTERRUPTING 正在中断运行任务的线程

值得一提的是,任务的中间状态是一个瞬态,它很是的短暂。并且任务的中间态并不表明任务正在执行,而是任务已经执行完了,正在设置最终的返回结果,因此能够这么说:

只要state不处于 NEW 状态,就说明任务已经执行完毕

注意,这里的执行完毕是指传入的Callable对象的call方法执行完毕,或者抛出了异常。因此这里的COMPLETING的名字显得有点迷惑性,它并不意味着任务正在执行中,而意味着call方法已经执行完毕,正在设置任务执行的结果。

而将一个任务的状态设置成终止态只有三种方法:

  • set
  • setException
  • cancel

咱们将在下文的源码解析中分析这三个方法。

队列

接着咱们来看队列,在FutureTask中,队列的实现是一个单向链表,它表示全部等待任务执行完毕的线程的集合。咱们知道,FutureTask实现了Future接口,能够获取“Task”的执行结果,那么若是获取结果时,任务尚未执行完毕怎么办呢?那么获取结果的线程就会在一个等待队列中挂起,直到任务执行完毕被唤醒。这一点有点相似于咱们以前学习的AQS中的sync queue,在下文的分析中,你们能够本身对照它们的异同点。

咱们前面说过,在并发编程中使用队列一般是将当前线程包装成某种类型的数据结构扔到等待队列中,咱们先来看看队列中的每个节点是怎么个结构:

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

可见,相比于AQS的sync queue所使用的双向链表中的Node,这个WaitNode要简单多了,它只包含了一个记录线程的thread属性和指向下一个节点的next属性。

值得一提的是,FutureTask中的这个单向链表是当作来使用的,确切来讲是当作Treiber栈来使用的,不了解Treiber栈是个啥的能够简单的把它当作是一个线程安全的栈,它使用CAS来完成入栈出栈操做(想进一步了解的话能够看这篇文章)。为啥要使用一个线程安全的栈呢,由于同一时刻可能有多个线程都在获取任务的执行结果,若是任务还在执行过程当中,则这些线程就要被包装成WaitNode扔到Treiber栈的栈顶,即完成入栈操做,这样就有可能出现多个线程同时入栈的状况,所以须要使用CAS操做保证入栈的线程安全,对于出栈的状况也是同理。

因为FutureTask中的队列本质上是一个Treiber栈,那么使用这个队列就只须要一个指向栈顶节点的指针就好了,在FutureTask中,就是waiters属性:

/** Treiber stack of waiting threads */
private volatile WaitNode waiters;

事实上,它就是整个单向链表的头节点。

综上,FutureTask中所使用的队列的结构以下:
Treiber stack

CAS操做

CAS操做大多数是用来改变状态的,在FutureTask中也不例外。咱们通常在静态代码块中初始化须要CAS操做的属性的偏移量:

// Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long stateOffset;
    private static final long runnerOffset;
    private static final long waitersOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = FutureTask.class;
            stateOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("state"));
            runnerOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("runner"));
            waitersOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiters"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

从这个静态代码块中咱们也能够看出,CAS操做主要针对3个属性,包括staterunnerwaiters,说明这3个属性基本是会被多个线程同时访问的。其中state属性表明了任务的状态,waiters属性表明了指向栈顶节点的指针,这两个咱们上面已经分析过了。runner属性表明了执行FutureTask中的“Task”的线程。为何须要一个属性来记录执行任务的线程呢?这是为了中断或者取消任务作准备的,只有知道了执行任务的线程是谁,咱们才能去中断它。

定义完属性的偏移量以后,接下来就是CAS操做自己了。在FutureTask,CAS操做最终调用的仍是Unsafe类的compareAndSwapXXX方法,关于这一点,咱们上一篇预备知识中已经讲过了,这里再也不赘述。

核心属性

前面咱们以java并发编程工具类的“三板斧”为切入点分析了FutureTask的状态,队列和CAS操做,对这个工具类有了初步的认识。接下来,咱们就要开始进入源码分析了。首先咱们先来看看FutureTask的几个核心属性:

/**
     * The run state of this task, initially NEW.  The run state
     * transitions to a terminal state only in methods set,
     * setException, and cancel.  During completion, state may take on
     * transient values of COMPLETING (while outcome is being set) or
     * INTERRUPTING (only while interrupting the runner to satisfy a
     * cancel(true)). Transitions from these intermediate to final
     * states use cheaper ordered/lazy writes because values are unique
     * and cannot be further modified.
     *
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    /** The underlying callable; nulled out after running */
    private Callable<V> callable;
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;

能够看出,FutureTask的核心属性只有5个:

  • state
  • callable
  • outcome
  • runner
  • waiters

关于 state waiters runner三个属性咱们上面已经解释过了。剩下的callable属性表明了要执行的任务自己,即FutureTask中的“Task”部分,为Callable类型,这里之因此用Callable而不用Runnable是由于FutureTask实现了Future接口,须要获取任务的执行结果。outcome属性表明了任务的执行结果或者抛出的异常,为Object类型,也就是说outcome能够是任意类型的对象,因此当咱们将正常的执行结果返回给调用者时,须要进行强制类型转换,返回由Callable定义的V类型。这5个属性综合起来就完成了整个FutureTask的工做,使用关系以下:

  • 任务本尊:callable
  • 任务的执行者:runner
  • 任务的结果:outcome
  • 获取任务的结果:state + outcome + waiters
  • 中断或者取消任务:state + runner + waiters

构造函数

介绍完核心属性以后,咱们来看看FutureTask的构造函数:

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

FutureTask共有2个构造函数,这2个构造函数一个是直接传入Callable对象, 一个是传入一个Runnable对象和一个指定的result, 而后经过Executors工具类将它适配成callable对象, 因此这两个构造函数的本质是同样的:

  1. 用传入的参数初始化callable成员变量
  2. 将FutureTask的状态设为NEW

(关于将Runnable对象适配成Callable对象的方法Executors.callable(runnable, result)咱们在上一篇预备知识中已经讲过了,不记得的同窗能够倒回去再看一下)

接口实现

上一篇咱们提过,FutureTask实现了RunnableFuture接口:

public class FutureTask<V> implements RunnableFuture<V> {
    ...
}

所以,它必须实现Runnable和Future接口的全部方法。

Runnable接口实现

要实现Runnable接口, 就得覆写run方法, 咱们看看FutureTask的run方法干了点啥:

public void run() {
    if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

首先咱们看到,在run方法的一开始,就检查当前状态是否是New, 而且使用CAS操做将runner属性设置位当前线程,即记录执行任务的线程。compareAndSwapObject的用法在上一篇预备知识中已经介绍过了,这里再也不赘述。可见,runner属性是在运行时被初始化的。

接下来,咱们就调用Callable对象的call方法来执行任务,若是任务执行成功,就使用set(result)设置结果,不然,用setException(ex)设置抛出的异常。

咱们先来看看set(result)方法:

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

这个方法一开始经过CAS操做将state属性由原来的NEW状态修改成COMPLETING状态,咱们在一开始介绍state状态的时候说过,COMPLETING是一个很是短暂的中间态,表示正在设置执行的结果。

状态设置成功后,咱们就把任务执行结果赋值给outcome, 而后直接把state状态设置成NORMAL,注意,这里是直接设置,没有先比较再设置的操做,因为state属性被设置成volatile, 结合咱们上一篇预备知识的介绍,这里putOrderedInt应当和putIntVolatile是等价的,保证了state状态对其余线程的可见性。

在这以后,咱们调用了 finishCompletion()来完成执行结果的设置。

接下来咱们再来看看发生了异常的版本setException(ex)

protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

可见,除了将outcome属性赋值为异常对象,以及将state的终止状态修改成EXCEPTIONAL,其他都和set方法相似。在方法的最后,都调用了 finishCompletion()来完成执行结果的设置。那么咱们就来看看 finishCompletion()干了点啥:

private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    done();

    callable = null;        // to reduce footprint
}

这个方法事实上完成了一个“善后”工做。咱们先来看看if条件语句中的CAS操做:

UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)

该方法是将waiters属性的值由原值设置为null, 咱们知道,waiters属性指向了Treiber栈的栈顶节点,能够说是表明了整个Treiber栈,将该值设为null的目的就是清空整个栈。若是设置不成功,则if语句块不会被执行,又进行下一轮for循环,而下一轮for循环的判断条件又是waiters!=null ,由此咱们知道,虽然最外层的for循环乍一看好像是什么遍历节点的操做,其实只是为了确保waiters属性被成功设置成null,本质上至关于一个自旋操做。

将waiters属性设置成null之后,接下了 for (;;)死循环才是真正的遍历节点,能够看出,循环内部就是一个普通的遍历链表的操做,咱们前面讲属性的时候说过,Treiber栈里面存放的WaitNode表明了当前等待任务执行结束的线程,这个循环的做用也正是遍历链表中全部等待的线程,并唤醒他们。

将Treiber栈中全部挂起的线程都唤醒后,下面就是执行done方法:

/**
 * Protected method invoked when this task transitions to state
 * {@code isDone} (whether normally or via cancellation). The
 * default implementation does nothing.  Subclasses may override
 * this method to invoke completion callbacks or perform
 * bookkeeping. Note that you can query status inside the
 * implementation of this method to determine whether this task
 * has been cancelled.
 */
protected void done() { }

这个方法是一个空方法,从注释上看,它是提供给子类覆写的,以实现一些任务执行结束前的额外操做。

done方法以后就是callable属性的清理了(callable = null)。

至此,整个run方法分析完了。

真的吗???

并无!别忘了run方法最后还有一个finally块呢:

finally {
    // runner must be non-null until state is settled to
    // prevent concurrent calls to run()
    runner = null;
    // state must be re-read after nulling runner to prevent
    // leaked interrupts
    int s = state;
    if (s >= INTERRUPTING)
        handlePossibleCancellationInterrupt(s);
}

在finally块中,咱们将runner属性置为null,而且检查有没有遗漏的中断,若是发现s >= INTERRUPTING, 说明执行任务的线程有可能被中断了,由于s >= INTERRUPTING 只有两种可能,state状态为INTERRUPTINGINTERRUPTED

有的同窗可能就要问了,咱前面已经执行过的set方法或者setException方法不是已经将state状态设置成NORMAL或者EXCEPTIONAL了吗?怎么会出现INTERRUPTING或者INTERRUPTED状态呢?别忘了,我们在多线程的环境中,在当前线程执行run方法的同时,有可能其余线程取消了任务的执行,此时其余线程就可能对state状态进行改写,这也就是咱们在设置终止状态的时候用putOrderedInt方法,而没有用CAS操做的缘由——咱们没法确信在设置state前是处于COMPLETING中间态仍是INTERRUPTING中间态。

关于任务取消的操做,咱们后面讲Future接口的实现的时候再讲,回到如今的问题,咱们来看看handlePossibleCancellationInterrupt方法干了点啥:

/**
 * Ensures that any interrupt from a possible cancel(true) is only
 * delivered to a task while in run or runAndReset.
 */
private void handlePossibleCancellationInterrupt(int s) {
    // It is possible for our interrupter to stall before getting a
    // chance to interrupt us.  Let's spin-wait patiently.
    if (s == INTERRUPTING)
        while (state == INTERRUPTING)
            Thread.yield(); // wait out pending interrupt
}

可见该方法是一个自旋操做,若是当前的state状态是INTERRUPTING,咱们在原地自旋,直到state状态转换成终止态。

至此,run方法的分析就真的结束了。咱们来总结一下:

run方法重点作了如下几件事:

  1. 将runner属性设置成当前正在执行run方法的线程
  2. 调用callable成员变量的call方法来执行任务
  3. 设置执行结果outcome, 若是执行成功, 则outcome保存的就是执行结果;若是执行过程当中发生了异常, 则outcome中保存的就是异常,设置结果以前,先将state状态设为中间态
  4. 对outcome的赋值完成后,设置state状态为终止态(NORMAL或者EXCEPTIONAL)
  5. 唤醒Treiber栈中全部等待的线程
  6. 善后清理(waiters, callable,runner设为null)
  7. 检查是否有遗漏的中断,若是有,等待中断状态完成。

这里再插一句,咱们前面说“state只要不是NEW状态,就说明任务已经执行完成了”就体如今这里,由于run方法中,咱们是在c.call()执行完毕或者抛出了异常以后才开始设置中间态和终止态的。

Future接口的实现

Future接口一共定义了5个方法,咱们一个个来看:

cancel(boolean mayInterruptIfRunning)

既然上面在分析run方法的最后,咱们提到了任务可能被别的线程取消,那咱们就趁热打铁,看看怎么取消一个任务的执行:

public boolean cancel(boolean mayInterruptIfRunning) {
    if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}

还记得咱们上一篇在介绍Future接口的时候对cancel方法的说明吗?

关于cancel方法,这里要补充说几点:
首先有如下三种状况之一的,cancel操做必定是失败的:

  1. 任务已经执行完成了
  2. 任务已经被取消过了
  3. 任务由于某种缘由不能被取消

其它状况下,cancel操做将返回true。值得注意的是,cancel操做返回true并不表明任务真的就是被取消了,这取决于发动cancel状态时,任务所处的状态:

  1. 若是发起cancel时任务尚未开始运行,则随后任务就不会被执行;
  2. 若是发起cancel时任务已经在运行了,则这时就须要看mayInterruptIfRunning参数了:

    • 若是mayInterruptIfRunning 为true, 则当前在执行的任务会被中断
    • 若是mayInterruptIfRunning 为false, 则能够容许正在执行的任务继续运行,直到它执行完

咱们来看看FutureTask是怎么实现cancel方法的这几个规范的:

首先,对于“任务已经执行完成了或者任务已经被取消过了,则cancel操做必定是失败的(返回false)”这两条,是经过简单的判断state值是否为NEW实现的,由于咱们前面说过了,只要state不为NEW,说明任务已经执行完毕了。从代码中能够看出,只要state不为NEW,则直接返回false。

若是state仍是NEW状态,咱们再往下看:

UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)

这一段是根据mayInterruptIfRunning的值将state的状态由NEW设置成INTERRUPTING或者CANCELLED,当这一操做也成功以后,就能够执行后面的try语句了,但不管怎么,该方法最后都返回了true

咱们再接着看try块干了点啥:

try {    // in case call to interrupt throws exception
    if (mayInterruptIfRunning) {
        try {
            Thread t = runner;
            if (t != null)
                t.interrupt();
        } finally { // final state
            UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
        }
    }
} finally {
    finishCompletion();
}

咱们知道,runner属性中存放的是当前正在执行任务的线程,所以,这个try块的目的就是中断当前正在执行任务的线程,最后将state的状态设为INTERRUPTED,固然,中断操做完成后,还须要经过finishCompletion()来唤醒全部在Treiber栈中等待的线程。

咱们如今总结一下,cancel方法实际上完成如下两种状态转换之一:

  1. NEW -> CANCELLED (对应于mayInterruptIfRunning=false)
  2. NEW -> INTERRUPTING -> INTERRUPTED (对应于mayInterruptIfRunning=true)

对于第一条路径,虽然说cancel方法最终返回了true,但它只是简单的把state状态设为CANCELLED,并不会中断线程的执行。可是这样带来的后果是,任务即便执行完毕了,也没法设置任务的执行结果,由于前面分析run方法的时候咱们知道,设置任务结果有一个中间态,而这个中间态的设置,是以当前state状态为NEW为前提的。

对于第二条路径,则会中断执行任务的线程,咱们在倒回上面的run方法看看:

public void run() {
    if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

虽然第二条路径中断了当前正在执行的线程,可是,响不响应这个中断是由执行任务的线程本身决定的,更具体的说,这取决于c.call()方法内部是否对中断进行了响应,是否将中断异常抛出。

那call方法中是怎么处理中断的呢?从上面的代码中能够看出,catch语句处理了全部的Throwable的异常,这天然也包括了中断异常。

然而,值得一提的是,即便这里进入了catch (Throwable ex){}代码块,setException(ex)的操做必定是失败的,由于在咱们取消任务执行的线程中,咱们已经先把state状态设为INTERRUPTING了,而setException(ex)的操做要求设置前线程的状态为NEW。因此这里响应cancel方法所形成的中断最大的意义不是为了对中断进行处理,而是简单的中止任务线程的执行,节省CPU资源。

那读者可能会问了,既然这个setException(ex)的操做必定是失败的,那放在这里有什么用呢?事实上,这个setException(ex)是用来处理任务本身在正常执行过程当中产生的异常的,在咱们没有主动去cancel任务时,任务的state状态在执行过程当中就会始终是NEW,若是任务此时本身发生了异常,则这个异常就会被setException(ex)方法成功的记录到outcome中。

反正不管如何,run方法最终都会进入finally块,而这时候它会发现s >= INTERRUPTING,若是检测发现s = INTERRUPTING,说明cancel方法尚未执行到中断当前线程的地方,那就等待它将state状态设置成INTERRUPTED。到这里,对cancel方法的分析就和上面对run方法的分析对接上了。

cancel方法到这里就分析完了,若是你一条条的去对照Future接口对于cancel方法的规范,它每一条都是实现了的,而它实现的核心机理,就是对state的当前状态的判断和设置。因而可知,state属性是贯穿整个FutureTask的最核心的属性。

isCancelled()

说完了cancel,咱们再来看看 isCancelled()方法,相较而言,它就简单多了:

public boolean isCancelled() {
    return state >= CANCELLED;
}

那么state >= CANCELLED 包含了那些状态呢,它包括了: CANCELLED INTERRUPTING INTERRUPTED

咱们再来回忆下上一篇讲的Future接口对于isCancelled()方法的规范:

该方法用于判断任务是否被取消了。若是一个任务在正常执行完成以前被Cancel掉了, 则返回true

再对比state的状态图:

isCancelled
可见选取这三个状态做为判断依据是很合理的, 由于只有调用了cancel方法,才会使state状态进入这三种状态。

isDone()

与 isCancelled方法相似,isDone方法也是简单地经过state状态来判断。

public boolean isDone() {
    return state != NEW;
}

关于这一点,其实咱们以前已经说过了,只要state状态不是NEW,则任务已经执行完毕了,由于state状态不存在相似“任务正在执行中”这种状态,即便是短暂的中间态,也是发生在任务已经执行完毕,正在设置任务结果的时候。

get()

最后咱们来看看获取执行结果的get方法,先来看看无参的版本:

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

该方法其实很简单,当任务尚未执行完毕或者正在设置执行结果时,咱们就使用awaitDone方法等待任务进入终止态,注意,awaitDone的返回值是任务的状态,而不是任务的结果。任务进入终止态以后,咱们就根据任务的执行结果来返回计算结果或者抛出异常。

咱们先来看看等待任务完成的awaitDone方法,该方法是获取任务结果最核心的方法,它完成了获取结果,挂起线程,响应中断等诸多操做:

private int awaitDone(boolean timed, long nanos) throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}

在具体分析它的源码以前,有一点咱们先特别说明一下,FutureTask中会涉及到两类线程,一类是执行任务的线程,它只有一个,FutureTask的run方法就由该线程来执行;一类是获取任务执行结果的线程,它能够有多个,这些线程能够并发执行,每个线程都是独立的,均可以调用get方法来获取任务的执行结果。若是任务尚未执行完,则这些线程就须要进入Treiber栈中挂起,直到任务执行结束,或者等待的线程自身被中断。

理清了这一点后,咱们再来详细看看awaitDone方法。能够看出,该方法的大框架是一个自旋操做,咱们一段一段来看:

for (;;) {
    if (Thread.interrupted()) {
        removeWaiter(q);
        throw new InterruptedException();
    }
    // ...
}

首先一开始,咱们先检测当前线程是否被中断了,这是由于get方法是阻塞式的,若是等待的任务尚未执行完,则调用get方法的线程会被扔到Treiber栈中挂起等待,直到任务执行完毕。可是,若是任务迟迟没有执行完毕,则咱们也有可能直接中断在Treiber栈中的线程,以中止等待。

当检测到线程被中断后,咱们调用了removeWaiter:

private void removeWaiter(WaitNode node) {
    if (node != null) {
        ...
    }
}

removeWaiter的做用是将参数中的node从等待队列(即Treiber栈)中移除。若是此时线程尚未进入Treiber栈,则 q=null,那么removeWaiter(q)啥也不干。在这以后,咱们就直接抛出了InterruptedException异常。

接着往下看:

for (;;) {
    /*if (Thread.interrupted()) {
        removeWaiter(q);
        throw new InterruptedException();
    }*/
    int s = state;
    if (s > COMPLETING) {
        if (q != null)
            q.thread = null;
        return s;
    }
    else if (s == COMPLETING) // cannot time out yet
        Thread.yield();
    else if (q == null)
        q = new WaitNode();
    else if (!queued)
        queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                             q.next = waiters, q);
    else if (timed) {
        nanos = deadline - System.nanoTime();
        if (nanos <= 0L) {
            removeWaiter(q);
            return state;
        }
        LockSupport.parkNanos(this, nanos);
    }
    else
        LockSupport.park(this);
}
  • 若是任务已经进入终止态(s > COMPLETING),咱们就直接返回任务的状态;
  • 不然,若是任务正在设置执行结果(s == COMPLETING),咱们就让出当前线程的CPU资源继续等待
  • 不然,就说明任务尚未执行,或者任务正在执行过程当中,那么这时,若是q如今还为null, 说明当前线程尚未进入等待队列,因而咱们新建了一个WaitNode, WaitNode的构造函数咱们以前已经看过了,就是生成了一个记录了当前线程的节点;
  • 若是q不为null,说明表明当前线程的WaitNode已经被建立出来了,则接下来若是queued=false,表示当前线程尚未入队,因此咱们执行了:
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);

这行代码的做用是经过CAS操做将新建的q节点添加到waiters链表的头节点以前,其实就是Treiber栈的入栈操做,写的仍是很简洁的,一行代码就搞定了,若是你们仍是以为晕乎,下面是它等价的伪代码:

q.next = waiters; //当前节点的next指向目前的栈顶元素
//若是栈顶节点在这个过程当中没有变,即没有发生并发入栈的状况
if(waiters的值仍是上面q.next所使用的waiters值){ 
    waiters = q; //修改栈顶的指针,指向刚刚入栈的节点
}

这个CAS操做就是为了保证同一时刻若是有多个线程在同时入栈,则只有一个可以操做成功,也即Treiber栈的规范。

若是以上的条件都不知足,则再接下来由于如今是不带超时机制的get,timed为false,则else if代码块跳过,而后来到最后一个else, 把当前线程挂起,此时线程就处于阻塞等待的状态。

至此,在任务没有执行完毕的状况下,获取任务执行结果的线程就会在Treiber栈中被LockSupport.park(this)挂起了。

那么这个挂起的线程何时会被唤醒呢?有两种状况:

  1. 任务执行完毕了,在finishCompletion方法中会唤醒全部在Treiber栈中等待的线程
  2. 等待的线程自身由于被中断等缘由而被唤醒。

咱们接下来就继续看看线程被唤醒后的状况,此时,线程将回到for(;;)循环的开头,继续下一轮循环:

for (;;) {
    if (Thread.interrupted()) {
        removeWaiter(q);
        throw new InterruptedException();
    }

    int s = state;
    if (s > COMPLETING) {
        if (q != null)
            q.thread = null;
        return s;
    }
    else if (s == COMPLETING) // cannot time out yet
        Thread.yield();
    else if (q == null)
        q = new WaitNode();
    else if (!queued)
        queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                             q.next = waiters, q);
    else if (timed) {
        nanos = deadline - System.nanoTime();
        if (nanos <= 0L) {
            removeWaiter(q);
            return state;
        }
        LockSupport.parkNanos(this, nanos);
    }
    else
        LockSupport.park(this); // 挂起的线程从这里被唤醒
}

首先天然仍是检测中断,所不一样的是,此时q已经不为null了,所以在有中断发生的状况下,在抛出中断以前,多了一步removeWaiter(q)操做,该操做是将当前线程从等待的Treiber栈中移除,相比入栈操做,这个出栈操做要复杂一点,这取决于节点是否位于栈顶。下面咱们来仔细分析这个出栈操做:

private void removeWaiter(WaitNode node) {
    if (node != null) {
        node.thread = null;
        retry:
        for (;;) {          // restart on removeWaiter race
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next;
                if (q.thread != null)
                    pred = q;
                else if (pred != null) {
                    pred.next = s;
                    if (pred.thread == null) // check for race
                        continue retry;
                }
                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s))
                    continue retry;
            }
            break;
        }
    }
}

首先,咱们把要出栈的WaitNode的thread属性设置为null, 这至关于一个标记,是咱们后面在waiters链表中定位该节点的依据。

(1) 要移除的节点就在栈顶

咱们先来看看该节点就位于栈顶的状况,这说明在该节点入栈后,并无别的线程再入栈了。因为一开始咱们就将该节点的thread属性设为了null,所以,前面的q.thread != nullpred != null都不知足,咱们直接进入到最后一个else if 分支:

else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s))
    continue retry;

这一段是栈顶节点出栈的操做,和入栈相似,采用了CAS比较,将栈顶元素设置成原栈顶节点的下一个节点。

值得注意的是,当CAS操做不成功时,程序会回到retry处重来,但即便CAS操做成功了,程序依旧会遍历完整个链表,找寻node.thread == null 的节点,并将它们一并从链表中剔除。

(2) 要移除的节点不在栈顶

当要移除的节点不在栈顶时,咱们会一直遍历整个链表,直到找到q.thread == null的节点,找到以后,咱们将进入

else if (pred != null) {
    pred.next = s;
    if (pred.thread == null) // check for race
        continue retry;
}

这是由于节点不在栈顶,则其必然是有前驱节点pred的,这时,咱们只是简单的让前驱节点指向当前节点的下一个节点,从而将目标节点从链表中剔除。

注意,后面多加的那个if判断是颇有必要的,由于removeWaiter方法并无加锁,因此可能有多个线程在同时执行,WaitNode的两个成员变量threadnext都被设置成volatile,这保证了它们的可见性,若是咱们在这时发现了pred.thread == null,那就意味着它已经被另外一个线程标记了,将在另外一个线程中被拿出waiters链表,而咱们当前目标节点的原后继节点如今是接在这个pred节点上的,所以,若是pred已经被其余线程标记为要拿出去的节点,咱们如今这个线程再继续日后遍历就没有什么意义了,因此这时就调到retry处,从头再遍历。

若是pred节点没有被其余线程标记,那咱们就接着往下遍历,直到整个链表遍历完。

至此,将节点从waiters链表中移除的removeWaiter操做咱们就分析完了,咱们总结一下该方法:

在该方法中,会传入一个须要移除的节点,咱们会将这个节点的thread属性设置成null,以标记该节点。而后不管如何,咱们会遍历整个链表,清除那些被标记的节点(只是简单的将节点从链表中剔除)。若是要清除的节点就位于栈顶,则还须要注意从新设置waiters的值,指向新的栈顶节点。因此能够看出,虽然说removeWaiter方法传入了须要剔除的节点,可是事实上它可能剔除的不止是传入的节点,而是全部已经被标记了的节点,这样不只清除操做容易了些(不须要专门去定位传入的node在哪里),并且提高了效率(能够同时清除全部已经被标记的节点)。

咱们再回到awaitDone方法里:

private int awaitDone(boolean timed, long nanos) throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q); // 刚刚分析到这里了,咱们接着往下看
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}

若是线程不是由于中断被唤醒,则会继续往下执行,此时会再次获取当前的state状态。所不一样的是,此时q已经不为null, queued已经为true了,因此已经不须要将当前节点再入waiters栈了。

至此咱们知道,除非被中断,不然get方法会在原地自旋等待(用的是Thread.yield,对应于s == COMPLETING)或者直接挂起(对应任务尚未执行完的状况),直到任务执行完成。而咱们前面分析run方法和cancel方法的时候知道,在run方法结束后,或者cancel方法取消完成后,都会调用finishCompletion()来唤醒挂起的线程,使它们得以进入下一轮循环,获取任务执行结果。

最后,等awaitDone函数返回后,get方法返回了report(s),以根据任务的状态,汇报执行结果:

@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

可见,report方法很是简单,它根据当前state状态,返回正常执行的结果,或者抛出指定的异常。

至此,get方法就分析结束了。

值得注意的是,awaitDone方法和get方法都没有加锁,这在多个线程同时执行get方法的时候会不会产生线程安全问题呢?经过查看方法内部的参数咱们知道,整个方法内部用的大多数是局部变量,所以不会产生线程安全问题,对于全局的共享变量waiters的修改时,也使用了CAS操做,保证了线程安全,而state变量自己是volatile的,保证了读取时的可见性,所以整个方法调用虽然没有加锁,它仍然是线程安全的。

get(long timeout, TimeUnit unit)

最后咱们来看看带超时版本的get方法:

public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}

它和上面不带超时时间的get方法很相似,只是在awaitDone方法中多了超时检测:

else if (timed) {
    nanos = deadline - System.nanoTime();
    if (nanos <= 0L) {
        removeWaiter(q);
        return state;
    }
    LockSupport.parkNanos(this, nanos);
}

即,若是指定的超时时间到了,则直接返回,若是返回时,任务尚未进入终止状态,则直接抛出TimeoutException异常,不然就像get()方法同样,正常的返回执行结果。

总结

FutureTask实现了Runnable和Future接口,它表示了一个带有任务状态和任务结果的任务,它的各类操做都是围绕着任务的状态展开的,值得注意的是,在全部的7个任务状态中,只要不是NEW状态,就表示任务已经执行完毕或者再也不执行了,并无表示“任务正在执行中”的状态。

除了表明了任务的Callable对象、表明任务执行结果的outcome属性,FutureTask还包含了一个表明全部等待任务结束的线程的Treiber栈,这一点其实和各类锁的等待队列特别像,即若是拿不到锁,则当前线程就会被扔进等待队列中;这里则是若是任务尚未执行结束,则全部等待任务执行完毕的线程就会被扔进Treiber栈中,直到任务执行完毕了,才会被唤醒。

FutureTask虽然为咱们提供了获取任务执行结果的途径,遗憾的是,在获取任务结果时,若是任务尚未执行完成,则当前线程会自旋或者挂起等待,这和咱们实现异步的初衷是相违背的,咱们后面将继续介绍另外一个同步工具类CompletableFuture, 它解决了这个问题。

(完)

系列文章目录