future接口能够构建异步应用,但依然有其局限性。它很难直接表述多个Future 结果之间的依赖性。实际开发中,咱们常常须要达成如下目的:java
新的CompletableFuture将使得这些成为可能。编程
首先,CompletableFuture实现了Future接口,所以你能够像Future那样使用它。数组
其次,CompletableFuture并不是必定要交给线程池执行才能实现异步,你能够像下面这样实现异步运行。异步
public static void test1() throws Exception{ CompletableFuture<String> completableFuture=new CompletableFuture(); new Thread(new Runnable() { @Override public void run() { //模拟执行耗时任务 System.out.println("task doing..."); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } //告诉completableFuture任务已经完成 completableFuture.complete("result"); } }).start(); //获取任务结果,若是没有完成会一直阻塞等待 String result=completableFuture.get(); System.out.println("计算结果:"+result); }
若是没有意外,上面发的代码工做得很正常。可是,若是任务执行过程当中产生了异常会怎样呢?ide
很是不幸,这种状况下你会获得一个至关糟糕的结果:异常会被限制在执行任务的线程的范围内,最终会杀死该线程,而这会致使等待 get 方法返回结果的线程永久地被阻塞。客户端可使用重载版本的 get 方法,它使用一个超时参数来避免发生这样的状况。这是一种值得推荐的作法,你应该尽可能在你的代码中添加超时判断的逻辑,避免发生相似的问题。使用这种方法至少能防止程序永久地等待下去,超时发生时,程序会获得通知发生了 Timeout-Exception 。不过,也由于如此,你不能指定执行任务的线程内到底发生了什么问题。函数
为了能获取任务线程内发生的异常,你须要使用CompletableFuture 的completeExceptionally方法将致使CompletableFuture 内发生问题的异常抛出。这样,当执行任务发生异常时,调用get()方法的线程将会收到一个 ExecutionException 异常,该异常接收了一个包含失败缘由的Exception 参数。spa
public static void test2() throws Exception{ CompletableFuture<String> completableFuture=new CompletableFuture(); new Thread(new Runnable() { @Override public void run() { try { //模拟执行耗时任务 System.out.println("task doing..."); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } throw new RuntimeException("抛异常了"); }catch (Exception e) { //告诉completableFuture任务发生异常了 completableFuture.completeExceptionally(e); } } }).start(); //获取任务结果,若是没有完成会一直阻塞等待 String result=completableFuture.get(); System.out.println("计算结果:"+result); }
前面咱们经过编程本身建立 CompletableFuture 对象以及如何获取返回值,虽然看起来这些操做已经比较方便,但还有进一步提高的空间.CompletableFuture 类自身提供了大量精巧的工厂方法,使用这些方法能更容易地完成整个流程,还不用担忧实现的细节。supplyAsync 方法接受一个生产者(Supplier)做为参数,返回一个 CompletableFuture对象。生产者方法会交由 ForkJoinPool池中的某个执行线程( Executor )运行,可是你也可使用 supplyAsync 方法的重载版本,传递第二个参数指定线程池执行器执行生产者方法。线程
public static void test3() throws Exception { //supplyAsync内部使用ForkJoinPool线程池执行任务 CompletableFuture<String> completableFuture=CompletableFuture.supplyAsync(()->{ //模拟执行耗时任务 System.out.println("task doing..."); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } //返回结果 return "result"; }); System.out.println("计算结果:"+completableFuture.get()); }
allOf 工厂方法接收一个由CompletableFuture 构成的数组,数组中的全部 Completable-Future 对象执行完成以后,它返回一个 CompletableFuture<Void> 对象。这意味着,若是你须要等待多个 CompletableFuture 对象执行完毕,对 allOf 方法返回的CompletableFuture 执行 join 操做能够等待CompletableFuture执行完成。code
或者你可能但愿只要 CompletableFuture 对象数组中有任何一个执行完毕就再也不等待,在这种状况下,你可使用一个相似的工厂方法 anyOf 。该方法接收一个 CompletableFuture 对象构成的数组,返回由第一个执行完毕的 CompletableFuture 对象的返回值构成的 CompletableFuture<Object> 。对象
public static void test4() throws Exception { CompletableFuture<String> completableFuture1=CompletableFuture.supplyAsync(()->{ //模拟执行耗时任务 System.out.println("task1 doing..."); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } //返回结果 return "result1"; }); CompletableFuture<String> completableFuture2=CompletableFuture.supplyAsync(()->{ //模拟执行耗时任务 System.out.println("task2 doing..."); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } //返回结果 return "result2"; }); CompletableFuture<Object> anyResult=CompletableFuture.anyOf(completableFuture1,completableFuture2); System.out.println("第一个完成的任务结果:"+anyResult.get()); CompletableFuture<Void> allResult=CompletableFuture.allOf(completableFuture1,completableFuture2); //阻塞等待全部任务执行完成 allResult.join(); System.out.println("全部任务执行完成"); }
一般,咱们会有多个须要独立运行但又有所依赖的的任务。好比先等用于的订单处理完毕而后才发送邮件通知客户。
thenCompose 方法容许你对两个异步操做进行流水线,第一个操做完成时,将其结果做为参数传递给第二个操做。你能够建立两个CompletableFutures 对象,对第一个 CompletableFuture 对象调用thenCompose ,并向其传递一个函数。当第一个CompletableFuture 执行完毕后,它的结果将做为该函数的参数,这个函数的返回值是以第一个 CompletableFuture 的返回作输入计算出的第二个 CompletableFuture 对象。
public static void test5() throws Exception { CompletableFuture<String> completableFuture1=CompletableFuture.supplyAsync(()->{ //模拟执行耗时任务 System.out.println("task1 doing..."); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } //返回结果 return "result1"; }); //等第一个任务完成后,将任务结果传给参数result,执行后面的任务并返回一个表明任务的completableFuture CompletableFuture<String> completableFuture2= completableFuture1.thenCompose(result->CompletableFuture.supplyAsync(()->{ //模拟执行耗时任务 System.out.println("task2 doing..."); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } //返回结果 return "result2"; })); System.out.println(completableFuture2.get()); }
另外一种比较常见的状况是,你须要将两个彻底不相干的 CompletableFuture 对象的结果整合起来,并且你也不但愿等到第一个任务彻底结束才开始第二项任务。这种状况,你应该使用 thenCombine 方法,它接收名为 BiFunction 的第二参数,这个参数定义了当两个 CompletableFuture 对象完成计算后,结果如何合并。
public static void test6() throws Exception { CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> { //模拟执行耗时任务 System.out.println("task1 doing..."); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } //返回结果 return 100; }); //将第一个任务与第二个任务组合一块儿执行,都执行完成后,将两个任务的结果合并 CompletableFuture<Integer> completableFuture2 = completableFuture1.thenCombine( //第二个任务 CompletableFuture.supplyAsync(() -> { //模拟执行耗时任务 System.out.println("task2 doing..."); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } //返回结果 return 2000; }), //合并函数 (result1, result2) -> result1 + result2); System.out.println(completableFuture2.get()); }
咱们能够在每一个CompletableFuture 上注册一个操做,该操做会在 CompletableFuture 完成执行后调用它。CompletableFuture 经过 thenAccept 方法提供了这一功能,它接收CompletableFuture 执行完毕后的返回值作参数。
public static void test7() throws Exception { CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> { //模拟执行耗时任务 System.out.println("task1 doing..."); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } //返回结果 return 100; }); //注册完成事件 completableFuture1.thenAccept(result->System.out.println("task1 done,result:"+result)); CompletableFuture<Integer> completableFuture2= //第二个任务 CompletableFuture.supplyAsync(() -> { //模拟执行耗时任务 System.out.println("task2 doing..."); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } //返回结果 return 2000; }); //注册完成事件 completableFuture2.thenAccept(result->System.out.println("task2 done,result:"+result)); //将第一个任务与第二个任务组合一块儿执行,都执行完成后,将两个任务的结果合并 CompletableFuture<Integer> completableFuture3 = completableFuture1.thenCombine(completableFuture2, //合并函数 (result1, result2) -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return result1 + result2; }); System.out.println(completableFuture3.get()); }
Notice:固然也不能盲目的使用completableFuture来代替Future,就好比Future中的cancel方法在CompletableFuture中就难以代替。此处还有些须要注意的地方。