FutureTask可用于异步获取执行结果或取消执行任务的场景。经过传入Runnable或者Callable的任务给FutureTask,直接调用其run方法或者放入线程池执行,以后能够在外部经过FutureTask的get方法异步获取执行结果,所以,FutureTask很是适合用于耗时的计算,主线程能够在完成本身的任务后,再去获取结果。另外,FutureTask还能够确保即便调用了屡次run方法,它都只会执行一次Runnable或者Callable任务,或者经过cancel取消FutureTask的执行等。java
类图结构以下所示:异步
线程池使用 FutureTask 时候须要注意的一点事,FutureTask 使用不当可能会形成调用线程一直阻塞,如何避免?ide
线程池使用 FutureTask 的时候若是拒绝策略设置为了 DiscardPolicy
和DiscardOldestPolicy
而且在被拒绝的任务的 Future 对象上调用无参 get 方法那么调用线程会一直被阻塞。函数
下面先经过一个简单的例子来复现问题,代码以下:this
public class FutureTest { //(1)线程池单个线程,线程池队列元素个数为1 private final static ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(1),new ThreadPoolExecutor.DiscardPolicy()); public static void main(String[] args) throws Exception { //(2)添加任务one Future futureOne = executorService.submit(new Runnable() { @Override public void run() { System.out.println("start runable one"); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } }); //(3)添加任务two Future futureTwo = executorService.submit(new Runnable() { @Override public void run() { System.out.println("start runable two"); } }); //(4)添加任务three Future futureThree=null; try { futureThree = executorService.submit(new Runnable() { @Override public void run() { System.out.println("start runable three"); } }); } catch (Exception e) { System.out.println(e.getLocalizedMessage()); } System.out.println("task one " + futureOne.get());//(5)等待任务one执行完毕 System.out.println("task two " + futureTwo.get());//(6)等待任务two执行完毕 System.out.println("task three " + (futureThree==null?null:futureThree.get()));// (7)等待任务three执行完毕 executorService.shutdown();//(8)关闭线程池,阻塞直到全部任务执行完毕 }
运行结果以下:spa
代码 (1) 建立了一个单线程而且队列元素个数为 1 的线程池,而且拒绝策略设置为了DiscardPolicy
线程
代码(2)向线程池提交了一个任务 one,那么这个任务会使用惟一的一个线程进行执行,任务在打印 start runable one
后会阻塞该线程 5s.3d
代码(3)向线程池提交了一个任务 two,这时候会把任务 two 放入到阻塞队列code
代码(4)向线程池提交任务 three,因为队列已经满了则会触发拒绝策略丢弃任务 three, 从执行结果看在任务 one 阻塞的 5s 内,主线程执行到了代码 (5) 等待任务 one 执行完毕,当任务 one 执行完毕后代码(5)返回,主线程打印出 task one null。任务 one 执行完成后线程池的惟一线程会去队列里面取出任务 two 并执行因此输出 start runable two 而后代码(6)会返回,这时候主线程输出 task two null,而后执行代码(7)等待任务 three 执行完毕,从执行结果看代码(7)会一直阻塞不会返回,至此问题产生,若是把拒绝策略修改成 DiscardOldestPolicy 也会存在有一个任务的 get 方法一直阻塞只是如今是任务 two 被阻塞。可是若是拒绝策略设置为默认的 AbortPolicy 则会正常返回,而且会输出以下结果:对象
要分析这个问题须要看下线程池的 submit 方法里面作了什么,submit 方法源码以下:
public Future<?> submit(Runnable task) { ... //(1)装饰Runnable为Future对象 RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); //(6)返回future对象 return ftask; } protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); }
public void execute(Runnable command) { ... //(2) 若是线程个数消息核心线程数则新增处理线程处理 int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //(3)若是当前线程个数已经达到核心线程数则任务放入队列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } //(4)尝试新增处理线程进行处理 else if (!addWorker(command, false)) reject(command);//(5)新增失败则调用拒绝策略 }
代码(1)装饰 Runnable 为 FutureTask 对象,而后调用线程池的 execute 方法。
代码 (2) 若是线程个数消息核心线程数则新增处理线程处理
代码(3)若是当前线程个数已经达到核心线程数则任务放入队列
代码(4)尝试新增处理线程进行处理,失败则进行代码(5),否者直接使用新线程处理
代码(5)执行具体拒绝策略,从这里也能够看出拒绝策略执行是使用的业务线程。
因此要分析上面例子中问题所在只须要看步骤(5)对被拒绝任务的影响,这里先看下拒绝策略 DiscardPolicy 的源码,以下:
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { }
}
可知拒绝策略 rejectedExecution 方法里面什么都没作,因此代码(4)调用 submit 后会返回一个 future 对象,这里有必要在从新说 future 是有状态的,FutureTask 内部有一个state用来展现任务的状态,而且是volatile修饰的,future 的状态枚举值以下:
/** Possible state transitions: * NEW -> COMPLETING -> NORMAL 正常的状态转移 * NEW -> COMPLETING -> EXCEPTIONAL 异常 * NEW -> CANCELLED 取消 * NEW -> INTERRUPTING -> INTERRUPTED 中断 */ private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;
在代码(1)的时候使用 newTaskFor 方法转换 Runnable 任务为 FutureTask,而 FutureTask 的构造函数里面设置的状态就是 New。FutureTask的构造函数源码以下:
public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
把FutureTask提交到线程池或者线程执行start时候会调用run方法,源码以下:
public void run() { //若是当前不是new状态,或者当前cas设置当前线程失败则返回,只有一个线程能够成功。 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { //当前状态为new 则调用任务的call方法执行任务 Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex);完成NEW -> COMPLETING -> EXCEPTIONAL 状态转移 } //执行任务成功则保存结果更新状态,unpark全部等待线程。 if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } protected void set(V v) { //状态从new->COMPLETING if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; //状态从COMPLETING-》NORMAL UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state //unpark全部等待线程。 finishCompletion(); } }
因此使用 DiscardPolicy 策略提交任务后返回了一个状态值为NEW的future对象。那么咱们下面就要看下当future的无参get()方法的时候,future变为何状态才会返回,这时候就要看一下FutureTask的get方法的源码,源码以下:
public V get() throws InterruptedException, ExecutionException { int s = state; //当状态值<=COMPLETING时候须要等待,否者调用report返回 if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { //若是被中断,则抛异常 if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } //组建单列表 int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); //超时则返回 if (nanos <= 0L) { removeWaiter(q); return state; } //否者设置park超时时间 LockSupport.parkNanos(this, nanos); } else //直接挂起当前线程 LockSupport.park(this); } } private V report(int s) throws ExecutionException { Object x = outcome; //状态值为NORMAL正常返回 if (s == NORMAL) return (V)x; //状态值大于等于CANCELLED则抛异常 if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
也就是说当 future 的状态 > COMPLETING 时候调用 get 方法才会返回,而明显 DiscardPolicy 策略在拒绝元素的时候并无设置该 future 的状态,后面也没有其余机会能够设置该 future 的状态,因此 future 的状态一直是 NEW,因此一直不会返回,同理 DiscardOldestPolicy 策略也是这样的问题,最老的任务被淘汰时候没有设置被淘汰任务对于 future 的状态。、
在submit任务后还能够调用futuretask的cancel来取消任务:
public boolean cancel(boolean mayInterruptIfRunning) { //只有任务是new的才能取消 if (state != NEW) return false; //运行时容许中断 if (mayInterruptIfRunning) { //完成new->INTERRUPTING if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING)) return false; Thread t = runner; if (t != null) t.interrupt(); //完成INTERRUPTING->INTERRUPTED UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state } //不容许中断则直接new->CANCELLED else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED)) return false; finishCompletion(); return true; }
那么默认的 AbortPolicy 策略为啥没问题呢?
也就是说当 future 的状态 > COMPLETING 时候调用 get 方法才会返回,而明显 DiscardPolicy 策略在拒绝元素的时候并无设置该 future 的状态,后面也没有其余机会能够设置该 future 的状态,因此 future 的状态一直是 NEW,因此一直不会返回,同理 DiscardOldestPolicy 策略也是这样的问题,最老的任务被淘汰时候没有设置被淘汰任务对于 future 的状态。
因此当使用 Future 的时候,尽可能使用带超时时间的 get 方法,这样即便使用了 DiscardPolicy 拒绝策略也不至于一直等待,等待超时时间到了会自动返回的,若是非要使用不带参数的 get 方法则能够重写 DiscardPolicy 的拒绝策略在执行策略时候设置该 Future 的状态大于 COMPLETING 便可,可是查看 FutureTask 提供的方法发现只有 cancel 方法是 public 的而且能够设置 FutureTask 的状态大于 COMPLETING,重写拒绝策略具体代码能够以下:
/** * Created by cong on 2018/7/13. */ public class MyRejectedExecutionHandler implements RejectedExecutionHandler { public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) { if (!threadPoolExecutor.isShutdown()) { if(null != runnable && runnable instanceof FutureTask){ ((FutureTask) runnable).cancel(true); } } } }
使用这个策略时候因为从 report 方法知道在 cancel 的任务上调用 get() 方法会抛出异常因此代码(7)须要使用 try-catch 捕获异常代码(7)修改成以下:
package com.hjc; import java.util.concurrent.*; /** * Created by cong on 2018/7/13. */ public class FutureTest { //(1)线程池单个线程,线程池队列元素个数为1 private final static ThreadPoolExecutor executorService = new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(1), new MyRejectedExecutionHandler()); public static void main(String[] args) throws Exception { //(2)添加任务one Future futureOne = executorService.submit(new Runnable() { public void run() { System.out.println("start runable one"); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } }); //(3)添加任务two Future futureTwo = executorService.submit(new Runnable() { public void run() { System.out.println("start runable two"); } }); //(4)添加任务three Future futureThree = null; try { futureThree = executorService.submit(new Runnable() { public void run() { System.out.println("start runable three"); } }); } catch (Exception e) { System.out.println(e.getLocalizedMessage()); } System.out.println("task one " + futureOne.get());//(5)等待任务one执行完毕 System.out.println("task two " + futureTwo.get());//(6)等待任务two执行完毕 try{ System.out.println("task three " + (futureThree==null?null:futureThree.get()));// (7)等待任务three }catch(Exception e){ System.out.println(e.getLocalizedMessage()); } executorService.shutdown();//(8)关闭线程池,阻塞直到全部任务执行完毕 } }
运行结果以下:
固然这相比正常状况下多了一个异常捕获,其实最好的状况是重写拒绝策略时候设置 FutureTask 的状态为 NORMAL,可是这须要重写 FutureTask 方法了,由于 FutureTask 并无提供接口进行设置。