可取消的异步计算。该类提供了 Future的基本实现,其中包括启动和取消计算的方法,查询计算是否完成以及检索计算结果的方法。只有在计算完成后才能检索结果;若是计算尚未完成,{getcode}方法将会被阻塞。一旦计算完成,计算不能被从新启动或取消(除非计算是使用调用的runAndReset()。java
该类实现自RunableFuture接口,其中RunableFuture接口又继承自Runable和Future。因此能够理解为:FutureTask是一个能够计算Future结果的一个Future实现,git
因为FutureTask间接或直接实现了Runable和Future接口,因此其具备以下特征:github
能够像一个普通的任务同样,使用线程池提交一个任务并执行。数据库
ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.submit(new FutureTask<Integer>(new Callable<Integer>() { @Override public Integer call() throws Exception { return 100; } }));
能够像一个普通的任务同样,使用Thread来执行,但能够异步获取结果。缓存
FutureTask futureTask = new FutureTask<Integer>(new Callable<Integer>() { @Override public Integer call() throws Exception { return 100; } }); new Thread(futureTask).start(); futureTask.get();
考虑一种使用Cache的场景:通常状况下,对于热点数据咱们都会使用cache保存数据,只有当cache失效了,才会进行耗时的网络调用或者数据库查询。可是当cache失效时,同时有多个该key的查询,那么在短期内可能会有多个相同的耗时查询,瞬间对系统性能会有必定的损失,为了解决这种状况能够采起缓存FutureTask的方式解决:网络
思路借鉴:https://github.com/javacreed/...异步
//获取缓存的客户端 public class CacheClient { public static <T> T getCache(int id){ return null; } } //Service层逻辑 public class CacheService { private static ConcurrentMap<Integer,FutureTask<User>> cacheFuture = new ConcurrentHashMap<>(); public User getUserInfo(int id) { Future<User> future = createFutureIfAbsent(id); try { return future.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } return null; } private Future<User> createFutureIfAbsent(final int id) { Future<User> future = cacheFuture.get(id); if (future == null) { FutureTask<User> futureTask = new FutureTask<User>(new Callable<User>() { @Override public User call() throws Exception { return CacheClient.getCache(id); } }); future = cacheFuture.putIfAbsent(id, futureTask); if (future == null) { future = futureTask; futureTask.run(); } } return future; } public class User { private int id; private String name; private String age; 。。。 } }
FutureTask做为一个可运行的Future,其运行过程当中存在状态的迁移过程,FutureTask的运行状态有:ide
状态跃迁:性能
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
FutureTask有两个构造方法,虽然两个构造方法的入参略有不一样,可是在底层执行时都是按照Callback任务来构建的。并在此过程初始化当前的任务状态为:NEWthis
下面将从核心方法开始,逐渐分析FutureTask的原理:
run():任务执行
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { runner = null; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
该方法的逻辑很简单,主要完成了以下任务:
1.首先判断任务的有效性:1)该任务的状态是否为初始状态:NEW,2)把运行任务的线程设置给成员变量runner。
2.执行任务。
3.根据执行结果设置状态。
get()/get(long timeout, TimeUnit unit)
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); }```
该方法的逻辑更简单:首先判断当前的状态,而后就会调用awaitDone()方法等待结果,当等待超时就会抛出TimeOutException,不然调用report()将结果报告出去。下面看看等待结果是如何处理的:
private int awaitDone(boolean timed, long nanos) throws InterruptedException { //首先计算出该任务的最终结束时间 final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { //判断当前线程是否被中断 if (Thread.interrupted()) { //从等待队列中删除该线程的等待节点 removeWaiter(q); throw new InterruptedException(); } int s = state; //若是状态>COMPLETING,说明任务已经结束了,无论是否正常结束,都是能够返回的 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } //若是当前状态仍是COMPLETING,说明结果来没有返回呢,那就让出CPU else if (s == COMPLETING) // cannot time out yet Thread.yield(); //若是当前任务尚未生成等待节点,那么就建立一个以当前线程的等待节点。 else if (q == null) q = new WaitNode(); else if (!queued) //采用头插法构建等待队列 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); //任务执行超时了,那么就删除等待队列 if (nanos <= 0L) { removeWaiter(q); return state; } //尚未超时,那么就将当前线程park LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); }
该方法虽然篇幅很大,可是完成的任务也是很简单的,主要能够总结以下:
NOTE:为何要使用这个waiter
?[单独文章分析:]