启动线程执行任务,若是须要在任务执行完毕以后获得任务执行结果,可使用从Java 1.5开始提供的Callable和Future
下面就分析一下Callable、Future以及FutureTask的具体实现及使用方法
源码分析基于JDK 1.7java
java.lang.Runnable是一个接口,只有一个run()方法
node
public interface Runnable { public abstract void run(); }
run()方法
的返回值是void,故在执行完任务后没法返回任何结果less
Callable是java.util.concurrent包下的,也是一个接口,也只有一个call()方法
,相似于java.lang.Runnable的run()方法
,实现Callable接口的类和实现Runnable接口的类都是能够被其它线程执行的任务异步
public interface Callable<V> { V call() throws Exception; }
能够看到call()方法是有返回值的,能够将执行的结果返回ide
Callable和Runnable的区别:
一、Callable中定义的是call()方法,Runnable中定义的是run()方法
二、Callable中的call()方法能够返回执行任务后的结果,Runnable中的run()方法没法得到返回值
三、Callable中的call()方法定义了throws Exception抛出异常,抛出的异常能够在主线程Future.get()时被主线程捕获;Runnable中的run()方法没有定义抛出异常,运行任务时发生异常时也会上抛,由于即便不加默认也会上抛RuntimeException,但异常没法被主线程获取
四、运行Callable任务能够拿到一个Future对象表明异步运算的结果源码分析
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
Future是java.util.concurrent包下的一个接口,表明着一个异步计算的结果,能够经过get()
获取线程执行的返回值,cancel()
取消任务执行,isCancelled()
和isDone()
得到任务执行的状况this
尝试取消任务的执行,取消成功返回true,取消失败返回false
mayInterruptIfRunning表示是否容许中断正在执行的任务
一、若是任务还未开始,cancel返回true,且任务永远不会被执行
二、若是任务正在执行,根据mayInterruptIfRunning的值判断是否须要中断执行中的任务,且若是mayInterruptIfRunning为true,会调用中断逻辑,返回true;若是mayInterruptIfRunning为false,不会调用线程中断,只是将任务取消
三、若是任务结束(多是正常完成、异常终止、被取消),返回false
四、若是cancel()操做返回true,后续调用isDone()、isCancelled()都返回true线程
表示任务是否被取消成功,若是在任务正常完成前被取消成功,则返回truecode
表示任务是否已经完成,则返回true,注意:正常完成、异常 或 取消操做都表明任务完成对象
get()
用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回
get(long timeout, TimeUnit unit)
用来获取执行结果,若是在指定时间内还没获取到结果,会抛出TimeoutException
Future提供了三种功能:
一、获取任务执行的结果
二、取消任务
三、判断任务是否完成 或 是否取消
由于Future只是一个接口,因此是没法直接用来建立对象使用的,所以就有了下面的FutureTask
public class FutureTask<V> implements RunnableFuture<V>
FutureTask实现了RunnableFuture接口,那么RunnableFuture又是什么呢?
public interface RunnableFuture<V> extends Runnable, Future<V> { /** * Sets this Future to the result of its computation * unless it has been cancelled. */ void run(); }
RunnableFuture接口继承了Runnable
和Future
,因此它既是一个可让线程执行的Runnable任务,又是一个能够获取Callable返回值的Future
/** The run state of this task */ private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; /** The underlying callable; nulled out after running */ private Callable<V> callable; /** The result to return or exception to throw from get() */ private Object outcome; /** The thread running the callable; CASed during run() */ private volatile Thread runner; /** Treiber stack of waiting threads */ private volatile WaitNode waiters;
state 是任务的运行状态
可能的状态转换:
NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED
callable 是线程执行的有返回值的任务
outcome 是任务执行后的结果或异常
waiters 表示等待获取结果的阻塞线程,链表结构,后等待线程的会排在链表前面
FutureTask有两个构造方法:
FutureTask(Callable
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable }
构造方法参数是Callable定义的任务,并将state置为NEW,只有当state为NEW时,callable才能被执行
FutureTask(Runnable runnable, V result)
public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
参数为Runnable和带泛型的result对象,因为Runnable自己是没有返回值的,故线程的执行结果经过result返回
能够看到经过runnable和result封装了个Callable,其实是new RunnableAdapter<T>(task, result)
,这个Adapter适配器将Runnable和result转换成Callable,并返回result
线程运行时真正执行的方法,Callable.call()
会在其中执行,并包含设置返回值或异常的逻辑
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
一、任务执行状态不是NEW,直接返回;将runner属性从null->当前线程不成功,直接返回
二、调用call()方法,调用成功,使用set()设置返回值
三、调用过程发生异常,使用setException()保存异常
set() 和 setException()
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } } protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }
set()
和setException()
的实现基本同样,都是先将任务运行状态从NEW->COMPLETING,分别设置返回值或异常给outcome,再将状态分别置为NORMAL和EXCEPTIONAL,最后调用finishCompletion()
依次唤醒等待获取结果的阻塞线程
finishCompletion()实现
/** * Removes and signals all waiting threads, invokes done(), and nulls out callable. */ private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { //将成员变量waiters置为null if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { //循环唤醒WaitNode中的等待线程 for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } //由子类实现的方法 done(); callable = null; // to reduce footprint }
一、执行FutureTask类的get方法时,会把主线程封装成WaitNode节点并保存在waiters链表中
二、FutureTask任务执行完成后,经过UNSAFE设置waiters的值为null,并经过LockSupport.unpark方法依次唤醒等待获取结果的线程
get()
方法有两个实现,一个是一直等待获取结果,直到任务执行完;一个是等待指定时间,超时后任务还未完成会上抛TimeoutException
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); }
内部经过awaitDone()
对主线程进行阻塞,具体实现以下:
/** * Awaits completion or aborts on interrupt or timeout. * * @param timed true if use timed waits * @param nanos time to wait, if timed * @return state upon completion */ private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; //截止时间 WaitNode q = null; boolean queued = false; for (;;) { //若是主线程已经被中断,removeWaiter(),并上抛InterruptedException //注意:Thread.interrupted()后会致使线程的中断状态为false if (Thread.interrupted()) { removeWaiter(q); //线程被中断的状况下,从waiters链表中删除q throw new InterruptedException(); } int s = state; //若是任务已经完成(多是正常完成、异常、中断),直接返回,即尚未开始等待,任务已经完成了 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } //若是任务正在完成,让出CPU资源,等待state变成NORMAL或EXCEPTIONAL else if (s == COMPLETING) // cannot time out yet Thread.yield(); //s<COMPLETING 且 尚未建立WaitNode else if (q == null) q = new WaitNode(); //s<COMPLETING 且 已经建立WaitNode,但尚未入队 else if (!queued) /** * 一、将当前waiters赋值给q.next,即“q-->当前waiters” * 二、CAS,将waiters属性,从“当前waiters-->q” * 因此后等待的会排在链表的前面,而任务完成时会从链表前面开始依次唤醒等待线程 */ queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); //全部准备工做完成,判断等待是否须要计时 else if (timed) { nanos = deadline - System.nanoTime(); //若是已经等待超时,remove当前WaiterNode if (nanos <= 0L) { removeWaiter(q); //等待超时的状况下,从waiters链表中删除q return state; } LockSupport.parkNanos(this, nanos); //挂起一段时间 } else LockSupport.park(this); //一直挂起,等待唤醒 } }
一、判断主线程是否被中断,若是被中断,将当前WaitNode节点从waiters链表中删除,并上抛InterruptedException
二、若是任务已经完成(多是正常完成、异常、中断),直接返回(即尚未开始等待,任务已经完成了,就返回了)
三、若是任务正在完成,让出CPU资源,等待state变成NORMAL或EXCEPTIONAL
四、若是任务没有被中断,也没有完成,new WaitNode()
五、若是任务没有被中断,也没有完成,也建立了WaitNode,使用UNSAFE.CAS()操做将WaitNode加入waiters链表
六、全部准备工做完毕,经过LockSupport的park或parkNanos挂起线程
而WaitNode就是一个简单的链表节点,记录这等待的线程和下一个WaitNode
/** * Simple linked list nodes to record waiting threads in a Treiber * stack. See other classes such as Phaser and SynchronousQueue * for more detailed explanation. */ static final class WaitNode { volatile Thread thread; //等待的线程 volatile WaitNode next; //下一个WaitNode WaitNode() { thread = Thread.currentThread(); } }
public boolean cancel(boolean mayInterruptIfRunning) { if (state != NEW) return false; if (mayInterruptIfRunning) { if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING)) return false; Thread t = runner; if (t != null) t.interrupt(); //中断线程 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state } else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED)) return false; finishCompletion(); return true; }
一、若是任务不是运行状态,直接返回false失败
二、若是mayInterruptIfRunning==true,中断运行中的任务,使用CAS操做将状态NEW-->INTERRUPTING,再调用runner.interrupt(),最后将状态置为INTERRUPTED
三、若是mayInterruptIfRunning==false,将任务置为CANCELLED取消状态
四、调用finishCompletion()
依次唤醒等待获取结果的线程,返回true取消成功
import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class TestFuture { public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(1); Task task = new Task(); //callable任务 Future<Integer> result = executor.submit(task); executor.shutdown(); try { Thread.sleep(1000); } catch (InterruptedException e1) { e1.printStackTrace(); } System.out.println("主线程在执行任务"); try { System.out.println("task运行结果:"+result.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } System.out.println("全部任务执行完毕"); } static class Task implements Callable<Integer>{ @Override public Integer call() throws Exception { System.out.println("子线程在进行计算"); Thread.sleep(3000); int sum = 0; for(int i=0;i<100;i++) sum += i; return sum; } } }
运行结果:
子线程在进行计算 主线程在执行任务 task运行结果:4950 全部任务执行完毕
若是只是想控制在某些状况下能够将任务取消,可使用Future<?> future = executor.submit(runnable)
,这样返回结果确定为null,但可使用future.cancel()取消任务执行
一、有了Runnable,为何还须要Callable,它们的区别是什么?
Runnable和Callable都表示执行的任务,但不一样的是Runnable.run()方法没有返回值,Callable.call()有返回值
但其实线程在执行任务时仍是执行的Runnable.run()方法,因此在使用ThreadPoolExecutor.submit()时会将Callable封装为FutureTask,而FutureTask是Runnable和Future的实现类
因此在执行Callable的任务时,线程实际上是执行FutureTask这个Runnable的run()方法,其中封装了调用Callable.call()并返回结果的逻辑执行Runnable任务若是发生异常,主线程没法知晓;而执行Callable任务若是发生异常,在Future.get()时会抛出java.util.concurrent.ExecutionException,其中封装了真实异常
二、Future.get()是如何获取线程返回值的?
首先得益于Callable.call()方法定义了返回值,提交Callable任务后,Callable会被封装成FutureTask,其既能够做为Runnable被执行,也能够做为Future获取返回值,FutureTask.run()方法会调用Callable.call()中的任务代码
在任务执行完成前,若是主线程使用Future.get(),实际上是调用FutureTask.get(),其中会判断任务状态还没有结束,将主线程加入waiters等待链表,并挂起主线程
待任务执行结束后,FutureTask会唤醒全部等待获取返回值的线程,此时主线程的FutureTask.get()就会返回了因此,主线程和运行线程是经过FutureTask做为桥梁获取线程返回值的
三、Future.cancel()真的能取消任务的执行吗?
首先答案是“不必定”,根据JDK中的方法注释“Attempts to cancel execution of this task”,即尝试去取消执行的任务 若是任务正在执行,且调用cancel()时参数mayInterruptIfRunning传的是true,那么会对执行线程调用interrupt()方法 那么问题就变成了interrupt()方法能中断线程执行吗? interrupt()方法不会中断正在运行的线程。这一方法实际上完成的是在线程受到阻塞时抛出一个中断信号,这样线程就得以退出阻塞的状态。更确切的说,若是线程被Object.wait()、Thread.join()、Thread.sleep()等阻塞,那么它将接收到一个中断异常(InterruptedException),从而提前地终结被阻塞状态。 若是线程没有被阻塞,调用interrupt()将不起做用 那么即便线程正在阻塞状态,并抛出了InterruptedException,线程可否真的取消执行还要看代码中是否捕获了InterruptedException和有没有作相应的对中断标示的判断逻辑