java.util.concurrent.FutureTask 源码

线程池相关

源码:

package java.util.concurrent;

import java.util.concurrent.locks.LockSupport;

public class FutureTask<V> implements RunnableFuture<V> {

    private volatile int state;
    private static final int NEW = 0;
    private static final int COMPLETING = 1;
    private static final int NORMAL = 2;
    private static final int EXCEPTIONAL = 3;
    private static final int CANCELLED = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED = 6;


    private Callable<V> callable;

    private Object outcome;

    private volatile Thread runner;

    private volatile WaitNode waiters;


    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;

        WaitNode() {
            thread = Thread.currentThread();
        }
    }

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;
    }


    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;
    }

    public boolean isCancelled() {
        return state >= CANCELLED;
    }

    public boolean isDone() {
        return state != NEW;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally {
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }


    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }


    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

    public void run() {
        if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {

            runner = null;

            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V) x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable) x);
    }


    protected void done() {
    }


    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL);
            finishCompletion();
        }
    }


    protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);
            finishCompletion();
        }
    }


    protected boolean runAndReset() {
        if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call();
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {

            runner = null;

            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW;
    }

    private void handlePossibleCancellationInterrupt(int s) {
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield();
    }


    private void finishCompletion() {
        for (WaitNode q; (q = waiters) != null; ) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (; ; ) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null;
                    q = next;
                }
                break;
            }
        }
        done();
        callable = null;
    }


    private int awaitDone(boolean timed, long nanos) throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (; ; ) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            } else if (s == COMPLETING)
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            } else
                LockSupport.park(this);
        }
    }


    private void removeWaiter(WaitNode node) {
        if (node != null) {
            node.thread = null;
            retry:
            for (; ; ) {
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)
                        pred = q;
                    else if (pred != null) {
                        pred.next = s;
                        if (pred.thread == null)
                            continue retry;
                    } else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s))
                        continue retry;
                }
                break;
            }
        }
    }

    // Unsafe方法
    private static final sun.misc.Unsafe UNSAFE;
    private static final long stateOffset;
    private static final long runnerOffset;
    private static final long waitersOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = FutureTask.class;
            stateOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("state"));
            runnerOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("runner"));
            waitersOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("waiters"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

}

类 FutureTask<V>

    类型参数:html

    V - 此 FutureTask 的 get 方法所返回的结果类型。java

    全部已实现的接口:node

    RunnableFuture<V>, RunnableFuture<V>api

    可取消的异步计算。利用开始和取消计算的方法、查询计算是否完成的方法和获取计算结果的方法,此类提供了对 Future 的基本实现。异步

    仅在计算完成时才能获取结果;若是计算还没有完成,则阻塞 get 方法。一旦计算完成,就不能再从新开始或取消计算。this

    可以使用 FutureTask 包装 Callable 或 Runnable 对象。由于 FutureTask 实现了 Runnable,因此可将 FutureTask 提交给 Executor 执行。spa

    除了做为一个独立的类外,此类还提供了 protected 功能,这在建立自定义任务类时可能颇有用。.net

 

构造方法摘要线程

 

FutureTask(Callable<V> callable) 
          建立一个 FutureTask,一旦运行就执行给定的 Callable。
FutureTask(Runnable runnable, V result) 
          建立一个 FutureTask,一旦运行就执行给定的 Runnable,并安排成功完成时 get 返回给定的结果 。

 方法摘要设计

 boolean cancel(boolean mayInterruptIfRunning) 
          试图取消对此任务的执行。
protected  void done() 
          当此任务转换到状态 isDone(无论是正常地仍是经过取消)时,调用受保护的方法。
 V get() 
          若有必要,等待计算完成,而后获取其结果。
 V get(long timeout, TimeUnit unit) 
          若有必要,最多等待为使计算完成所给定的时间以后,获取其结果(若是结果可用)。
 boolean isCancelled() 
          若是在任务正常完成前将其取消,则返回 true。
 boolean isDone() 
          若是任务已完成,则返回 true。
 void run() 
          除非已将此 Future 取消,不然将其设置为其计算的结果。
protected  boolean runAndReset() 
          执行计算而不设置其结果,而后将此 Future 重置为初始状态,若是计算遇到异常或已取消,则该操做失败。
protected  void set(V v) 
          除非已经设置了此 Future 或已将其取消,不然将其结果设置为给定的值。
protected  void setException(Throwable t) 
          除非已经设置了此 Future 或已将其取消,不然它将报告一个 ExecutionException,并将给定的 throwable 做为其缘由。

 从类 java.lang.Object 继承的方法

cloneequalsfinalizegetClasshashCodenotifynotifyAlltoStringwaitwaitwait

 

FutureTask

public FutureTask(Callable<V> callable)

    建立一个 FutureTask,一旦运行就执行给定的 Callable。

    参数:

    callable - 可调用的任务。

    抛出:

    NullPointerException - 若是 callable 为 null。

 

FutureTask

public FutureTask(Runnable runnable,
                  V result)

    建立一个 FutureTask,一旦运行就执行给定的 Runnable,并安排成功完成时 get 返回给定的结果 。

    参数:

    runnable - 可运行的任务。

    result - 成功完成时要返回的结果。若是不须要特定的结果,则考虑使用下列形式的构造: Future<?> f = new FutureTask<Object>(runnable, null)

    抛出:

    NullPointerException - 若是 runnable 为 null。

 

 

isCancelled

public boolean isCancelled()

    从接口 Future 复制的描述

        若是在任务正常完成前将其取消,则返回 true。

    指定者:

        接口 Future<V> 中的 isCancelled

    返回:

        若是任务完成前将其取消,则返回 true

 

 

isDone

public boolean isDone()

    从接口 Future 复制的描述

        若是任务已完成,则返回 true。 可能因为正常终止、异常或取消而完成,在全部这些状况中,此方法都将返回 true。

    指定者:

        接口 Future<V> 中的 isDone

    返回:

        若是任务已完成,则返回 true

 

 

cancel

public boolean cancel(boolean mayInterruptIfRunning)

    从接口 Future 复制的描述

        试图取消对此任务的执行。若是任务已完成、或已取消,或者因为某些其余缘由而没法取消,则此尝试将失败。当调用 cancel 时,若是调用成功,而此任务还没有启动,则此任务将永不运行。若是任务已经启动,则 mayInterruptIfRunning 参数肯定是否应该以试图中止任务的方式来中断执行此任务的线程。

    此方法返回后,对 Future.isDone() 的后续调用将始终返回 true。若是此方法返回 true,则对 Future.isCancelled() 的后续调用将始终返回 true。

    指定者:

        接口 Future<V> 中的 cancel

    参数:

    mayInterruptIfRunning - 若是应该中断执行此任务的线程,则为 true;不然容许正在运行的任务运行完成

    返回:

        若是没法取消任务,则返回 false,这一般是因为它已经正常完成;不然返回 true

 

get

public V get()
      throws InterruptedException,
             ExecutionException

    从接口 Future 复制的描述

        若有必要,等待计算完成,而后获取其结果。

    指定者:

        接口 Future<V> 中的 get

    返回:

        计算的结果

    抛出:

    CancellationException - 若是计算被取消

    InterruptedException - 若是当前的线程在等待时被中断

    ExecutionException - 若是计算抛出异常

 

get

public V get(long timeout,
             TimeUnit unit)
      throws InterruptedException,
             ExecutionException,
             TimeoutException

    从接口 Future 复制的描述

        若有必要,最多等待为使计算完成所给定的时间以后,获取其结果(若是结果可用)。

    指定者:

        接口 Future<V> 中的 get

    参数:

    timeout - 等待的最大时间

    unit - timeout 参数的时间单位

    返回:

        计算的结果

    抛出:

    CancellationException - 若是计算被取消

    InterruptedException - 若是当前的线程在等待时被中断

    ExecutionException - 若是计算抛出异常

    TimeoutException - 若是等待超时

 

 

done

protected void done()

    当此任务转换到状态 isDone(无论是正常地仍是经过取消)时,调用受保护的方法。默认实现不执行任何操做。子类能够重写此方法,以调用完成回调或执行簿记。注意,能够查询此方法的实现内的状态,从而肯定是否已取消了此任务。

 

 

set

protected void set(V v)

    除非已经设置了此 Future 或已将其取消,不然将其结果设置为给定的值。在计算成功完成时经过 run 方法内部调用此方法。

    参数:

    v - 值

 

setException

protected void setException(Throwable t)

    除非已经设置了此 Future 或已将其取消,不然它将报告一个 ExecutionException,并将给定的 throwable 做为其缘由。在计算失败时经过 run 方法内部调用此方法。

    参数:

    t - 失败的缘由

 

 

run

public void run()

    除非已将此 Future 取消,不然将其设置为其计算的结果。

    指定者:

        接口 Runnable 中的 run

    指定者:

        接口 RunnableFuture<V> 中的 run

    另请参见:

    Thread.run()

 

runAndReset

protected boolean runAndReset()

    执行计算而不设置其结果,而后将此 Future 重置为初始状态,若是计算遇到异常或已取消,则该操做失败。本操做被设计用于那些本质上要执行屡次的任务。

    返回:

        若是成功运行并重置,则返回 true。

相关文章
相关标签/搜索