转自:http://www.importnew.com/25286.htmlhtml
在Java中通常经过继承Thread类或者实现Runnable接口这两种方式来建立多线程,可是这两种方式都有个缺陷,就是不能在执行完成后获取执行的结果,所以Java 1.5以后提供了Callable和Future接口,经过它们就能够在任务执行完毕以后获得任务的执行结果。本文会简要的介绍使用方法,而后会从源代码角度分析下具体的实现原理。
本文以Java 1.7的代码进行分析。java
Callable接口多线程
对于须要执行的任务须要实现Callable接口,Callable接口定义以下:异步
public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; }
能够看到Callable是个泛型接口,泛型V就是要call()方法返回的类型。Callable接口和Runnable接口很像,均可以被另一个线程执行,可是正如前面所说的,Runnable不会返回数据也不能抛出异常。ide
Future接口函数
Future接口表明异步计算的结果,经过Future接口提供的方法能够查看异步计算是否执行完成,或者等待执行结果并获取执行结果,同时还能够取消执行。Future接口的定义以下:this
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
FutureTaskspa
Future只是一个接口,不能直接用来建立对象,FutureTask是Future的实现类,
FutureTask的继承图以下:线程
能够看到,FutureTask实现了RunnableFuture接口,则RunnableFuture接口继承了Runnable接口和Future接口,因此FutureTask既能当作一个Runnable直接被Thread执行,也能做为Future用来获得Callable的计算结果。指针
使用
FutureTask通常配合ExecutorService来使用,也能够直接经过Thread来使用。
package com.beautyboss.slogen.callback; import java.util.concurrent.*; /** * Author : Slogen * AddTime : 17/6/4 * Email : huangjian13@meituan.com */ public class CallDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { /** * 第一种方式:Future + ExecutorService * Task task = new Task(); * ExecutorService service = Executors.newCachedThreadPool(); * Future<Integer> future = service.submit(task1); * service.shutdown(); */ /** * 第二种方式: FutureTask + ExecutorService * ExecutorService executor = Executors.newCachedThreadPool(); * Task task = new Task(); * FutureTask<Integer> futureTask = new FutureTask<Integer>(task); * executor.submit(futureTask); * executor.shutdown(); */ /** * 第三种方式:FutureTask + Thread */ // 2. 新建FutureTask,须要一个实现了Callable接口的类的实例做为构造函数参数 FutureTask<Integer> futureTask = new FutureTask<Integer>(new Task()); // 3. 新建Thread对象并启动 Thread thread = new Thread(futureTask); thread.setName("Task thread"); thread.start(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Thread [" + Thread.currentThread().getName() + "] is running"); // 4. 调用isDone()判断任务是否结束 if(!futureTask.isDone()) { System.out.println("Task is not done"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } int result = 0; try { // 5. 调用get()方法获取任务结果,若是任务没有执行完成则阻塞等待 result = futureTask.get(); } catch (Exception e) { e.printStackTrace(); } System.out.println("result is " + result); } // 1. 继承Callable接口,实现call()方法,泛型参数为要返回的类型 static class Task implements Callable<Integer> { @Override public Integer call() throws Exception { System.out.println("Thread [" + Thread.currentThread().getName() + "] is running"); int result = 0; for(int i = 0; i < 100;++i) { result += i; } Thread.sleep(3000); return result; } } }
构造函数
先从FutureTask的构造函数看起,FutureTask有两个构造函数,其中一个以下:
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable }
这个构造函数会把传入的Callable变量保存在this.callable字段中,该字段定义为private Callable<V> callable;用来保存底层的调用,在被执行完成之后会指向null,接着会初始化state字段为NEW。state字段用来保存FutureTask内部的任务执行状态,一共有7中状态,每种状态及其对应的值以下:
private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;
其中须要注意的是state是volatile类型的,也就是说只要有任何一个线程修改了这个变量,那么其余全部的线程都会知道最新的值。
为了后面更好的分析FutureTask的实现,这里有必要解释下各个状态。
有一点须要注意的是,全部值大于COMPLETING的状态都表示任务已经执行完成(任务正常执行完成,任务执行异常或者任务被取消)。
各个状态之间的可能转换关系以下图所示:
另一个构造函数以下,
public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
这个构造函数会把传入的Runnable封装成一个Callable对象保存在callable字段中,同时若是任务执行成功的话就会返回传入的result。这种状况下若是不须要返回值的话能够传入一个null。
顺带看下Executors.callable()这个方法,这个方法的功能是把Runnable转换成Callable,代码以下:
public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<T>(task, result); }
能够看到这里采用的是适配器模式,调用RunnableAdapter<T>(task, result)方法来适配,实现以下:
static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
这个适配器很简单,就是简单的实现了Callable接口,在call()实现中调用Runnable.run()方法,而后把传入的result做为任务的结果返回。
在new了一个FutureTask对象以后,接下来就是在另外一个线程中执行这个Task,不管是经过直接new一个Thread仍是经过线程池,执行的都是run()方法,接下来就看看run()方法的实现。
run()
run()方法实现以下:
public void run() { // 1. 状态若是不是NEW,说明任务或者已经执行过,或者已经被取消,直接返回 // 2. 状态若是是NEW,则尝试把当前执行线程保存在runner字段中 // 若是赋值失败则直接返回 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { // 3. 执行任务 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; // 4. 任务异常 setException(ex); } if (ran) // 4. 任务正常执行完毕 set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; // 5. 若是任务被中断,执行中断处理 if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
run()方法首先会
若是任务执行发生异常,则调用setException()方法保存异常信息。setException()方法以下:
protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }
在setException()方法中
5. 若是任务成功执行则调用set()方法设置执行结果,该方法实现以下:
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
这个方法跟上面分析的setException()差很少,
发起任务线程跟执行任务线程一般状况下都不会是同一个线程,在任务执行线程执行任务的时候,任务发起线程能够查看任务执行状态、获取任务执行结果、取消任务等等操做,接下来分析下这些操做。
get()
任务发起线程能够调用get()方法来获取任务执行结果,若是此时任务已经执行完毕则会直接返回任务结果,若是任务还没执行完毕,则调用方会阻塞直到任务执行结束返回结果为止。get()方法实现以下:
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }
get()方法实现比较简单,会
awaitDone()
当调用get()获取任务结果可是任务还没执行完成的时候,调用线程会调用awaitDone()方法进行阻塞等待,该方法定义以下:
private int awaitDone(boolean timed, long nanos) throws InterruptedException { // 计算等待截止时间 final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { // 1. 判断阻塞线程是否被中断,若是被中断则在等待队 // 列中删除该节点并抛出InterruptedException异常 if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } // 2. 获取当前状态,若是状态大于COMPLETING // 说明任务已经结束(要么正常结束,要么异常结束,要么被取消) // 则把thread显示置空,并返回结果 int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } // 3. 若是状态处于中间状态COMPLETING // 表示任务已经结束可是任务执行线程还没来得及给outcome赋值 // 这个时候让出执行权让其余线程优先执行 else if (s == COMPLETING) // cannot time out yet Thread.yield(); // 4. 若是等待节点为空,则构造一个等待节点 else if (q == null) q = new WaitNode(); // 5. 若是尚未入队列,则把当前节点加入waiters首节点并替换原来waiters else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { // 若是须要等待特定时间,则先计算要等待的时间 // 若是已经超时,则删除对应节点并返回对应的状态 nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } // 6. 阻塞等待特定时间 LockSupport.parkNanos(this, nanos); } else // 6. 阻塞等待直到被其余线程唤醒 LockSupport.park(this); } }
awaitDone()中有个死循环,每一次循环都会
假设当前state=NEW且waiters为NULL,也就是说尚未任何一个线程调用get()获取执行结果,这个时候有两个线程threadA和threadB前后调用get()来获取执行结果。再假设这两个线程在加入阻塞队列进行阻塞等待以前任务都没有执行完成且threadA和threadB都没有被中断的状况下(由于若是threadA和threadB在进行阻塞等待结果以前任务就执行完成或线程自己被中断的话,awaitDone()就执行结束返回了),执行过程是这样的,以threadA为例:
在threadA和threadB都阻塞等待以后的waiters结果如图
cancel()方法会作下面几件事:
1 .判断任务当前执行状态,若是任务状态不为NEW,则说明任务或者已经执行完成,或者执行异常,不能被取消,直接返回false表示执行失败。
2. 判断须要中断任务执行线程,则
3. 若是不须要中断任务执行线程,直接把任务状态从NEW转化为CANCELLED。若是转化失败则返回false表示取消失败。这个转换过程对应上图中的四。
4. 调用finishCompletion()。
finishCompletion()
根据前面的分析,不论是任务执行异常仍是任务正常执行完毕,或者取消任务,最后都会调用finishCompletion()方法,该方法实现以下:
private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }
这个方法的实现比较简单,依次遍历waiters链表,唤醒节点中的线程,而后把callable置空。
被唤醒的线程会各自从awaitDone()方法中的LockSupport.park*()阻塞中返回,而后会进行新一轮的循环。在新一轮的循环中会返回执行结果(或者更确切的说是返回任务的状态)。
report()
report()方法用在get()方法中,做用是把不一样的任务状态映射成任务执行结果。实现以下:
private V report(int s) throws ExecutionException { Object x = outcome; // 1. 任务正常执行完成,返回任务执行结果 if (s == NORMAL) return (V)x; // 2. 任务被取消,抛出CancellationException异常 if (s >= CANCELLED) throw new CancellationException(); // 3. 其余状态,抛出执行异常ExecutionException throw new ExecutionException((Throwable)x); }
映射关系以下图所示:
若是任务处于NEW、COMPLETING和INTERRUPTING这三种状态的时候是执行不到report()方法的,因此不必对这三种状态进行转换。
get(long,TimeUnit)
带超时等待的获取任务结果,实现以下:
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && // 若是awaitDone()超时返回以后任务还没结束,则抛出异常 (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); }
跟get()不一样点在于get(long,TimeUnit)会在awaitDone()超时返回以后抛出TimeoutException异常。
isCancelled()和isDone()
这两个方法分别用来判断任务是否被取消和任务是否执行完成,实现都比较简单,代码以下:
public boolean isCancelled() { return state >= CANCELLED; } public boolean isDone() { return state != NEW; }
根据前面的分析,这两个方法就很容易理解不用多作解释了,O(∩_∩)O。
总结下,其实FutureTask的实现仍是比较简单的,当用户实现Callable()接口定义好任务以后,把任务交给其余线程进行执行。FutureTask内部维护一个任务状态,任何操做都是围绕着这个状态进行,并随时更新任务状态。任务发起者调用get*()获取执行结果的时候,若是任务尚未执行完毕,则会把本身放入阻塞队列中而后进行阻塞等待。当任务执行完成以后,任务执行线程会依次唤醒阻塞等待的线程。调用cancel()取消任务的时候也只是简单的修改任务状态,若是须要中断任务执行线程的话则调用Thread.interrupt()中断任务执行线程。
有个值得关注的问题就是当任务还在执行的时候用户调用cancel(true)方法可否真正让任务中止执行呢?
在前面的分析中咱们直到,当调用cancel(true)方法的时候,实际执行仍是Thread.interrupt()方法,而interrupt()方法只是设置中断标志位,若是被中断的线程处于sleep()、wait()或者join()逻辑中则会抛出InterruptedException异常。
所以结论是:cancel(true)并不必定可以中止正在执行的异步任务。