这篇博客回顾JAVA8的CompletionStage
API以及其在JAVA库中的标准实现CompletableFuture
。将会经过几个例子来展现API的各类行为。java
由于CompletableFuture
是CompletionInterface
接口的实现,因此咱们首先要了解该接口的契约。它表明某个同步或异步计算的一个阶段。你能够把它理解为是一个为了产生有价值最终结果的计算的流水线上的一个单元。这意味着多个ComletionStage
指令能够连接起来从而一个阶段的完成能够触发下一个阶段的执行。面试
除了实现了CompletionStage
接口,Completion
还继承了Future
,这个接口用于实现一个未开始的异步事件。由于可以显式的完成Future
,因此取名为CompletableFuture
。微信
这个简单的示例中建立了一个已经完成的预先设置好结果的CompletableFuture
。一般做为计算的起点阶段。app
static void completedFutureExample() { CompletableFuture cf = CompletableFuture.completedFuture("message"); assertTrue(cf.isDone()); assertEquals("message", cf.getNow(null)); }
getNow
方法会返回完成后的结果(这里就是message),若是还未完成,则返回传入的默认值null
。dom
下面的例子解释了如何建立一个异步运行Runnable
的stage。异步
static void runAsyncExample() { CompletableFuture cf = CompletableFuture.runAsync(() -> { assertTrue(Thread.currentThread().isDaemon()); randomSleep(); }); assertFalse(cf.isDone()); sleepEnough(); assertTrue(cf.isDone()); }
这个例子想要说明两个事情:ide
CompletableFuture
中以Async
为结尾的方法将会异步执行Executor
的状况下),异步执行会使用ForkJoinPool
实现,该线程池使用一个后台线程来执行Runnable
任务。注意这只是特定于CompletableFuture
实现,其它的CompletableStage
实现能够重写该默认行为。下面的例子引用了第一个例子中已经完成的CompletableFuture
,它将引用生成的字符串结果并将该字符串大写。ui
static void thenApplyExample() { CompletableFuture cf = CompletableFuture.completedFuture("message").thenApply(s -> { assertFalse(Thread.currentThread().isDaemon()); return s.toUpperCase(); }); assertEquals("MESSAGE", cf.getNow(null)); }
这里的关键词是thenApply
:this
then
是指在当前阶段正常执行完成后(正常执行是指没有抛出异常)进行的操做。在本例中,当前阶段已经完成并获得值message
。Apply
是指将一个Function
做用于以前阶段得出的结果Function
是阻塞的,这意味着只有当大写操做执行完成以后才会执行getNow()
方法。spa
经过在方法后面添加Async
后缀,该CompletableFuture
链将会异步执行(使用ForkJoinPool.commonPool())
static void thenApplyAsyncExample() { CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> { assertTrue(Thread.currentThread().isDaemon()); randomSleep(); return s.toUpperCase(); }); assertNull(cf.getNow(null)); assertEquals("MESSAGE", cf.join()); }
异步方法的一个好处是能够提供一个Executor
来执行CompletableStage
。这个例子展现了如何使用一个固定大小的线程池来实现大写操做。
static ExecutorService executor = Executors.newFixedThreadPool(3, new ThreadFactory() { int count = 1; @Override public Thread newThread(Runnable runnable) { return new Thread(runnable, "custom-executor-" + count++); } }); static void thenApplyAsyncWithExecutorExample() { CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> { assertTrue(Thread.currentThread().getName().startsWith("custom-executor-")); assertFalse(Thread.currentThread().isDaemon()); randomSleep(); return s.toUpperCase(); }, executor); assertNull(cf.getNow(null)); assertEquals("MESSAGE", cf.join()); }
若是下一个Stage接收了当前Stage的结果可是在计算中无需返回值(好比其返回值为void),那么它将使用方法thenAccept
并传入一个Consumer
接口。
static void thenAcceptExample() { StringBuilder result = new StringBuilder(); CompletableFuture.completedFuture("thenAccept message") .thenAccept(s -> result.append(s)); assertTrue("Result was empty", result.length() > 0); }
Consumer
将会同步执行,因此咱们无需在返回的CompletableFuture
上执行join操做。
一样,使用Asyn后缀实现:
static void thenAcceptAsyncExample() { StringBuilder result = new StringBuilder(); CompletableFuture cf = CompletableFuture.completedFuture("thenAcceptAsync message") .thenAcceptAsync(s -> result.append(s)); cf.join(); assertTrue("Result was empty", result.length() > 0); }
咱们如今来模拟一个出现异常的场景。为了简洁性,咱们仍是将一个字符串大写,可是咱们会模拟延时进行该操做。咱们会使用thenApplyAsyn(Function, Executor)
,第一个参数是大写转化方法,第二个参数是一个延时executor,它会延时一秒钟再将操做提交给ForkJoinPool
。
static void completeExceptionallyExample() { CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase, CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS)); CompletableFuture exceptionHandler = cf.handle((s, th) -> { return (th != null) ? "message upon cancel" : ""; }); cf.completeExceptionally(new RuntimeException("completed exceptionally")); assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally()); try { cf.join(); fail("Should have thrown an exception"); } catch(CompletionException ex) { // just for testing assertEquals("completed exceptionally", ex.getCause().getMessage()); } assertEquals("message upon cancel", exceptionHandler.join()); }
message
的CompletableFuture
对象。而后咱们调用thenApplyAsync
方法,该方法会返回一个新的CompletableFuture
。这个方法用异步的方式执行大写操做。这里还展现了如何使用delayedExecutor(timeout, timeUnit)
方法来延时异步操做。exceptionHandler
,这个阶段会处理一切异常并返回另外一个消息message upon cancel
。CompletionException
。它还会触发handler
阶段。API补充:
<U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn)
返回一个新的CompletionStage,不管以前的Stage是否正常运行完毕。传入的参数包括上一个阶段的结果和抛出异常。
和计算时异常处理很类似,咱们能够经过Future
接口中的cancel(boolean mayInterruptIfRunning)
来取消计算。
static void cancelExample() { CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase, CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS)); CompletableFuture cf2 = cf.exceptionally(throwable -> "canceled message"); assertTrue("Was not canceled", cf.cancel(true)); assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally()); assertEquals("canceled message", cf2.join()); }
API补充
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
返回一个新的CompletableFuture,若是出现异常,则为该方法中执行的结果,不然就是正常执行的结果。
下面的例子建立了一个CompletableFuture
对象并将Function
做用于已完成的两个Stage中的任意一个(没有保证哪个将会传递给Function)。这两个阶段分别以下:一个将字符串大写,另外一个小写。
static void applyToEitherExample() { String original = "Message"; CompletableFuture cf1 = CompletableFuture.completedFuture(original) .thenApplyAsync(s -> delayedUpperCase(s)); CompletableFuture cf2 = cf1.applyToEither( CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)), s -> s + " from applyToEither"); assertTrue(cf2.join().endsWith(" from applyToEither")); }
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T,U> fn)
返回一个全新的CompletableFuture,包含着this或是other操做完成以后,在两者中的任意一个执行fn
和前一个例子相似,将Function
替换为Consumer
static void acceptEitherExample() { String original = "Message"; StringBuilder result = new StringBuilder(); CompletableFuture cf = CompletableFuture.completedFuture(original) .thenApplyAsync(s -> delayedUpperCase(s)) .acceptEither(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)), s -> result.append(s).append("acceptEither")); cf.join(); assertTrue("Result was empty", result.toString().endsWith("acceptEither")); }
注意这里的两个Stage都是同步运行的,第一个stage将字符串转化为大写以后,第二个stage将其转化为小写。
static void runAfterBothExample() { String original = "Message"; StringBuilder result = new StringBuilder(); CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).runAfterBoth( CompletableFuture.completedFuture(original).thenApply(String::toLowerCase), () -> result.append("done")); assertTrue("Result was empty", result.length() > 0); }
Biconsumer支持同时对两个Stage的结果进行操做。
static void thenAcceptBothExample() { String original = "Message"; StringBuilder result = new StringBuilder(); CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).thenAcceptBoth( CompletableFuture.completedFuture(original).thenApply(String::toLowerCase), (s1, s2) -> result.append(s1 + s2)); assertEquals("MESSAGEmessage", result.toString()); }
若是CompletableFuture
想要合并两个阶段的结果而且返回值,咱们可使用方法thenCombine
。这里的计算流都是同步的,因此最后的getNow()
方法会得到最终结果,即大写操做和小写操做的结果的拼接。
static void thenCombineExample() { String original = "Message"; CompletableFuture cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s)) .thenCombine(CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s)), (s1, s2) -> s1 + s2); assertEquals("MESSAGEmessage", cf.getNow(null)); }
和以前的例子相似,只是这里用了不一样的方法:即两个阶段的操做都是异步的。那么thenCombine
也会异步执行,及时它没有Async后缀。
static void thenCombineAsyncExample() { String original = "Message"; CompletableFuture cf = CompletableFuture.completedFuture(original) .thenApplyAsync(s -> delayedUpperCase(s)) .thenCombine(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)), (s1, s2) -> s1 + s2); assertEquals("MESSAGEmessage", cf.join()); }
咱们可使用thenCompose
来完成前两个例子中的操做。
static void thenComposeExample() { String original = "Message"; CompletableFuture cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s)) .thenCompose(upper -> CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s)) .thenApply(s -> upper + s)); assertEquals("MESSAGEmessage", cf.join()); }
static void anyOfExample() { StringBuilder result = new StringBuilder(); List messages = Arrays.asList("a", "b", "c"); List<CompletableFuture> futures = messages.stream() .map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s))) .collect(Collectors.toList()); CompletableFuture.anyOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((res, th) -> { if(th == null) { assertTrue(isUpperCase((String) res)); result.append(res); } }); assertTrue("Result was empty", result.length() > 0); }
static void allOfExample() { StringBuilder result = new StringBuilder(); List messages = Arrays.asList("a", "b", "c"); List<CompletableFuture> futures = messages.stream() .map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s))) .collect(Collectors.toList()); CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])) .whenComplete((v, th) -> { futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null)))); result.append("done"); }); assertTrue("Result was empty", result.length() > 0); }
static void allOfAsyncExample() { StringBuilder result = new StringBuilder(); List messages = Arrays.asList("a", "b", "c"); List<CompletableFuture> futures = messages.stream() .map(msg -> CompletableFuture.completedFuture(msg).thenApplyAsync(s -> delayedUpperCase(s))) .collect(Collectors.toList()); CompletableFuture allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])) .whenComplete((v, th) -> { futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null)))); result.append("done"); }); allOf.join(); assertTrue("Result was empty", result.length() > 0); }
下面展现了一个实践CompletableFuture的场景:
cars()
方法异步得到Car
列表。它将会返回一个CompletionStage<List<Car>>
。cars()
方法应当使用一个远程的REST端点来实现。rating(manufactureId)
来异步获取每辆车的评分。allOf()
来进入最终Stage,它将在这两个阶段完成后执行whenComplete()
,打印出车辆的评分。cars().thenCompose(cars -> { List<CompletionStage> updatedCars = cars.stream() .map(car -> rating(car.manufacturerId).thenApply(r -> { car.setRating(r); return car; })).collect(Collectors.toList()); CompletableFuture done = CompletableFuture .allOf(updatedCars.toArray(new CompletableFuture[updatedCars.size()])); return done.thenApply(v -> updatedCars.stream().map(CompletionStage::toCompletableFuture) .map(CompletableFuture::join).collect(Collectors.toList())); }).whenComplete((cars, th) -> { if (th == null) { cars.forEach(System.out::println); } else { throw new RuntimeException(th); } }).toCompletableFuture().join();
Java CompletableFuture 详解
Guide To CompletableFuture
想要了解更多开发技术,面试教程以及互联网公司内推,欢迎关注个人微信公众号!将会不按期的发放福利哦~