计算机程序的思惟逻辑 (94) - 组合式异步编程

本系列文章经补充和完善,已修订整理成书《Java编程的逻辑》(马俊昌著),由机械工业出版社华章分社出版,于2018年1月上市热销,读者好评如潮!各大网店和书店有售,欢迎购买:京东自营连接 html

前面两节讨论了Java 8中的函数式数据处理,那是对38节55节介绍的容器类的加强,它能够将对集合数据的多个操做以流水线的方式组合在一块儿。本节继续讨论Java 8的新功能,主要是一个新的类CompletableFuture,它是对65节83节介绍的并发编程的加强,它能够方便地将多个有必定依赖关系的异步任务以流水线的方式组合在一块儿,大大简化多异步任务的开发。java

以前介绍了那么多并发编程的内容,还有什么问题不能解决?CompletableFuture到底能解决什么问题?与以前介绍的内容有什么关系?具体如何使用?基本原理是什么?本节进行详细讨论,咱们先来看它要解决的问题。git

异步任务管理

在现代软件开发中,系统功能愈来愈复杂,管理复杂度的方法就是分而治之,系统的不少功能可能会被切分为小的服务,对外提供Web API,单独开发、部署和维护。好比,在一个电商系统中,可能有专门的产品服务、订单服务、用户服务、推荐服务、优惠服务、搜索服务等,在对外具体展现一个页面时,可能要调用多个服务,而多个调用之间可能还有必定的依赖,好比,显示一个产品页面,须要调用产品服务,也可能须要调用推荐服务获取与该产品有关的其余推荐,还可能须要调用优惠服务获取该产品相关的促销优惠,而为了调用优惠服务,可能须要先调用用户服务以获取用户的会员级别。github

另外,现代软件常常依赖不少第三方的服务,好比地图服务、短信服务、天气服务、汇率服务等,在实现一个具体功能时,可能要访问多个这样的服务,这些访问之间可能存在着必定的依赖关系。编程

为了提升性能,充分利用系统资源,这些对外部服务的调用通常都应该是异步的、尽可能并发的。咱们在77节介绍过异步任务执行服务,使用ExecutorService能够方便地提交单个独立的异步任务,能够方便地在须要的时候经过Future接口获取异步任务的结果,但对于多个尤为是有必定依赖关系的异步任务,这种支持就不够了。swift

因而,就有了CompletableFuture,它是一个具体的类,实现了两个接口,一个是Future,另外一个是CompletionStage,Future表示异步任务的结果,而CompletionStage字面意思是完成阶段,多个CompletionStage能够以流水线的方式组合起来,对于其中一个CompletionStage,它有一个计算任务,但可能须要等待其余一个或多个阶段完成才能开始,它完成后,可能会触发其余阶段开始运行。CompletionStage提供了大量方法,使用它们,能够方便地响应任务事件,构建任务流水线,实现组合式异步编程。bash

具体怎么使用呢?下面咱们会逐步说明,CompletableFuture也是一个Future,咱们先来看与Future相似的地方。微信

与Future/FutureTask对比

基本的任务执行服务

咱们先经过示例来简要回顾下异步任务执行服务和Future,在异步任务执行服务中,用Callable或Runnable表示任务,以Callable为例,一个模拟的外部任务为:并发

private static Random rnd = new Random();

static int delayRandom(int min, int max) {
    int milli = max > min ? rnd.nextInt(max - min) : 0;
    try {
        Thread.sleep(min + milli);
    } catch (InterruptedException e) {
    }
    return milli;
}

static Callable<Integer> externalTask = () -> {
    int time = delayRandom(20, 2000);
    return time;
};
复制代码

externalTask表示外部任务,咱们使用了Lambda表达式,不熟悉能够参看91节,delayRandom用于模拟延时。app

假定有一个异步任务执行服务,其代码为:

private static ExecutorService executor =
        Executors.newFixedThreadPool(10);
复制代码

经过任务执行服务调用外部服务,通常返回Future,表示异步结果,示例代码为:

public static Future<Integer> callExternalService(){
    return executor.submit(externalTask);
}
复制代码

在主程序中,结合异步任务和本地调用的示例代码为:

public static void master() {
    // 执行异步任务
    Future<Integer> asyncRet = callExternalService();

    // 执行其余任务 ...

    // 获取异步任务的结果,处理可能的异常
    try {
        Integer ret = asyncRet.get();
        System.out.println(ret);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
}
复制代码

基本的CompletableFuture

使用CompletableFuture能够实现相似功能,不过,它不支持使用Callable表示异步任务,而支持Runnable和Supplier,Supplier替代Callable表示有返回结果的异步任务,与Callale的区别是,它不能抛出受检异常,若是会发生异常,能够抛出运行时异常。

使用Supplier表示异步任务,代码与Callable相似,替换变量类型便可,即:

static Supplier<Integer> externalTask = () -> {
    int time = delayRandom(20, 2000);
    return time;
};
复制代码

使用CompletableFuture调用外部服务的代码能够为:

public static Future<Integer> callExternalService(){
    return CompletableFuture.supplyAsync(externalTask, executor);
}
复制代码

supplyAsync是一个静态方法,其定义为:

public static <U> CompletableFuture<U> supplyAsync( Supplier<U> supplier, Executor executor) 复制代码

它接受两个参数supplier和executor,内部,它使用executor执行supplier表示的任务,返回一个CompletableFuture,调用后,任务被异步执行,这个方法当即返回。

supplyAsync还有一个不带executor参数的方法:

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) 复制代码

没有executor,任务被谁执行呢?与系统环境和配置有关,通常来讲,若是可用的CPU核数大于2,会使用Java 7引入的Fork/Join任务执行服务,即ForkJoinPool.commonPool(),该任务执行服务背后的工做线程数通常为CPU核数减1,即Runtime.getRuntime().availableProcessors()-1,不然,会使用ThreadPerTaskExecutor,它会为每一个任务建立一个线程。

对于CPU密集型的运算任务,使用Fork/Join任务执行服务是合适的,但对于通常的调用外部服务的异步任务,Fork/Join多是不合适的,由于它的并行度比较低,可能会让本能够并发的多任务串行运行,这时,应该提供Executor参数。

后面咱们还会看到不少以Async结尾命名的方法,通常都有两个版本,一个带Executor参数,另外一个不带,其含义是相同的,就再也不重复介绍了。

对于类型为Runnable的任务,构建CompletableFuture的方法为:

public static CompletableFuture<Void> runAsync( Runnable runnable) public static CompletableFuture<Void> runAsync( Runnable runnable, Executor executor) 复制代码

它与supplyAsync是相似的,具体就不赘述了。

CompletableFuture对Future的基本加强

Future有的接口,CompletableFuture都是支持的,不过,CompletableFuture还有一些额外的相关方法,好比:

public T join() public boolean isCompletedExceptionally() public T getNow(T valueIfAbsent) 复制代码

join与get方法相似,也会等待任务结束,但它不会抛出受检异常,若是任务异常结束了,join会将异常包装为运行时异常CompletionException抛出。

Future有isDone方法检查任务是否结束了,但不知道任务是正常结束仍是异常结束,isCompletedExceptionally方法能够判断任务是不是异常结束了。

getNow与join相似,区别是,若是任务尚未结束,它不会等待,而是会返回传入的参数valueIfAbsent。

进一步理解Future/CompletableFuture

前面例子都使用了任务执行服务,其实,任务执行服务与异步结果Future不是绑在一块儿的,能够本身建立线程返回异步结果,为进一步理解,咱们看些示例。

使用FutureTask调用外部服务,代码能够为:

public static Future<Integer> callExternalService() {
    FutureTask<Integer> future = new FutureTask<>(externalTask);
    new Thread() {
        public void run() {
            future.run();
        }
    }.start();
    return future;
}
复制代码

内部本身建立了一个线程,线程调用FutureTask的run方法,咱们在77节分析过FutureTask的代码,run方法会调用externalTask的call方法,并保存结果或碰到的异常,唤醒等待结果的线程。

使用CompletableFuture,也能够直接建立线程,并返回异步结果,代码能够为:

public static Future<Integer> callExternalService() {
    CompletableFuture<Integer> future = new CompletableFuture<>();
    new Thread() {
        public void run() {
            try {
                future.complete(externalTask.get());
            } catch (Exception e) {
                future.completeExceptionally(e);
            }
        }
    }.start();
    return future;
}
复制代码

这里使用了CompletableFuture的两个方法:

public boolean complete(T value) public boolean completeExceptionally(Throwable ex) 复制代码

这两个方法显式设置任务的状态和结果,complete设置任务成功完成,结果为value,completeExceptionally设置任务异常结束,异常为ex。Future接口没有对应的方法,FutureTask有相关方法但不是public的(是protected)。设置完后,它们都会触发其余依赖它们的CompletionStage。具体会触发什么呢?咱们接下来再看。

响应结果或异常

使用Future,咱们只能经过get获取结果,而get可能会须要阻塞等待,而经过CompletionStage,能够注册回调函数,当任务完成或异常结束时自动触发执行,有两类注册方法,whenComplete和handle,咱们分别来看下。

whenComplete

whenComplete的声明为:

public CompletableFuture<T> whenComplete( BiConsumer<? super T, ? super Throwable> action) 复制代码

参数action表示回调函数,无论前一个阶段是正常结束仍是异常结束,它都会被调用,函数类型是BiConsumer,接受两个参数,第一个参数是正常结束时的结果值,第二个参数是异常结束时的异常,BiConsumer没有返回值。whenComplete的返回值仍是CompletableFuture,它不会改变原阶段的结果,还能够在其上继续调用其余函数。看个简单的示例:

CompletableFuture.supplyAsync(externalTask).whenComplete((result, ex) -> {
    if (result != null) {
        System.out.println(result);
    }
    if (ex != null) {
        ex.printStackTrace();
    }
}).join();
复制代码

result表示前一个阶段的结果,ex表示异常,只可能有一个不为null。

whenComplete注册的函数具体由谁执行呢?通常而言,这要看注册时任务的状态,若是注册时任务尚未结束,则注册的函数会由执行任务的线程执行,在该线程执行完任务后执行注册的函数,若是注册时任务已经结束了,则由当前线程(即调用注册函数的线程)执行。

若是不但愿当前线程执行,避免可能的同步阻塞,可使用其余两个异步注册方法:

public CompletableFuture<T> whenCompleteAsync( BiConsumer<? super T, ? super Throwable> action) public CompletableFuture<T> whenCompleteAsync( BiConsumer<? super T, ? super Throwable> action, Executor executor) 复制代码

与前面介绍的以Async结尾的方法同样,对第一个方法,注册函数action会由默认的任务执行服务(即ForkJoinPool.commonPool()或ThreadPerTaskExecutor执行),对第二个方法,会由参数中指定的executor执行。

handle

whenComplete只是注册回调函数,不改变结果,它返回了一个CompletableFuture,但这个CompletableFuture的结果与调用它的CompletableFuture是同样的,还有一个相似的注册方法handle,其声明为:

public <U> CompletableFuture<U> handle( BiFunction<? super T, Throwable, ? extends U> fn) 复制代码

回调函数是一个BiFunction,也是接受两个参数,一个是正常结果,另外一个是异常,但BiFunction有返回值,在handle返回的CompletableFuture中,结果会被BiFunction的返回值替代,即便原来有异常,也会被覆盖,好比:

String ret =
    CompletableFuture.supplyAsync(()->{
        throw new RuntimeException("test");
    }).handle((result, ex)->{
        return "hello";
    }).join();
System.out.println(ret);
复制代码

输出为"hello"。异步任务抛出了异常,但经过handle方法,改变告终果。

与whenComplete相似,handle也有对应的异步注册方法handleAsync,具体咱们就不探讨了。

exceptionally

whenComplete和handle都是既响应正常完成也响应异常,若是只对异常感兴趣,可使用exceptionally,其声明为:

public CompletableFuture<T> exceptionally( Function<Throwable, ? extends T> fn) 复制代码

它注册的回调函数是Function,接受的参数为异常,返回一个值,与handle相似,它也会改变结果,具体就不举例了。

除了响应结果和异常,使用CompletableFuture,能够方便地构建有多种依赖关系的任务流,咱们先来看简单的依赖单一阶段的状况。

构建依赖单一阶段的任务流

thenRun

在一个阶段正常完成后,执行下一个任务,看个简单示例:

Runnable taskA = () -> System.out.println("task A");
Runnable taskB = () -> System.out.println("task B");
Runnable taskC = () -> System.out.println("task C");

CompletableFuture.runAsync(taskA)
    .thenRun(taskB)
    .thenRun(taskC)
    .join();
复制代码

这里,有三个异步任务taskA, taskB和taskC,经过thenRun天然地描述了它们的依赖关系,thenRun是同步版本,有对应的异步版本thenRunAsync:

public CompletableFuture<Void> thenRunAsync(Runnable action) public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor) 复制代码

在thenRun构建的任务流中,只有前一个阶段没有异常结束,下一个阶段的任务才会执行,若是前一个阶段发生了异常,全部后续阶段都不会运行,结果会被设为相同的异常,调用join会抛出运行时异常CompletionException。

thenRun指定的下一个任务类型是Runnable,它不须要前一个阶段的结果做为参数,也没有返回值,因此,在thenRun返回的CompletableFuture中,结果类型为Void,即没有结果。

thenAccept/thenApply

若是下一个任务须要前一个阶段的结果做为参数,可使用thenAccept或thenApply方法:

public CompletableFuture<Void> thenAccept( Consumer<? super T> action) public <U> CompletableFuture<U> thenApply( Function<? super T,? extends U> fn) 复制代码

thenAccept的任务类型是Consumer,它接受前一个阶段的结果做为参数,没有返回值。thenApply的任务类型是Function,接受前一个阶段的结果做为参数,返回一个新的值,这个值会成为thenApply返回的CompletableFuture的结果值。看个简单示例:

Supplier<String> taskA = () -> "hello";
Function<String, String> taskB = (t) -> t.toUpperCase();
Consumer<String> taskC = (t) -> System.out.println("consume: " + t);

CompletableFuture.supplyAsync(taskA)
    .thenApply(taskB)
    .thenAccept(taskC)
    .join();
复制代码

taskA的结果是"hello",传递给了taskB,taskB转换结果为"HELLO",再把结果给taskC,taskC进行了输出,因此输出为:

consume: HELLO
复制代码

CompletableFuture中有不少名称带有run, accept或apply的方法,它们通常与任务的类型相对应,run与Runnable对应,accept与Consumer对应,apply与Function对应,后续就不赘述了。

thenCompose

与thenApply相似,还有一个方法thenCompose,声明为:

public <U> CompletableFuture<U> thenCompose( Function<? super T, ? extends CompletionStage<U>> fn) 复制代码

这个任务类型也是Function,也是接受前一个阶段的结果,返回一个新的结果,不过,这个转换函数fn的返回值类型是CompletionStage,也就是说,它的返回值也是一个阶段,若是使用thenApply,结果就会变为CompletableFuture<CompletableFuture<U>>,而使用thenCompose,会直接返回fn返回的CompletionStage,thenCompose与thenApply的区别,就如同Stream API中flatMap与map的区别,看个简单的示例:

Supplier<String> taskA = () -> "hello";
Function<String, CompletableFuture<String>> taskB = (t) ->
    CompletableFuture.supplyAsync(() -> t.toUpperCase());
Consumer<String> taskC = (t) -> System.out.println("consume: " + t);

CompletableFuture.supplyAsync(taskA)
    .thenCompose(taskB)
    .thenAccept(taskC)
    .join();
复制代码

以上代码中,taskB是一个转换函数,但它本身也执行了异步任务,返回类型也是CompletableFuture,因此使用了thenCompose。

构建依赖两个阶段的任务流

依赖两个都完成

thenRun, thenAccept, thenApply和thenCompose用于在一个阶段完成后执行另外一个任务,CompletableFuture还有一些方法用于在两个阶段都完成后执行另外一个任务,方法是:

public CompletableFuture<Void> runAfterBoth( CompletionStage<?> other, Runnable action public <U,V> CompletableFuture<V> thenCombine( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) public <U> CompletableFuture<Void> thenAcceptBoth( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) 复制代码

runAfterBoth对应的任务类型是Runnable,thenCombine对应的任务类型是BiFunction,接受前两个阶段的结果做为参数,返回一个结果,thenAcceptBoth对应的任务类型是BiConsumer,接受前两个阶段的结果做为参数,但不返回结果。它们都有对应的异步和带Executor参数的版本,用于指定下一个任务由谁执行,具体就不赘述了。当前阶段和参数指定的另外一个阶段other没有依赖关系,并发执行,当两个都执行结束后,开始执行指定的另外一个任务。

看个简单的示例,任务A和B执行结束后,执行任务C合并结果,代码为:

Supplier<String> taskA = () -> "taskA";
CompletableFuture<String> taskB = CompletableFuture.supplyAsync(() -> "taskB");
BiFunction<String, String, String> taskC = (a, b) -> a + "," + b;

String ret = CompletableFuture.supplyAsync(taskA)
        .thenCombineAsync(taskB, taskC)
        .join();
System.out.println(ret);
复制代码

输出为:

taskA,taskB
复制代码

依赖两个阶段中的一个

前面的方法要求两个阶段都完成后才执行下一个任务,若是只须要其中任意一个阶段完成,可使用下面的方法:

public CompletableFuture<Void> runAfterEither( CompletionStage<?> other, Runnable action) public <U> CompletableFuture<U> applyToEither( CompletionStage<? extends T> other, Function<? super T, U> fn) public CompletableFuture<Void> acceptEither( CompletionStage<? extends T> other, Consumer<? super T> action) 复制代码

它们都有对应的异步和带Executor参数的版本,用于指定下一个任务由谁执行,具体就不赘述了。当前阶段和参数指定的另外一个阶段other没有依赖关系,并发执行,只要当其中一个执行完了,就会启动参数指定的另外一个任务,具体就不赘述了。

构建依赖多个阶段的任务流

若是依赖的阶段不止两个,可使用以下方法:

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) 复制代码

它们是静态方法,基于多个CompletableFuture构建了一个新的CompletableFuture。

对于allOf,当全部子CompletableFuture都完成时,它才完成,若是有的CompletableFuture异常结束了,则新的CompletableFuture的结果也是异常,不过,它并不会由于有异常就提早结束,而是会等待全部阶段结束,若是有多个阶段异常结束,新的CompletableFuture中保存的异常是最后一个的。新的CompletableFuture会持有异常结果,但不会保存正常结束的结果,若是须要,能够从每一个阶段中获取。看个简单的示例:

CompletableFuture<String> taskA = CompletableFuture.supplyAsync(() -> {
    delayRandom(100, 1000);
    return "helloA";
}, executor);

CompletableFuture<Void> taskB = CompletableFuture.runAsync(() -> {
    delayRandom(2000, 3000);
}, executor);

CompletableFuture<Void> taskC = CompletableFuture.runAsync(() -> {
    delayRandom(30, 100);
    throw new RuntimeException("task C exception");
}, executor);

CompletableFuture.allOf(taskA, taskB, taskC).whenComplete((result, ex) -> {
    if (ex != null) {
        System.out.println(ex.getMessage());
    }
    if (!taskA.isCompletedExceptionally()) {
        System.out.println("task A " + taskA.join());
    }
});
复制代码

taskC会首先异常结束,但新构建的CompletableFuture会等待其余两个结束,都结束后,能够经过子阶段(如taskA)的方法检查子阶段的状态和结果。

对于anyOf返回的CompletableFuture,当第一个子CompletableFuture完成或异常结束时,它相应地完成或异常结束,结果与第一个结束的子CompletableFuture同样,具体就不举例了。

小结

本节介绍了Java 8中的组合式异步编程CompletableFuture:

  • 它是对Future的加强,但能够响应结果或异常事件,有不少方法构建异步任务流
  • 根据任务由谁执行,通常有三类对应方法,名称不带Async的方法由当前线程或前一个阶段的线程执行,带Async但没有指定Executor的方法由默认Excecutor执行(ForkJoinPool.commonPool()或ThreadPerTaskExecutor),带Async且指定Executor参数的方法由指定的Executor执行
  • 根据任务类型,通常也有三类对应方法,名称带run的对应Runnable,带accept的对应Consumer,带apply的对应Function

使用CompletableFuture,能够简洁天然地表达多个异步任务之间的依赖关系和执行流程,大大简化代码,提升可读性。

下一节,咱们探讨Java 8对日期和时间API的加强。

(与其余章节同样,本节全部代码位于 github.com/swiftma/pro…,位于包shuo.laoma.java8.c94下)


未完待续,查看最新文章,敬请关注微信公众号“老马说编程”(扫描下方二维码),从入门到高级,深刻浅出,老马和你一块儿探索Java编程及计算机技术的本质。用心原创,保留全部版权。

相关文章
相关标签/搜索