从CompletableFuture到异步编程设计

从CompletableFuture到异步编程设计,笔者就分为2部分来分享CompletableFuture异步编程设计,前半部分总结下CompletableFuture使用实践,后半部分分享下CompletableFuture实现原理和异步编程设计机制。html

(ps:本文内容较多,请耐心阅读。若是读者了解CompletableFuture使用的话,能够直接看后半部份内容;若是熟悉CompletableFuture及异步编程设计的话,能够直接翻到文档末尾点个“推荐”就行了,由于你已经掌握了Java异步设计精髓了 :) ,如有不正确地方,感谢评论区指正交流~ )java

Java8新增了CompletableFuture类,该类实现了CompletionStage和Future接口,简化了Java异步编程能力,该类方法较多,其实套路只有一个,那就是任务执行完成以后执行“回调”。git

CompletableFuture使用实践

Java8新增的CompletableFuture 提供对异步计算的支持,能够经过回调的方式处理计算结果。CompletableFuture 类实现了CompletionStage和Future接口,因此还能够像以前使用Future那样使用CompletableFuture ,尽管已再也不推荐这样用了。github

CompletableFuture的建立编程

// 建立一个带result的CompletableFuture
CompletableFuture<String> future = CompletableFuture.completedFuture("result");
future.get();
 
// 默认建立的CompletableFuture是没有result的,这时调用future.get()会一直阻塞下去知道有result或者出现异常
future = new CompletableFuture<>();
try {
    future.get(1, TimeUnit.SECONDS);
} catch (Exception e) {
    // no care
}
 
// 给future填充一个result
future.complete("result");
assert "result".equals(future.get());
 
// 给future填充一个异常
future = new CompletableFuture<>();
future.completeExceptionally(new RuntimeException("exception"));
try {
    future.get();
} catch (Exception e) {
    assert "exception".equals(e.getCause().getMessage());
}

上面的示例是本身设置future的result,通常状况下咱们都是让其余线程或者线程池来执行future这些异步任务。除了直接建立CompletableFuture 对象外(不推荐这样使用),还可使用以下4个方法建立CompletableFuture 对象:api

// runAsync是Runnable任务,不带返回值的,若是入参有executor,则使用executor来执行异步任务
public static CompletableFuture<Void>  runAsync(Runnable runnable)
public static CompletableFuture<Void>  runAsync(Runnable runnable, Executor executor)
// supplyAsync是待返回结果的异步任务
public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier, Executor executor)

// 使用示例
CompletableFuture.runAsync(() -> {
    System.out.println("hello world");
}, executor);
CompletableFuture.supplyAsync(() -> {
    System.out.println("hello world");
    return "result";
});

若是入参不带executor,则默认使用ForkJoinPool.commonPool()做为执行异步任务的线程池;不然使用executor执行任务。oracle

CompletableFuture的完成动做app

public CompletableFuture<T>     whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T>     exceptionally(Function<Throwable,? extends T> fn)

// 使用示例
CompletableFuture.supplyAsync(() -> {
    System.out.println("hello world");
    return "result";
}).whenCompleteAsync((result, e) -> {
    System.out.println(result + " " + e);
}).exceptionally((e) -> {
    System.out.println("exception " + e);
    return "exception";
});

action是Action类型,从上面能够看出它既能够处理正常返回值也能够处理异常,whenComplete会在任务执行完成后直接在当前线程内执行action动做,后缀带Async的方法是交给其余线程执行action(若是是线程池,执行action的可能和以前执行异步任务的是同一个线程),入参带executor的交给executor线程池来执行action动做,当发生异常时,会在当前线程内执行exceptionally方法。框架

除了用上面的whenComplete来执行完成动做以外,还可使用handle方法,该方法能够返回一个新的CompletableFuture的返回类型。异步

public <U> CompletableFuture<U>  handle(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U>  handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U>  handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)

// handle方法示例:
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("hello world");
    return "result";
});
CompletableFuture<Integer> f2 = f1.handle((r, e) -> {
    System.out.println("handle");
    return 1;
});

 除了使用handle方法来执行CompletableFuture返回类型转换以外,还可使用thenApply方法,两者不一样的是前者会处理正常返回值和异常,所以能够屏蔽异常,避免继续抛出;然后者只能处理正常返回值,一旦有异常就会抛出。

public <U> CompletableFuture<U>  thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>  thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>  thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

// thenApply方法示例:
CompletableFuture.supplyAsync(() -> {
    System.out.println("hello world");
    return "result";
}).thenApply((r) -> {
    System.out.println(r);
    return "aaa";
}).thenApply((r) -> {
    System.out.println(r);
    return 1;
});

注意,上面的handle、thenApply都是返回新的CompletableFuture类型,若是只是为了在CompletableFuture完成以后执行某些消费动做,而不返回新的CompletableFuture类型,则可使用thenAccept方法。

public CompletableFuture<Void>  thenAccept(Consumer<? super T> action)
public CompletableFuture<Void>  thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void>  thenAcceptAsync(Consumer<? super T> action, Executor executor)

// thenAccept方法示例:
CompletableFuture.supplyAsync(() -> {
    System.out.println("hello world");
    return "result";
}).thenAccept(r -> {
    System.out.println(r);
}).thenAccept(r -> {
    // 这里的r为Void(null)了
    System.out.println(r);
});

上面的handle、thenApply和thenAppept都是对上一个CompletableFuture执行完的结果进行某些操做。那么可不能够同时对2个CompletableFuture执行结果执行某些操做呢?其实也是能够的,使用thenAppeptBoth方法便可。注意,thenAppeptBoth和handle/thenApply/thenAppep的流程是同样的,只不过thenAppeptBoth中包含了另外一个CompletableFuture对象(注意,这里另外一个CompletableFuture对象的执行可并非上一个CompletableFuture执行结束才开始执行的)。

public <U> CompletableFuture<Void>   thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void>   thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
public <U> CompletableFuture<Void>   thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor)
public     CompletableFuture<Void>  runAfterBoth(CompletionStage<?> other,  Runnable action)


// thenAcceptBoth方法示例:
CompletableFuture.supplyAsync(() -> {
    System.out.println("hello world");
    return "result";
}).thenAcceptBoth(CompletableFuture.completedFuture("result2"), (r1, r2) -> {
    System.out.println(r1 + "-" + r2);
});

注意,thenAcceptBoth方法是没有返回值的(CompletableFuture<Void>),若是想用thenAcceptBoth这样的功能而且还带有返回值的CompletableFuture,那么thenCombine方法就该上场了。

public <U,V> CompletableFuture<V>    thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V>    thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V>    thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)

// thenCombine方法示例
CompletableFuture.supplyAsync(() -> {
    System.out.println("hello world");
    return "result";
}).thenCombine(CompletableFuture.completedFuture("result2"), (r1, r2) -> {
    System.out.println(r1 + "-" + r2);
    return r1 + "-" + r2;
});

thenAcceptBoth和runAfterBoth是当两个CompletableFuture都计算完成,而下面的方法是当任意一个CompletableFuture计算完成的时候就会执行。

public CompletableFuture<Void>  acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void>  acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void>  acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)
 
public <U> CompletableFuture<U>  applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)
public <U> CompletableFuture<U>  applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn)
public <U> CompletableFuture<U>  applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)

若是当想在多个CompletableFuture都计算完成或者多个CompletableFuture中的一个计算完成后执行某个动做,可以使用方法 allOf 和 anyOf。

public static CompletableFuture<Void>      allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object>    anyOf(CompletableFuture<?>... cfs)

若是当任务完成时并不想用CompletableFuture的结果,可使用thenRun方法来执行一个Runnable。

public CompletableFuture<Void>  thenRun(Runnable action)
public CompletableFuture<Void>  thenRunAsync(Runnable action)
public CompletableFuture<Void>  thenRunAsync(Runnable action, Executor executor)

以上方法都是在方法中返回一个值(或者不返回值),其实还能够返回一个CompletableFuture,是否是很像类的组合同样。

public <U> CompletableFuture<U>  thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U>  thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U>  thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)

// thenCompose方法示例:
CompletableFuture.supplyAsync(() -> {
    System.out.println("hello world");
    return "result";
}).thenCompose(r -> {
    System.out.println(r);
    return CompletableFuture.supplyAsync(() -> {
        System.out.println(r + " result2");
        return r + " result2";
    });
});

// 上面的代码和下面的代码效果是同样的
CompletableFuture.supplyAsync(() -> {
    System.out.println("hello world");
    return "result";
}).thenApply(r -> {
    System.out.println(r);
    return r;
}).thenApplyAsync(r -> {
    System.out.println(r + " result2");
    return r + " result2";
});

 CompletableFuture实现机制

先抛开 CompletableFuture 不谈,若是程序中使用了线程池,如何才能在某个任务执行完成以后执行某些动做呢?其实Java线程池自己已经提供了任务执行先后的hook方法(beforeExecute和afterExecute),以下:

public class ThreadPoolExecutor extends AbstractExecutorService {
    // ...
    protected void beforeExecute(Thread t, Runnable r) { }
    protected void afterExecute(Runnable r, Throwable t) { }
    // ...
}

咱们只须要自定义线程池继承ThreadPoolExecutor ,而后重写beforeExecute和afterExecute方法便可,在afterExecute里能够执行一些动做。关于重写ThreadPoolExecutor 的一个示例可点击ListenableThreadPoolExecutor查看。

那么CompletableFuture 的实现机制是怎样的呢?其实,和上面的所说的“afterExecute机制”是相似的(本质是同样的,回调机制),也是在任务执行完成后执行某些动做,以下代码:

CompletableFuture.supplyAsync(() -> {
    // callable任务
    System.out.println("hello world");
    return "result";
}).thenApply(r -> {
    // 任务完成以后的动做(回调方法),相似于ThreadPoolExecutor.afterExecute方法
    System.out.println(r);
    return r;
});

上面的示例代码其实主要完成了3个步骤,这3个步骤其实也是CompletableFuture的实现流程:

  1. 执行任务
  2. 添加任务完成以后的动做(回调方法)
  3. 执行回调

下面笔者就以上面的示例代码,按照这3个步骤依次进行分析,此时建议读者打开idea,写个demo进行debug,这里篇幅有限,笔者就只讲解主要流程代码,其余代码自行阅读便可 :) 

一、执行任务

 执行任务的主要逻辑就是 AsyncSupply.run 方法:

public void run() {
    CompletableFuture<T> d; Supplier<T> f;
    // dep是当前CompletableFuture,fn是任务执行逻辑
    if ((d = dep) != null && (f = fn) != null) {
        dep = null; fn = null;
        if (d.result == null) {
            try {
                // 1 任务执行 & result cas设置
                d.completeValue(f.get());
            } catch (Throwable ex) {
                // 1.1 result cas异常设置
                d.completeThrowable(ex);
            }
        }
        // 2 任务完成,可能涉及到回调的执行
        d.postComplete();
    }
}

二、添加回调

添加回调方法的流程是从 thenApply 开始的:

public <U> CompletableFuture<U> thenApply(
    Function<? super T,? extends U> fn) {
    return uniApplyStage(null, fn);
}
private <V> CompletableFuture<V> uniApplyStage(
    Executor e, Function<? super T,? extends V> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<V> d =  new CompletableFuture<V>();
    if (e != null || !d.uniApply(this, f, null)) {
        // 当上一个CompletableFuture未完成时,将该CompletableFuture添加
        // 到上一个CompletableFuture的statck中
        UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
        push(c);
        c.tryFire(SYNC);
    }
    return d;
}

CompletableFuture.statck 是 UniCompletion 类型的,该类型以下:

UniCompletion<T,V> {
    volatile Completion next;      // Treiber stack link
    Executor executor;                 // executor to use (null if none)
    CompletableFuture<V> dep;          // the dependent to complete
    CompletableFuture<T> src;          // source for action
}

3、执行回调

执行回调是从CompletableFuture.postComplete 开始的:

final void postComplete() {
    /*
     * On each step, variable f holds current dependents to pop
     * and run.  It is extended along only one path at a time,
     * pushing others to avoid unbounded recursion.
     */
    CompletableFuture<?> f = this; Completion h;
    while ((h = f.stack) != null ||
           (f != this && (h = (f = this).stack) != null)) {
        CompletableFuture<?> d; Completion t;
        // cas设置h.next到当前CompletableFuture.statck
        if (f.casStack(h, t = h.next)) {
            if (t != null) {
                if (f != this) {
                    pushStack(h);
                    continue;
                }
                h.next = null;    // detach
            }
            f = (d = h.tryFire(NESTED)) == null ? this : d;
        }
    }
}

// UniAccept
final CompletableFuture<Void> tryFire(int mode) {
    CompletableFuture<Void> d; CompletableFuture<T> a;
    if ((d = dep) == null ||
        !d.uniAccept(a = src, fn, mode > 0 ? null : this)) // 执行回调
        return null;
    dep = null; src = null; fn = null;
    // 返回当前CompletableFuture 或者 递归调用postComplete
    return d.postFire(a, mode);
}

看完上面3个步骤,是否是还不太清楚多个CompletableFuture之间的执行流程呢,说实话笔者第一次看的时候也是这样的 :(,下面咱们换个例子并给出图示来看:

CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("hello world f1");
    sleep(1); // TimeUnit.SECONDS.sleep(1)
    return "result f1";
});
CompletableFuture<String> f2 = f1.thenApply(r -> {
    System.out.println(r);
    sleep(1);
    return "f2";
});
CompletableFuture<String> f3 = f2.thenApply(r -> {
    System.out.println(r);
    sleep(1);
    return "f2";
});

CompletableFuture<String> f4 = f1.thenApply(r -> {
    System.out.println(r);
    sleep(1);
    return "f2";
});
CompletableFuture<String> f5 = f4.thenApply(r -> {
    System.out.println(r);
    sleep(1);
    return "f2";
});
CompletableFuture<String> f6 = f5.thenApply(r -> {
    System.out.println(r);
    sleep(1);
    return "f2";
});

上面代码对应的CompletableFuture及其Completion关系以下图:

结合上图和postComplete流程,能够看出执行回调的顺序是:f1 -> f4 -> f5 -> f6 -> f2 -> f3。(若是这里没看懂,能够回过头再看下postComplete方法的源码~)

异步编程设计

分析完了CompletableFuture,相信你们都已经对CompletableFuture的设计与实现有了进一步的理解。那么对于异步编程有哪些实际应用场景,其本质究竟是什么呢?

异步处理的本质其实就是回调(系统层借助于指针来实现,准确来讲是函数指针),用户提供一个回调方法,回调函数不是由该函数的实现方直接调用,而是在特定的事件或条件发生时由另外的一方调用的,用于对该事件或条件进行响应。从“宏观”来看,CompletableFuture的实现其实很简单,就是回调,即在任务执行完成以后进行回调,回调中可能涉及到其余操做,好比下一个回调或者执行下一个任务。

异步编程在应用场景较多,不少语言,好比Node.js,采用回调的方式实现异步编程。Java的一些框架,好比Netty,本身扩展了Java的 Future接口,提供了addListener等多个扩展方法:

ServerBootstrap boot = new ServerBootstrap();
boot.group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    .localAddress(8080)
    .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ch.pipeline().addLast(new EchoHandler());
        }
    });

dubbo中consumer对于RPC response的处理是基于回调机制的,Google guava也提供了通用的扩展Future:ListenableFuture、SettableFuture 以及辅助类Futures等,方便异步编程。

final String name = ...;
inFlight.add(name);
ListenableFuture<Result> future = service.query(name);
future.addListener(new Runnable() {
  public void run() {
    processedCount.incrementAndGet();
    inFlight.remove(name);
    lastProcessed.set(name);
    logger.info("Done with {0}", name);
  }
}, executor);

 

参考资料:

一、Java CompletableFuture 详解

二、https://www.cnblogs.com/aniao/p/aniao_cf.html

三、https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html

相关文章
相关标签/搜索