所谓异步任务,就是不在当前线程中进行执行,而是另外起一个线程让其执行。那么当前线程若是想拿到其执行结果,该怎么办呢?java
若是咱们使用一个公共变量做为结果容器,两个线程共用这个值,那么应该是能够拿到结果的,可是这样一来,对业务就会形成侵入干扰了,由于你始终得考虑将这个共享变量传入到这个异步线程中去且要维持其安全性。安全
咱们知道,Future.get() 能够获取异步执行的结果,那么它是怎么作到的呢?app
要实现线程的数据交换,咱们按照进程间的通讯方式可知有: 管道、共享内存、Socket套接字。而同一个jvm的两个线程通讯,全部线程共享内存区域,则必定是经过共享内存再简单不过了。less
本文将以 ThreadPoolExecutor 线程池 来解释这个过程。异步
首先,若是想要获取一个线程的执行结果,须要调用 ThreadPoolExecutor.submit(Callable); 方法。而后该方法会返回一个 Future 对象,经过 Future.get(); 便可获取结果了。jvm
它具体是怎么实现的呢?ide
仅为返回了一个 Future<?> 的对象供下游调用!工具
// AbstractExecutorService public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); // 包装一层结果,RunnableFuture, 也实现了 Runnable 接口 // 实际上就是 FutureTask RunnableFuture<T> ftask = newTaskFor(task); // 而后交由 线程池进行调用任务了,即由 jvm 调用执行 Thread // 具体执行逻辑,在我以前的文章中也已经阐述,自行搜索 execute(ftask); // 最后,把包装对象返回便可 return ftask; } /** * Returns a {@code RunnableFuture} for the given callable task. * * @param callable the callable task being wrapped * @param <T> the type of the callable's result * @return a {@code RunnableFuture} which, when run, will call the * underlying callable and which, as a {@code Future}, will yield * the callable's result as its result and provide for * cancellation of the underlying task * @since 1.6 */ protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); } // FutureTask 实例化 /** * Creates a {@code FutureTask} that will, upon running, execute the * given {@code Callable}. * * @param callable the callable task * @throws NullPointerException if the callable is null */ public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable }
经过上面的分析,咱们能够看到,异步线程的执行被包装成了 FutureTask, 而java的异步线程执行都是由jvm调用Thread.run()进行, 因此异步起点也应该从这里去找:oop
// FutureTask.run() public void run() { // 不容许屡次执行 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { // 直接调用 call() 方法,获取返回结果 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; // 执行异常,包装异常信息 setException(ex); } // 将结果设置到当前的 FutureTask 实例变量 outcome 中,这样当前线程就能够获取了 // 设置结果时,会将 state 同时变动 if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } /** * Sets the result of this future to the given value unless * this future has already been set or has been cancelled. * * <p>This method is invoked internally by the {@link #run} method * upon successful completion of the computation. * * @param v the value */ protected void set(V v) { // 设置结果时,还不表明能够直接获取了,还有后续工做,因此设置为 COMPLETING 中间态 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state // 通知线程执行完成等后续工做 finishCompletion(); } } /** * Removes and signals all waiting threads, invokes done(), and * nulls out callable. */ private void finishCompletion() { // assert state > COMPLETING; // 外部看起来是一个 for, 实际上只会执行一次, 目的是为了保证内部的锁获取成功 // 若是有其余线程成功后, waiters也就会为null, 从而自身也一块儿退出了 for (WaitNode q; (q = waiters) != null;) { // 保证更新的线程安全性 // 只要锁获取成功,就会一次性更新完成,不会失败 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; // 依次唤醒等待的线程 if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; // 只有把全部 wait 线程都通知完后,才能够退出 if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } // 完成后钩子方法,默认为空,若是须要作特殊操做能够自行复写便可 done(); callable = null; // to reduce footprint } // 简单看一下异常信息的包装,与 正常结束方法相似,只是将 outcome 设置为了异常信息,完成状态设置为 EXCEPTIONAL /** * Causes this future to report an {@link ExecutionException} * with the given throwable as its cause, unless this future has * already been set or has been cancelled. * * <p>This method is invoked internally by the {@link #run} method * upon failure of the computation. * * @param t the cause of failure */ protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }
在上面这些实现中,咱们也会有点迷糊,我干啥来了?ui
无论怎么样,你明白一点,全部的执行结果都被放到 FutureTask 的 outcome 变量中了,咱们若是想要知道结果,那么,只须要获取这个变量就能够了。
固然,也不可能这么简单了,起码你得知道何时获取该变量是合适的才行!接下来!
固然是用户调用 future.get() 获取了!
// Future.get() public V get() throws InterruptedException, ExecutionException { int s = state; // 只要状态值小于 COMPLETING, 就说明任务还未完成, 去等待完成 if (s <= COMPLETING) s = awaitDone(false, 0L); // 只要等待完成, 再去把结果取回便可 return report(s); } // 咱们先看一下结果的取回逻辑 report(), 果真不出意外的简单, 只管取 outcome 便可 /** * Returns result or throws exception for completed task. * * @param s completed state value */ @SuppressWarnings("unchecked") private V report(int s) throws ExecutionException { Object x = outcome; // 正常执行完成, 直接返回 if (s == NORMAL) return (V)x; // 此处会包含 CANCELLED/INTERRUPTING/INTERRUPTED if (s >= CANCELLED) throw new CancellationException(); // 业务异常则会被包装成 ExecutionException throw new ExecutionException((Throwable)x); } // 看到取结果这么简单,那么 等待结束的逻辑的呢?看起来好像没那么简单了 /** * Awaits completion or aborts on interrupt or timeout. * * @param timed true if use timed waits * @param nanos time to wait, if timed * @return state upon completion */ private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { // 中断则退出 if (Thread.interrupted()) { // 因 q 是链表的头,因此会移除全部的等待队列,即中断是对全部线程的 removeWaiter(q); throw new InterruptedException(); } int s = state; // 执行完成后,将线程置空便可,删除工做会有其余地方完成 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } // 正在处理结果,稍做等待便可 else if (s == COMPLETING) // cannot time out yet Thread.yield(); // 其余状况,先建立本身的等待标识,以便在下一次循环中进行入队等待 else if (q == null) q = new WaitNode(); // 进行一次入队等待,将 q 做为头节点 else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 限时的等待,等待超时后,直接返回当前状态便可 else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } // 最长等待预约时间 LockSupport.parkNanos(this, nanos); } // 此处进行无限期等待,但当被唤醒时,必定有状态变动的时候,应该会在下一个周期结束循环 else LockSupport.park(this); } }
能够看到,等待逻辑仍是有点多的,毕竟场景多。至此,咱们已经彻底看到了一个,如何获取异步线程的执行结果实现了。总结下:
1. 实现Runnable接口,由jvm进行线程调用;
2. 包装 Callable.call()方法,带返回值,当线程被调起时,转给 call() 方法执行,并返回结果;
3. 将结果封装到当前future实例中,以备查;
4. 当用户调用get()方法时,保证状态完成状况下,最快速地返回结果;
Future.get()方法,一方面是为了获取异步线程的执行结果,另外一方面也作到了等待线程执行完成的效果。
而 Thread.join() 则纯粹是为了等待异步线程执行完成,那它们有什么殊途同归之妙吗?来看下
// Thread.join(), 经过 isAlive() 判断是否完成 /** * Waits for this thread to die. * * <p> An invocation of this method behaves in exactly the same * way as the invocation * * <blockquote> * {@linkplain #join(long) join}{@code (0)} * </blockquote> * * @throws InterruptedException * if any thread has interrupted the current thread. The * <i>interrupted status</i> of the current thread is * cleared when this exception is thrown. */ public final void join() throws InterruptedException { join(0); } /** * Waits at most {@code millis} milliseconds for this thread to * die. A timeout of {@code 0} means to wait forever. * * <p> This implementation uses a loop of {@code this.wait} calls * conditioned on {@code this.isAlive}. As a thread terminates the * {@code this.notifyAll} method is invoked. It is recommended that * applications not use {@code wait}, {@code notify}, or * {@code notifyAll} on {@code Thread} instances. * * @param millis * the time to wait in milliseconds * * @throws IllegalArgumentException * if the value of {@code millis} is negative * * @throws InterruptedException * if any thread has interrupted the current thread. The * <i>interrupted status</i> of the current thread is * cleared when this exception is thrown. */ public final synchronized void join(long millis) throws InterruptedException { long base = System.currentTimeMillis(); long now = 0; if (millis < 0) { throw new IllegalArgumentException("timeout value is negative"); } // 无限期等待 if (millis == 0) { while (isAlive()) { // 这是个 native 方法,即由jvm进行控制 // Thread 任务执行完成后,将进行 notifyAll() // 同理下面的限时等待 wait(0); } } else { // 限时等待 while (isAlive()) { long delay = millis - now; if (delay <= 0) { break; } wait(delay); now = System.currentTimeMillis() - base; } } }
能够看到, Thread.join() 的等待逻辑是依赖于 jvm 的调度的, 经过 wait/notify 机制实现。与 Future.get() 相比,它是在 以后的,且没法获取结果。
Callable 其实就只是实现了一个 call() 方法而已,若是咱们只实现了 Runnable, 是否就拿不到返回值呢?并非,咱们能够直接指定返回值对象或者不指定,使用Runnable进行submit();
// 不指定返回值的 Runnable, 此处的返回值必定 void public Future<?> submit(Runnable task); // 指定返回值的 Runnable, 由 T 进行返回值接收 public <T> Future<T> submit(Runnable task, T result);
可是 Runnable 是怎么变成 Callable 的呢?其实就是一个 适配器模式的应用,咱们来看一下!
// AbstractExecutorService.submit() public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); // 明确返回值为 Void RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } // 一样使用 FutureTask 进行封装,只是调用了不一样的构造器 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } // FutureTask, 使用 Executors 工具类生成一个 callable, 屏蔽掉 Callable 与 Runnable 的差别 public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable } // Executors 使用一个适合器类将 Runnable 封装成 Callable /** * Returns a {@link Callable} object that, when * called, runs the given task and returns the given result. This * can be useful when applying methods requiring a * {@code Callable} to an otherwise resultless action. * @param task the task to run * @param result the result to return * @param <T> the type of the result * @return a callable object * @throws NullPointerException if task null */ public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<T>(task, result); } // 而 RunnableAdapter 也是很简单, 仅将 call() 转而调用 run() 方法便可 /** * A callable that runs given task and returns given result */ static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
简单不失优雅!这就是,大巧不工!
可是有一个点咱们能够看到,那就是 result 的获取,其实就是传入什么值,就返回值。而若是想在想要改变其结果,惟一的办法是使 result 变量 对 Runnable.run() 可见,从而在 run() 方法中改变其值。这就看你怎么用了!