Java8系列 (七) CompletableFuture异步编程

概述

Java8以前用 Future 处理异步请求, 当你须要获取任务结果时, 一般的作法是调用  get(long timeout, TimeUnit unit) 此方法会阻塞当前的线程, 若是任务处理超时, 就会抛出一个  TimeoutException html

    @Test
    public void test1() throws InterruptedException, ExecutionException, TimeoutException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Future<String> f = executorService.submit(() -> "ceshishanghu");
        String s = f.get(3, TimeUnit.SECONDS);
        System.out.println(s);
    }

在Java8中引入了 CompletableFuture, 使用它提供的API能够不用像以前那样阻塞式或轮询的获取某个异步任务的结果, CompletableFuture 会在异步任务处理完成后自动进行回调, 让你能够链式的组合多个异步任务。java

CompletableFuture 类中提供了许多以 Async 后缀结尾的方法。一般而言,名称中不带 Async 的方法和它的前一个任务同样,在同一个线程中运行。而名称以 Async 结尾的方法会将后续的任务提交到一个线程池,因此每一个任务是由不一样的线程处理的。git

静态工厂方法

  • supplyAsync(): 异步处理任务, 有返回值
  • runAsync(): 异步处理任务, 没有返回值
  • allOf(): 须要等待全部的异步任务都执行完毕,才会返回一个新的CompletableFuture
  • anyOf(): 任意一个异步任务执行完毕,就会返回一个新的CompletableFuture
  • completedFuture(): 这种方式获取的 CompletableFuture 不是异步的,它会等待获取明确的返回结果以后再返回一个已经完成的 CompletableFuture
    @Test
    public void test2() {
        //建立一个已经有任务结果的CompletableFuture
        CompletableFuture<String> f1 = CompletableFuture.completedFuture("return value");
        //异步处理任务,有返回值
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(this::get);
        //异步处理任务,没有返回值
        CompletableFuture<Void> f3 = CompletableFuture.runAsync(System.out::println);
        //须要等待全部的异步任务都执行完毕,才会返回一个新的CompletableFuture
//        CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);
        //任意一个异步任务执行完毕,就会返回一个新的CompletableFuture
        CompletableFuture<Object> any = CompletableFuture.anyOf(f1, f2, f3);
        Object result = any.join();
        System.out.println("result = " + result);//result = return value
    }

    public String get() {
        delay();
        return "异步任务结果";
    }

    public void delay() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

上面的示例中, allOf() 由于要等待全部的异步任务执行完成,因此要延时1秒钟才会返回一个新的 CompletableFuture, 而 anyOf() 则不须要等待全部的异步任务, 由于第一个异步最早完成, 因此控制台输出  result = return value 。github

链式调用

A任务执行完毕, 继续执行B任务, B任务执行完毕, 继续执行C任务...web

    @Test
    public void test2() {
        CompletableFuture<Void> f = CompletableFuture.supplyAsync(() -> {
            //测试抛异常后,handle()方法接受并处理
       //int x = 1 / 0;
            return "这是一个栗子";
        }).handle((res, ex) -> {
            System.out.println("handle res = " + res);
            if (Objects.nonNull(ex)) {
                System.out.println("handle ex" + ex.getCause().getMessage());
            }
            return Objects.nonNull(ex) ? 0 : 1;
        }).thenApply(res -> {
            System.out.println("thenApply res = " + res);
            return res == 1 ? "success" : "error";
        }).thenAccept(res -> System.out.println("thenAccept res = " + res)
        ).thenRun(() -> System.out.println("没有参数, 异步执行一个没有返回值的任务"));
        f.join();
    }

输出结果:spring

handle res = 这是一个栗子
thenApply res = 1
thenAccept res = success
没有参数, 异步执行一个没有返回值的任务

将上面   int x = 1 / 0; 这行代码取消注释, 从新运行结果以下:mvc

handle res = null
handle ex/ by zero
thenApply res = 0
thenAccept res = error
没有参数, 异步执行一个没有返回值的任务

能够看到, handle() 方法接受前一个 CompletableFuture  的返回结果或抛出的异常做为方法入参, 通过处理后再返回一个新的结果。app

级联组合

  • thenCompose(): 对两个异步操做进行组合,第一个操做完成时,将其结果做为参数传递给第二个操做, 第二个操做会返回一个新的CompletableFuture。
  • thenCombine(): 将两个彻底无关联的异步请求的结果整合起来, 计算出一个新的值并返回
    @Test
    public void test3() {
        CompletableFuture<String> f = CompletableFuture.completedFuture("CompletableFuture 1");
        CompletableFuture<String> f1 = f.thenCompose(res -> {
            System.out.println("thenCompose res = " + res);
            return CompletableFuture.supplyAsync(() -> "CompletableFuture 2");
        });
        System.out.println(f1.join());
        CompletableFuture<Integer> f3 = CompletableFuture.completedFuture(998);
        CompletableFuture<String> f4 = f.thenCombine(f3, (str, num) -> {
            System.out.println("str = " + str + ", num= " + num);
            return str + num;
        });
        System.out.println(f4.join());
    }

输出结果:异步

thenCompose res = CompletableFuture 1
CompletableFuture 2
str = CompletableFuture 1, num= 998
CompletableFuture 1998

whenComplete

当前一个 CompletableFuture  计算完成或抛出异常时, 能够使用 whenComplete() 执行指定的任务。async

    @Test
    public void test4() {
        CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> {
            //测试抛异常后,whenComplete()方法接受并处理
            int x = 1 / 0;
            return "这是一个栗子";
        }).whenComplete((res, ex) -> {
            System.out.println("whenComplete res = " + res);
            if (Objects.nonNull(ex)) {
                System.out.println("whenComplete ex" + ex.getCause().getMessage());
            }
        });
        System.out.println("f.join() = " + f.join());
    }

输出结果以下,其中 res 对应前一个 CompletableFuture 的返回结果,ex 对应前一个 CompletableFuture 抛出的异常(若是发生异常)。

从控制台输出顺序看出,当前一个 CompletableFuture  计算完成或抛出异常时,  whenComplete() 会接受它的返回结果或抛出的异常,来作一些其余的事情,最后再返回原来的返回结果或抛出异常。类比下 try/catch 语句块中的 final 语句块。

whenComplete res = null
whenComplete ex/ by zero

java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero

    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run$$$capture(CompletableFuture.java:1592)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java)
    at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.ArithmeticException: / by zero
    at com.java8.action.ChapterTest.lambda$test4$0(ChapterTest.java:22)

异常处理

只有当前一个 CompletableFuture 发生异常时,才会进入到 exceptionally() 方法,并将产生的异常做为入参。

    @Test
    public void test5() {
        CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> {
            //测试抛异常后,exceptionally()方法接受并处理
            //int x = 1 / 0;
            return "这是一个栗子";
        }).exceptionally(ex -> ex.getCause().getMessage());
        System.out.println("f.join() = " + f.join());
    }

注释  int x = 1 / 0; ,输出以下:

f.join() = 这是一个栗子

取消注释   int x = 1 / 0; , 输出以下:

f.join() = / by zero

Both系列方法

  • thenAcceptBoth(): 等待当前的 CompletableFuture 和另外一个 CompletableFuture 执行完成,将它们的返回结果做为入参去执行一个操做,没有返回值
  • runAfterBoth(): 等待当前的 CompletableFuture 和另外一个 CompletableFuture 执行完成,而后去执行一个操做,没有返回值

代码清单一

    @Test
    public void test6() {
        CompletableFuture<Integer> f1 = CompletableFuture.completedFuture(9523);
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(this::get);
        CompletableFuture<Void> both = f1.thenAcceptBoth(f2, (num, str) -> System.out.println("num = " + num + ", str = " + str));
        both.join();
    }

    public String get() {
        delay();
        return "CompletableFuture 2";
    }

    public void delay() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

代码清单一输出结果以下:

num = 9523, str = CompletableFuture 2 

代码清单二

    @Test
    public void test7() {
        CompletableFuture<Integer> f1 = CompletableFuture.completedFuture(9523);
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "");
        CompletableFuture<Void> both = f1.runAfterBoth(f2, () -> System.out.println("执行一个任务,没有入参"));
        both.join();
    }

代码清单二输出结果以下:

执行一个任务,没有入参

Either系列

  • acceptEither: 当前的 CompletableFuture 和另外一个 CompletableFuture 任意一个执行完成,将对应的返回结果做为入参去执行一个操做,没有返回值
  • applyToEither: 当前的 CompletableFuture 和另外一个 CompletableFuture 任意一个执行完成,将对应的返回结果做为入参,使用 mapping 函数转换成一个新的值并返回
  • runAfterEither: 当前的 CompletableFuture 和另外一个 CompletableFuture 任意一个执行完成,而后去执行一个操做,没有返回值

代码清单三:

    @Test
    public void test8() {
        CompletableFuture<String> f1 = CompletableFuture.completedFuture("CompletableFuture 1");
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(this::get);
        CompletableFuture<Void> both = f1.acceptEither(f2, System.out::println);
        both.join();
    }

    public String get() {
        delay();
        return "CompletableFuture 2";
    }

    public void delay() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

代码清单三输出结果:

CompletableFuture 1

代码清单四:

    @Test
    public void test9() {
        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(this::get);
        CompletableFuture<String> f2 = CompletableFuture.completedFuture("CompletableFuture 2");
        CompletableFuture<Integer> f3 = f1.applyToEither(f2, res -> {
            System.out.println("res = " + res);
            return res.length();
        });
        System.out.println("f3.join() = " + f3.join());
    }

    public String get() {
        delay();//这里会延时一秒钟
        return "CompletableFuture 1";
    }

代码清单四输出结果:

res = CompletableFuture 2
f3.join() = 19

代码清单五:

    @Test
    public void test10() {
        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(this::get);
        CompletableFuture<Void> f2 = CompletableFuture.allOf();
        CompletableFuture<Void> f3 = f1.runAfterEither(f2, () -> System.out.println("执行一个任务,没有入参"));
        f3.join();
    }

    public String get() {
        delay();//这里会延时一秒钟
        return "CompletableFuture 1";
    }

代码清单五输出结果:

执行一个任务,没有入参

使用自定义的执行器来处理多个异步任务

在实际应用场景中可能会遇到这种状况,假如你须要同时处理大量的异步任务,且这些异步任务互相不依赖,你只要最后把它们的结果组装起来就行,这该怎么实现呢?

下面给出了一个使用默认执行器的示例,经过Stream流同时建立 9 个异步任务,获取它们的结果并组装后返回,其中 Runtime.getRuntime().availableProcessors() 表示Java虚拟机可用的处理器个数,在我以前的文章 Java8系列 (二) Stream流 中有介绍过。

代码清单六:

    @Test
    public void test11() {
        List<String> list = Arrays.asList("王小波书店", "杭州沈记古旧书店", "猫的天空之城概念书店", "纯真年代书吧", "南山书屋", "西西弗书店", "新华书店", "钟书阁", "云门书屋");
        System.out.println("当前机器有" + Runtime.getRuntime().availableProcessors() + "个可用的处理器");
        long start = System.nanoTime();
        List<CompletableFuture<String>> futures = list.stream()
                .map(str -> CompletableFuture.supplyAsync(() -> this.calculateLength(str)))
                .collect(Collectors.toList());
        System.out.println("get futures "+(System.nanoTime() - start) / 1000_000 + " msecs");
        String result = futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.joining(",", "[", "]"));
        System.out.println("get result "+(System.nanoTime() - start) / 1000_000 + " msecs");
        System.out.println(result);
    }

    public String calculateLength(String str) {
        delay();
        return str;
    }

    public void delay() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

运行代码清单六,输出结果:

当前机器有4个可用的处理器
get futures 95 msecs
get result 3098 msecs
[王小波书店,杭州沈记古旧书店,猫的天空之城概念书店,纯真年代书吧,南山书屋,西西弗书店,新华书店,钟书阁,云门书屋]

能够看到,虽然使用了异步处理,但仍是花了 3098 毫秒才执行完成全部任务。这是由于 CompletableFuture 内部采用的是通用线程池 ForkJoinPool.commonPool() , 默认都使用固定数目的线程, 具体线程数取决于  Runtime.getRuntime().availableProcessors()  的返回值。

我这里测试的机器显示通用线程池中处于可用状态的线程数为 4,一次只能同时处理 4 个任务,后面的5个异步任务只能等到前面某一个操做完成释放出空闲线程才能继续, 所以总的会消耗约 3 秒钟的时间。

咱们将上面的代码进行重构,使用自定义的执行器,经过自定义的执行器你能够指定线程池的大小。其中线程数的设定能够参考公式  Nthreads = NCPU * UCPU * (1 + W/C) 

    @Test
    public void test12() {
        List<String> list = Arrays.asList("王小波书店", "杭州沈记古旧书店", "猫的天空之城概念书店", "纯真年代书吧", "南山书屋", "西西弗书店", "新华书店", "钟书阁", "云门书屋");
        final ExecutorService executor = Executors.newFixedThreadPool(Math.min(list.size(), 100), r -> {
            Thread thread = new Thread(r);
            //守护线程不会组织程序的终止
            thread.setDaemon(true);
            return thread;
        });
        System.out.println("当前机器有" + Runtime.getRuntime().availableProcessors() + "个可用的处理器, 当前处理异步请求的线程池大小为 " + Math.min(list.size(), 100));
        long start = System.nanoTime();
        List<CompletableFuture<String>> futures = list.stream()
                .map(str -> CompletableFuture.supplyAsync(() -> this.calculateLength(str), executor))
                .collect(Collectors.toList());
        System.out.println("get futures " + (System.nanoTime() - start) / 1000_000 + " msecs");
        String result = futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.joining(",", "[", "]"));
        System.out.println("get result " + (System.nanoTime() - start) / 1000_000 + " msecs");
        System.out.println(result);
    }

    public String calculateLength(String str) {
        delay();
        return str;
    }

    public void delay() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

输出结果以下:

当前机器有4个可用的处理器, 当前处理异步请求的线程池大小为 9
get futures 38 msecs
get result 1039 msecs
[王小波书店,杭州沈记古旧书店,猫的天空之城概念书店,纯真年代书吧,南山书屋,西西弗书店,新华书店,钟书阁,云门书屋]

能够看到,使用自定义的执行器调大线程池大小后,总的运行时间只要 1039 毫秒。

将CompletableFuture做为Controller的返回值

上面还存在一个问题,虽然如今能够同时处理多个异步任务,可是若是须要将异步结果返回给另外一个服务,那不是还得经过 join() 阻塞的获取到返回值后才能再返回么?

自Spring Boot 1.3 (Spring 4.2) 以后开始支持 CompletableFuture 或 CompletionStage 做为 Controller 的返回值,她很好的解决了上面的异步阻塞问题,只要将  CompletableFuture 做为 Controller 的返回值,在异步任务执行完成后,它会自动响应结果给另外一个服务。

@RestController
public class AsyncController {

    @GetMapping("/redirect")
    public CompletableFuture<ModelAndView> redirect() {
        return CompletableFuture.supplyAsync(() -> {
            this.delay();
            RedirectView redirectView = new RedirectView("https://www.cnblogs.com/qingshanli/");
            redirectView.addStaticAttribute("hint", "CompletableFuture组装ModelAndView视图,异步返回结果");
            return new ModelAndView(redirectView);
        });
    }

    @GetMapping("/async")
    public CompletableFuture<String> async() {
        System.out.println("async method start");
        return CompletableFuture.supplyAsync(() -> {
            this.delay();
            return "CompletableFuture做为Controller的返回值,异步返回结果";
        }).whenComplete((res, ex) -> System.out.println("async method completely, res = " + res + ", ex = " + ex));
    }

    public void delay() {
        try {
            Thread.sleep(3000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
} 

启动项目,Postman 访问 http://localhost:8080/async,截图以下:

Postman 访问 http://localhost:8080/redirect,截图以下:

参考资料

https://github.com/AndreasKl/spring-boot-mvc-completablefuture

https://nickebbitt.github.io/blog/2017/03/22/async-web-service-using-completable-future

https://www.humansreadcode.com/spring-boot-completablefuture/

Java8 实战

做者:张小凡
出处:https://www.cnblogs.com/qingshanli/ 本文版权归做者和博客园共有,欢迎转载,但未经做者赞成必须保留此段声明,且在文章页面明显位置给出原文链接,不然保留追究法律责任的权利。若是以为还有帮助的话,能够点一下右下角的【推荐】。

相关文章
相关标签/搜索