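Recently at work I needed asynchronous programming to implement concurrent, asynchronous queries across multiple datasets. I had used CompletableFuture before, but never dug into how it works or wrote up what I learned. Today I finally have time to study it properly.
This article is structured as: pain points -> solution -> application scenarios -> worked example -> summary.
Before Java 8, asynchronous code looked roughly like this: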
    class App {
        ExecutorService executor = ...
        ArchiveSearcher searcher = ...

        void showSearch(final String target) throws InterruptedException {
            Future<String> future = executor.submit(new Callable<String>() {
                public String call() {
                    return searcher.search(target);
                }
            });
            displayOtherThings(); // do other things while searching
            try {
                displayText(future.get()); // use future
            } catch (ExecutionException ex) {
                cleanup();
                return;
            }
        }
    }
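future.get() blocks until the computation completes and then returns the result.
So if we need to run a follow-up action once the computation finishes, there are only two options: block on get(), or poll isDone() until it returns true. Polling wastes a great deal of CPU. To address this pain point, Java 8 added the CompletableFuture class which, as the name suggests, is a Future that can be completed.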
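As a minimal sketch of what "a Future that can be completed" means: a callback is registered first, and whoever completes the future later triggers it, so nobody has to block or poll. The class and method names here (CompletableDemo, callbackResult) are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class CompletableDemo {
    // Registers a callback, then completes the future by hand.
    static String callbackResult() {
        AtomicReference<String> seen = new AtomicReference<>();
        CompletableFuture<String> future = new CompletableFuture<>();
        future.thenAccept(seen::set); // stored until the future completes
        future.complete("found");     // the completing thread runs the callback
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(callbackResult());
    }
}
```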
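CompletableFuture<T> implements both Future<T> and CompletionStage<T>, so we can keep using the Future methods while also getting everything CompletionStage offers.
The sections below start from common application scenarios and look at the best-fitting solution for each. Some scenarios have several workable options; in those cases, weigh extensibility, maintainability, performance, and readability before choosing.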
If the asynchronous task does not need to return a result, wrap it in a Runnable object and call runAsync:

    public static CompletableFuture<Void> runAsync(Runnable runnable) {
        return asyncRunStage(asyncPool, runnable);
    }

    static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        e.execute(new AsyncRun(d, f));
        return d;
    }

runAsync returns a CompletableFuture<Void>, which carries no result value.
If the asynchronous task needs to return a result, wrap it in a Supplier<U> object and call supplyAsync:

    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }

    static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<U> d = new CompletableFuture<U>();
        e.execute(new AsyncSupply<U>(d, f));
        return d;
    }

supplyAsync returns a CompletableFuture<U> that carries a computed result of the type given by the generic parameter U.
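The two entry points can be compared side by side. This is only a small sketch; AsyncStartDemo and its method names are invented for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class AsyncStartDemo {
    // runAsync: side effect only; the future's value is always null (Void).
    static int sideEffect() {
        AtomicInteger counter = new AtomicInteger();
        CompletableFuture<Void> done =
            CompletableFuture.runAsync(() -> counter.incrementAndGet());
        done.join(); // wait for the task to finish
        return counter.get();
    }

    // supplyAsync: the future carries the supplier's return value.
    static String computed() {
        CompletableFuture<String> result =
            CompletableFuture.supplyAsync(() -> "Job" + 1);
        return result.join();
    }

    public static void main(String[] args) {
        System.out.println(sideEffect());
        System.out.println(computed());
    }
}
```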
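If the follow-up action needs neither the result of the previous stage nor a return value of its own, use a Runnable object and call thenRun:

    public CompletableFuture<Void> thenRun(Runnable action) {
        return uniRunStage(null, action);
    }

    public CompletableFuture<Void> thenRunAsync(Runnable action) {
        return uniRunStage(asyncPool, action);
    }

    private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        if (e != null || !d.uniRun(this, f, null)) {
            UniRun<T> c = new UniRun<T>(e, d, this, f);
            push(c);
            c.tryFire(SYNC);
        }
        return d;
    }

thenRun returns a CompletableFuture that carries no result. The difference between thenRun and thenRunAsync is where the action runs: thenRun executes it on the thread that completes the preceding CompletableFuture (or on the calling thread if that future is already complete), while thenRunAsync always submits it to the async executor (asyncPool by default), which is not necessarily a brand-new thread.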
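The thread difference can be observed directly. In this sketch (ThenRunThreadDemo is a hypothetical name) the source future is completed by hand from the calling thread, so the thenRun action runs there, whereas the thenRunAsync action is handed to the default async executor. The exact worker thread name depends on the machine, so the sketch only records the names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class ThenRunThreadDemo {
    // Returns { thread that ran thenRun, thread that ran thenRunAsync }.
    static String[] threadNames() {
        AtomicReference<String> syncThread = new AtomicReference<>();
        AtomicReference<String> asyncThread = new AtomicReference<>();
        CompletableFuture<String> source = new CompletableFuture<>();

        // Registered before completion: fired by whichever thread completes source.
        source.thenRun(() -> syncThread.set(Thread.currentThread().getName()));
        source.complete("done"); // the calling thread runs the thenRun action here

        // Always submitted to the async executor, even though source is already done.
        CompletableFuture<Void> async = source.thenRunAsync(
            () -> asyncThread.set(Thread.currentThread().getName()));
        async.join();

        return new String[] { syncThread.get(), asyncThread.get() };
    }

    public static void main(String[] args) {
        String[] names = threadNames();
        System.out.println("thenRun ran on:      " + names[0]);
        System.out.println("thenRunAsync ran on: " + names[1]);
    }
}
```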
If the follow-up action consumes the result of the previous stage but returns nothing itself, use a Consumer<? super T> object and call thenAccept:

    public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
        return uniAcceptStage(null, action);
    }

    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
        return uniAcceptStage(asyncPool, action);
    }

    private CompletableFuture<Void> uniAcceptStage(Executor e,
                                                   Consumer<? super T> f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        if (e != null || !d.uniAccept(this, f, null)) {
            UniAccept<T> c = new UniAccept<T>(e, d, this, f);
            push(c);
            c.tryFire(SYNC);
        }
        return d;
    }

thenAccept returns a CompletableFuture that carries no result. thenAccept and thenAcceptAsync differ in the same way as thenRun and thenRunAsync.
If the follow-up action consumes the result of the previous stage and returns a value of type U, use a Function<? super T,? extends U> object and call thenApply:

    public <U> CompletableFuture<U> thenApply(
        Function<? super T,? extends U> fn) {
        return uniApplyStage(null, fn);
    }

    public <U> CompletableFuture<U> thenApplyAsync(
        Function<? super T,? extends U> fn) {
        return uniApplyStage(asyncPool, fn);
    }

    private <V> CompletableFuture<V> uniApplyStage(
        Executor e, Function<? super T,? extends V> f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<V> d = new CompletableFuture<V>();
        if (e != null || !d.uniApply(this, f, null)) {
            UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
            push(c);
            c.tryFire(SYNC);
        }
        return d;
    }

thenApply returns a CompletableFuture that carries a result of type U. thenApply and thenApplyAsync differ in the same way as thenRun and thenRunAsync.
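A short sketch of chaining a transformation onto an asynchronous result; ThenApplyDemo and jobNameLength are illustrative names only.

```java
import java.util.concurrent.CompletableFuture;

class ThenApplyDemo {
    // Maps the String result of the first stage to its length.
    static int jobNameLength() {
        return CompletableFuture
            .supplyAsync(() -> "Job1")  // CompletableFuture<String>
            .thenApply(String::length)  // CompletableFuture<Integer>
            .join();
    }

    public static void main(String[] args) {
        System.out.println(jobNameLength());
    }
}
```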
If the follow-up action consumes the result of the previous stage and itself produces a CompletableFuture, use a Function<? super T, ? extends CompletionStage<U>> object and call thenCompose:

    public <U> CompletableFuture<U> thenCompose(
        Function<? super T, ? extends CompletionStage<U>> fn) {
        return uniComposeStage(null, fn);
    }

    public <U> CompletableFuture<U> thenComposeAsync(
        Function<? super T, ? extends CompletionStage<U>> fn) {
        return uniComposeStage(asyncPool, fn);
    }

    private <V> CompletableFuture<V> uniComposeStage(
        Executor e, Function<? super T, ? extends CompletionStage<V>> f) {
        ...
        Object r; Throwable x;
        if (e == null && (r = result) != null) {
            // try to return function result directly
            if (r instanceof AltResult) {
                if ((x = ((AltResult)r).ex) != null) {
                    return new CompletableFuture<V>(encodeThrowable(x, r));
                }
                r = null;
            }
            try {
                @SuppressWarnings("unchecked") T t = (T) r;
                CompletableFuture<V> g = f.apply(t).toCompletableFuture();
                Object s = g.result;
                if (s != null)
                    return new CompletableFuture<V>(encodeRelay(s));
                CompletableFuture<V> d = new CompletableFuture<V>();
                UniRelay<V> copy = new UniRelay<V>(d, g);
                g.push(copy);
                copy.tryFire(SYNC);
                return d;
            } catch (Throwable ex) {
                return new CompletableFuture<V>(encodeThrowable(ex));
            }
        }
        CompletableFuture<V> d = new CompletableFuture<V>();
        UniCompose<T,V> c = new UniCompose<T,V>(e, d, this, f);
        push(c);
        c.tryFire(SYNC);
        return d;
    }

thenCompose returns a CompletableFuture<U>: the future produced by the function is flattened into the returned stage rather than nested inside it. thenCompose and thenComposeAsync differ in the same way as thenRun and thenRunAsync.
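The flattening is easiest to see next to thenApply. In the sketch below, findUser and nameLength are hypothetical asynchronous lookups invented for the example.

```java
import java.util.concurrent.CompletableFuture;

class ComposeDemo {
    // Hypothetical async lookups, each returning its own future.
    static CompletableFuture<String> findUser(int id) {
        return CompletableFuture.supplyAsync(() -> "user" + id);
    }

    static CompletableFuture<Integer> nameLength(String name) {
        return CompletableFuture.supplyAsync(() -> name.length());
    }

    static int composed() {
        // thenApply would nest the futures:
        CompletableFuture<CompletableFuture<Integer>> nested =
            findUser(7).thenApply(ComposeDemo::nameLength);
        // thenCompose flattens them into a single stage:
        CompletableFuture<Integer> flat =
            findUser(7).thenCompose(ComposeDemo::nameLength);
        return flat.join();
    }

    public static void main(String[] args) {
        System.out.println(composed());
    }
}
```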
If the follow-up action should fire once both the current CompletableFuture and a given CompletableFuture have completed, does not need their results T and U, and returns nothing itself, use a Runnable and call runAfterBoth:

    public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
                                                Runnable action) {
        return biRunStage(null, other, action);
    }

    public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
                                                     Runnable action) {
        return biRunStage(asyncPool, other, action);
    }

    private CompletableFuture<Void> biRunStage(Executor e, CompletionStage<?> o,
                                               Runnable f) {
        CompletableFuture<?> b;
        if (f == null || (b = o.toCompletableFuture()) == null)
            throw new NullPointerException();
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        if (e != null || !d.biRun(this, b, f, null)) {
            BiRun<T,?> c = new BiRun<>(e, d, this, b, f);
            bipush(b, c);
            c.tryFire(SYNC);
        }
        return d;
    }

runAfterBoth returns a CompletableFuture that carries no result. runAfterBoth and runAfterBothAsync differ in the same way as thenRun and thenRunAsync.
If the follow-up action should fire once both the current CompletableFuture and a given CompletableFuture have completed, consumes their results T and U, but returns nothing itself, use a BiConsumer<? super T, ? super U> and call thenAcceptBoth:

    public <U> CompletableFuture<Void> thenAcceptBoth(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action) {
        return biAcceptStage(null, other, action);
    }

    public <U> CompletableFuture<Void> thenAcceptBothAsync(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action) {
        return biAcceptStage(asyncPool, other, action);
    }

    private <U> CompletableFuture<Void> biAcceptStage(
        Executor e, CompletionStage<U> o,
        BiConsumer<? super T,? super U> f) {
        CompletableFuture<U> b;
        if (f == null || (b = o.toCompletableFuture()) == null)
            throw new NullPointerException();
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        if (e != null || !d.biAccept(this, b, f, null)) {
            BiAccept<T,U> c = new BiAccept<T,U>(e, d, this, b, f);
            bipush(b, c);
            c.tryFire(SYNC);
        }
        return d;
    }

thenAcceptBoth returns a CompletableFuture that carries no result. thenAcceptBoth and thenAcceptBothAsync differ in the same way as thenRun and thenRunAsync.
If the follow-up action should fire once both the current CompletableFuture and a given CompletableFuture have completed, consumes their results T and U, and returns a value of type V, use a BiFunction<? super T,? super U,? extends V> and call thenCombine:

    public <U,V> CompletableFuture<V> thenCombine(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) {
        return biApplyStage(null, other, fn);
    }

    public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) {
        return biApplyStage(asyncPool, other, fn);
    }

    private <U,V> CompletableFuture<V> biApplyStage(
        Executor e, CompletionStage<U> o,
        BiFunction<? super T,? super U,? extends V> f) {
        CompletableFuture<U> b;
        if (f == null || (b = o.toCompletableFuture()) == null)
            throw new NullPointerException();
        CompletableFuture<V> d = new CompletableFuture<V>();
        if (e != null || !d.biApply(this, b, f, null)) {
            BiApply<T,U,V> c = new BiApply<T,U,V>(e, d, this, b, f);
            bipush(b, c);
            c.tryFire(SYNC);
        }
        return d;
    }

thenCombine returns a CompletableFuture that carries a result of type V. thenCombine and thenCombineAsync differ in the same way as thenRun and thenRunAsync.
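A sketch of the "both" family, using thenCombine since it is the most general of the three: two independent asynchronous values are merged once both are available. The names (CombineDemo, total, price, quantity) are illustrative assumptions.

```java
import java.util.concurrent.CompletableFuture;

class CombineDemo {
    // Combines two independent async results once both have completed.
    static int total() {
        CompletableFuture<Integer> price = CompletableFuture.supplyAsync(() -> 20);
        CompletableFuture<Integer> quantity = CompletableFuture.supplyAsync(() -> 3);
        return price.thenCombine(quantity, (p, q) -> p * q).join();
    }

    public static void main(String[] args) {
        System.out.println(total());
    }
}
```

thenAcceptBoth takes the same two inputs but returns nothing, and runAfterBoth ignores the inputs altogether.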
If the follow-up action should fire as soon as either the current CompletableFuture or a given CompletableFuture completes, does not need their results, and returns nothing itself, use a Runnable and call runAfterEither:

    public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
                                                  Runnable action) {
        return orRunStage(null, other, action);
    }

    public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
                                                       Runnable action) {
        return orRunStage(asyncPool, other, action);
    }

    private CompletableFuture<Void> orRunStage(Executor e, CompletionStage<?> o,
                                               Runnable f) {
        CompletableFuture<?> b;
        if (f == null || (b = o.toCompletableFuture()) == null)
            throw new NullPointerException();
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        if (e != null || !d.orRun(this, b, f, null)) {
            OrRun<T,?> c = new OrRun<>(e, d, this, b, f);
            orpush(b, c);
            c.tryFire(SYNC);
        }
        return d;
    }

runAfterEither returns a CompletableFuture that carries no result. runAfterEither and runAfterEitherAsync differ in the same way as thenRun and thenRunAsync.
If the follow-up action should fire as soon as either the current CompletableFuture or a given CompletableFuture completes, consumes the result of whichever finished first (both must therefore produce a type compatible with T), but returns nothing itself, use a Consumer<? super T> and call acceptEither:

    public CompletableFuture<Void> acceptEither(
        CompletionStage<? extends T> other, Consumer<? super T> action) {
        return orAcceptStage(null, other, action);
    }

    public CompletableFuture<Void> acceptEitherAsync(
        CompletionStage<? extends T> other, Consumer<? super T> action) {
        return orAcceptStage(asyncPool, other, action);
    }

    private <U extends T> CompletableFuture<Void> orAcceptStage(
        Executor e, CompletionStage<U> o, Consumer<? super T> f) {
        CompletableFuture<U> b;
        if (f == null || (b = o.toCompletableFuture()) == null)
            throw new NullPointerException();
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        if (e != null || !d.orAccept(this, b, f, null)) {
            OrAccept<T,U> c = new OrAccept<T,U>(e, d, this, b, f);
            orpush(b, c);
            c.tryFire(SYNC);
        }
        return d;
    }

acceptEither returns a CompletableFuture that carries no result. acceptEither and acceptEitherAsync differ in the same way as thenRun and thenRunAsync.
If the follow-up action should fire as soon as either the current CompletableFuture or a given CompletableFuture completes, consumes the result of whichever finished first (both must produce a type compatible with T), and returns a value of type U, use a Function<? super T, U> and call applyToEither:

    public <U> CompletableFuture<U> applyToEither(
        CompletionStage<? extends T> other, Function<? super T, U> fn) {
        return orApplyStage(null, other, fn);
    }

    public <U> CompletableFuture<U> applyToEitherAsync(
        CompletionStage<? extends T> other, Function<? super T, U> fn) {
        return orApplyStage(asyncPool, other, fn);
    }

    private <U extends T,V> CompletableFuture<V> orApplyStage(
        Executor e, CompletionStage<U> o,
        Function<? super T, ? extends V> f) {
        CompletableFuture<U> b;
        if (f == null || (b = o.toCompletableFuture()) == null)
            throw new NullPointerException();
        CompletableFuture<V> d = new CompletableFuture<V>();
        if (e != null || !d.orApply(this, b, f, null)) {
            OrApply<T,U,V> c = new OrApply<T,U,V>(e, d, this, b, f);
            orpush(b, c);
            c.tryFire(SYNC);
        }
        return d;
    }

applyToEither returns a CompletableFuture that carries the function's result (U in the public signature, V in the private helper). applyToEither and applyToEitherAsync differ in the same way as thenRun and thenRunAsync.
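A sketch of the "either" family using applyToEither. To keep the outcome deterministic, one input is already completed and the other is never completed, so only one result can ever win. EitherDemo and the variable names are made up for the example.

```java
import java.util.concurrent.CompletableFuture;

class EitherDemo {
    // Applies the function to whichever input completes first.
    static String firstAnswer() {
        CompletableFuture<String> slow = new CompletableFuture<>(); // never completed here
        CompletableFuture<String> fast = CompletableFuture.completedFuture("mirror");
        return slow.applyToEither(fast, s -> s.toUpperCase()).join();
    }

    public static void main(String[] args) {
        System.out.println(firstAnswer());
    }
}
```

With two genuinely racing tasks the winner is not predictable, which is why both inputs must share a compatible result type.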
If the follow-up action should fire only after several CompletableFutures have all completed, pass every CompletableFuture you want to wait for to allOf:

    public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
        return andTree(cfs, 0, cfs.length - 1);
    }

    /** Recursively constructs a tree of completions. */
    static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs,
                                           int lo, int hi) {
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        CompletableFuture<?> a, b;
        int mid = (lo + hi) >>> 1;
        if ((a = (lo == mid ? cfs[lo] :
                  andTree(cfs, lo, mid))) == null ||
            (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
                  andTree(cfs, mid+1, hi))) == null)
            throw new NullPointerException();
        if (!d.biRelay(a, b)) {
            BiRelay<?,?> c = new BiRelay<>(d, a, b);
            a.bipush(b, c);
            c.tryFire(SYNC);
        }
        return d;
    }

allOf returns a CompletableFuture<Void> that carries no result, which makes it convenient to attach follow-up actions; the individual results still have to be read from the original futures.
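Because allOf yields no value, a common pattern is to keep the original futures in a list and read them once the combined future is done. This is one possible sketch, with AllOfDemo and collect as invented names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfDemo {
    // Waits for every job, then gathers the results in submission order.
    static List<String> collect() {
        List<CompletableFuture<String>> jobs = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            int id = i;
            jobs.add(CompletableFuture.supplyAsync(() -> "Job" + id));
        }
        CompletableFuture<Void> all =
            CompletableFuture.allOf(jobs.toArray(new CompletableFuture[0]));
        // join() inside thenApply does not block: every job is already complete.
        return all.thenApply(v -> jobs.stream()
                                      .map(CompletableFuture::join)
                                      .collect(Collectors.toList()))
                  .join();
    }

    public static void main(String[] args) {
        System.out.println(collect());
    }
}
```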
If the follow-up action should fire as soon as any one of several CompletableFutures completes, pass every CompletableFuture you want to wait for to anyOf:

    public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
        return orTree(cfs, 0, cfs.length - 1);
    }

    /** Recursively constructs a tree of completions. */
    static CompletableFuture<Object> orTree(CompletableFuture<?>[] cfs,
                                            int lo, int hi) {
        CompletableFuture<Object> d = new CompletableFuture<Object>();
        CompletableFuture<?> a, b;
        int mid = (lo + hi) >>> 1;
        if ((a = (lo == mid ? cfs[lo] :
                  orTree(cfs, lo, mid))) == null ||
            (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
                  orTree(cfs, mid+1, hi))) == null)
            throw new NullPointerException();
        if (!d.orRelay(a, b)) {
            OrRelay<?,?> c = new OrRelay<>(d, a, b);
            a.orpush(b, c);
            c.tryFire(SYNC);
        }
        return d;
    }

anyOf returns a CompletableFuture<Object> that completes with the result (or the exception) of whichever input finishes first, which makes it convenient to attach follow-up actions.
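Since the inputs may have different types, the combined result is typed as Object and usually needs a cast or an instanceof check. A small deterministic sketch (AnyOfDemo is a hypothetical name; one input is left pending on purpose):

```java
import java.util.concurrent.CompletableFuture;

class AnyOfDemo {
    // Returns the value of whichever input completes first.
    static Object first() {
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completed here
        CompletableFuture<Integer> ready = CompletableFuture.completedFuture(42);
        return CompletableFuture.anyOf(pending, ready).join();
    }

    public static void main(String[] args) {
        Object value = first();
        if (value instanceof Integer) {
            System.out.println("got an Integer: " + value);
        }
    }
}
```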
If the follow-up action needs to observe the result or the exception once the stage completes, without changing the outcome, use a BiConsumer<? super T, ? super Throwable> object and call whenComplete:

    public CompletableFuture<T> whenComplete(
        BiConsumer<? super T, ? super Throwable> action) {
        return uniWhenCompleteStage(null, action);
    }

    public CompletableFuture<T> whenCompleteAsync(
        BiConsumer<? super T, ? super Throwable> action) {
        return uniWhenCompleteStage(asyncPool, action);
    }

    private CompletableFuture<T> uniWhenCompleteStage(
        Executor e, BiConsumer<? super T, ? super Throwable> f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<T> d = new CompletableFuture<T>();
        if (e != null || !d.uniWhenComplete(this, f, null)) {
            UniWhenComplete<T> c = new UniWhenComplete<T>(e, d, this, f);
            push(c);
            c.tryFire(SYNC);
        }
        return d;
    }

whenComplete returns a CompletableFuture<T> that completes with the same result or exception as the stage it was called on; the action itself returns nothing. whenComplete and whenCompleteAsync differ in the same way as thenRun and thenRunAsync.
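The pass-through behaviour can be checked with a small sketch: the callback logs what it saw, and the value still reaches the caller unchanged. WhenCompleteDemo and passThrough are illustrative names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class WhenCompleteDemo {
    // The callback observes (result, error); the original result flows on.
    static String passThrough() {
        AtomicReference<String> log = new AtomicReference<>();
        String value = CompletableFuture
            .supplyAsync(() -> "Job1")
            .whenComplete((result, error) ->
                log.set("saw " + result + ", error=" + error))
            .join();
        return value + " / " + log.get();
    }

    public static void main(String[] args) {
        System.out.println(passThrough());
    }
}
```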
If the follow-up action needs to process both the result and the exception and return a new value of type U, use a BiFunction<? super T, Throwable, ? extends U> object and call handle:

    public <U> CompletableFuture<U> handle(
        BiFunction<? super T, Throwable, ? extends U> fn) {
        return uniHandleStage(null, fn);
    }

    public <U> CompletableFuture<U> handleAsync(
        BiFunction<? super T, Throwable, ? extends U> fn) {
        return uniHandleStage(asyncPool, fn);
    }

    private <V> CompletableFuture<V> uniHandleStage(
        Executor e, BiFunction<? super T, Throwable, ? extends V> f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<V> d = new CompletableFuture<V>();
        if (e != null || !d.uniHandle(this, f, null)) {
            UniHandle<T,V> c = new UniHandle<T,V>(e, d, this, f);
            push(c);
            c.tryFire(SYNC);
        }
        return d;
    }

handle returns a CompletableFuture that carries a result of type U. handle and handleAsync differ in the same way as thenRun and thenRunAsync.
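Unlike whenComplete, handle can replace a failure with a normal value. In the sketch below (HandleDemo and recover are invented names), the exception passed to the function may be wrapped in a CompletionException, so the code unwraps the cause defensively before reading the message.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class HandleDemo {
    // Turns either outcome of the async task into a plain String.
    static String recover(boolean fail) {
        return CompletableFuture.<String>supplyAsync(() -> {
                if (fail) throw new IllegalStateException("boom");
                return "ok";
            })
            .handle((value, error) -> {
                if (error == null) return value;
                // Unwrap the CompletionException wrapper if there is one.
                Throwable cause =
                    (error instanceof CompletionException && error.getCause() != null)
                        ? error.getCause() : error;
                return "fallback: " + cause.getMessage();
            })
            .join();
    }

    public static void main(String[] args) {
        System.out.println(recover(false));
        System.out.println(recover(true));
    }
}
```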
If the follow-up action only needs to handle an exception and supply a replacement value of type T, use a Function<Throwable, ? extends T> object and call exceptionally:

    public CompletableFuture<T> exceptionally(
        Function<Throwable, ? extends T> fn) {
        return uniExceptionallyStage(fn);
    }

    private CompletableFuture<T> uniExceptionallyStage(
        Function<Throwable, ? extends T> f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<T> d = new CompletableFuture<T>();
        if (!d.uniExceptionally(this, f, null)) {
            UniExceptionally<T> c = new UniExceptionally<T>(d, this, f);
            push(c);
            c.tryFire(SYNC);
        }
        return d;
    }

exceptionally returns a CompletableFuture that carries a result of type T: the original value if the stage succeeded, or the function's value if it failed.
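A brief sketch of exceptionally as a default-value fallback. The failing future is built by hand with completeExceptionally so the example stays deterministic; ExceptionallyDemo and withDefault are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

class ExceptionallyDemo {
    // Returns the query result, or "default" if the query failed.
    static String withDefault(CompletableFuture<String> query) {
        return query.exceptionally(error -> "default").join();
    }

    static String failedCase() {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("timeout"));
        return withDefault(failed);
    }

    static String successCase() {
        return withDefault(CompletableFuture.completedFuture("value"));
    }

    public static void main(String[] args) {
        System.out.println(failedCase());
        System.out.println(successCase());
    }
}
```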
    public static void main(String[] args) {
        System.out.println(testCF());
    }

    private static String testCF() {
        // ConcurrentHashMap (java.util.concurrent): the callbacks below write from several pool threads
        Map<String, Object> results = new ConcurrentHashMap<>();
        List<CompletableFuture<Void>> completableFutures = Lists.newArrayList();
        System.out.println(Thread.currentThread() + "testCF start ...... ");
        for (int i = 1; i <= 5; i++) {
            int finalI = i;
            CompletableFuture<Void> completableFuture = CompletableFuture
                .supplyAsync(
                    () -> {
                        System.out.println(Thread.currentThread() + "supplyAsync Job" + finalI);
                        try {
                            Thread.sleep(finalI * 2000);
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                        // if (true) {
                        //     throw new RuntimeException("test exception " + finalI);
                        // }
                        return "Job" + finalI;
                    })
                .thenAcceptAsync(
                    (queryResult) -> {
                        System.out.println(Thread.currentThread() + "thenAcceptAsync " + queryResult);
                        results.put(String.valueOf(finalI), queryResult);
                    })
                .whenCompleteAsync(
                    (aVoid, throwable) -> {
                        if (Objects.nonNull(throwable)) {
                            System.err.println(Thread.currentThread() + "whenCompleteAsync Job" + throwable.getMessage());
                        }
                        System.out.println(Thread.currentThread() + "whenCompleteAsync Job" + finalI);
                    })
                .exceptionally(
                    throwable -> {
                        System.err.println("exceptionally " + throwable.getMessage());
                        throw new DaportalException(throwable.getMessage());
                    });
            completableFutures.add(completableFuture);
        }
        System.out.println(Thread.currentThread() + "for loop completed ...... ");
        CompletableFuture[] voidCompletableFuture = {};
        return CompletableFuture
            .allOf(completableFutures.toArray(voidCompletableFuture))
            .handle(
                (aVoid, throwable) -> {
                    System.out.println(Thread.currentThread() + "handle ...... ");
                    if (Objects.nonNull(throwable)) {
                        System.err.println("handle " + throwable.getMessage());
                        throw new DaportalException(throwable.getMessage());
                    }
                    try {
                        return StringUtil.MAPPER.writeValueAsString(results);
                    } catch (JsonProcessingException e) {
                        throw new DaportalException(e.getMessage());
                    }
                })
            .join();
    }
Output:

    Thread[main,5,main]testCF start ...... 
    Thread[ForkJoinPool.commonPool-worker-1,5,main]supplyAsync Job1
    Thread[ForkJoinPool.commonPool-worker-2,5,main]supplyAsync Job2
    Thread[ForkJoinPool.commonPool-worker-3,5,main]supplyAsync Job3
    Thread[main,5,main]for loop completed ...... 
    Thread[ForkJoinPool.commonPool-worker-1,5,main]thenAcceptAsync Job1
    Thread[ForkJoinPool.commonPool-worker-1,5,main]supplyAsync Job4
    Thread[ForkJoinPool.commonPool-worker-2,5,main]whenCompleteAsync Job1
    Thread[ForkJoinPool.commonPool-worker-2,5,main]supplyAsync Job5
    Thread[ForkJoinPool.commonPool-worker-3,5,main]thenAcceptAsync Job2
    Thread[ForkJoinPool.commonPool-worker-3,5,main]thenAcceptAsync Job3
    Thread[ForkJoinPool.commonPool-worker-3,5,main]whenCompleteAsync Job2
    Thread[ForkJoinPool.commonPool-worker-3,5,main]whenCompleteAsync Job3
    Thread[ForkJoinPool.commonPool-worker-1,5,main]thenAcceptAsync Job4
    Thread[ForkJoinPool.commonPool-worker-1,5,main]whenCompleteAsync Job4
    Thread[ForkJoinPool.commonPool-worker-2,5,main]thenAcceptAsync Job5
    Thread[ForkJoinPool.commonPool-worker-1,5,main]whenCompleteAsync Job5
    Thread[ForkJoinPool.commonPool-worker-1,5,main]handle ...... 
    {"1":"Job1","2":"Job2","3":"Job3","4":"Job4","5":"Job5"}

    Process finished with exit code 0
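In the original output, more blank lines between entries indicated a longer time gap.
The figure that followed showed what each asynchronous thread was doing over the course of the run.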
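The stage methods of CompletableFuture each return a CompletableFuture, which is what makes chained calls possible.
Methods with the Async suffix always push their completion onto the stack and wait to be scheduled; the push uses a CAS (optimistic, lock-free) update to improve concurrent performance:

    /** Returns true if successfully pushed c onto stack. */
    final boolean tryPushStack(Completion c) {
        Completion h = stack;
        lazySetNext(c, h);
        return UNSAFE.compareAndSwapObject(this, STACK, h, c); // CAS optimistic lock
    }

Every method with the Async suffix also has an overload that takes an Executor, so an external thread pool can be supplied. In the worked example the default commonPool ran 3 worker threads; that number is machine-dependent, because the common pool's parallelism defaults to the number of available processors minus one:

    private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
    ......
    static Executor screenExecutor(Executor e) {
        if (!useCommonPool && e == ForkJoinPool.commonPool())
            return asyncPool;
        if (e == null) throw new NullPointerException();
        return e;
    }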
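To show the Executor overload in use, here is a sketch that runs a task on a dedicated pool instead of the common pool. The pool size, the thread name "query-pool", and the class name ExecutorOverloadDemo are all assumptions chosen for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;

class ExecutorOverloadDemo {
    // Runs the supplier on a dedicated pool and reports the worker's thread name.
    static String workerName() {
        ExecutorService pool = Executors.newFixedThreadPool(2, r -> {
            Thread t = new Thread(r, "query-pool"); // hypothetical name
            t.setDaemon(true);
            return t;
        });
        try {
            return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName(), pool)
                .join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Machine-dependent: the common pool's configured parallelism.
        System.out.println("commonPool parallelism: "
            + ForkJoinPool.commonPool().getParallelism());
        System.out.println("task ran on: " + workerName());
    }
}
```

A dedicated pool is worth considering for blocking work such as database queries, since long-blocking tasks on the shared common pool can starve other users of that pool.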