CompletableFuture 是Java 8 新增长的Api,该类实现,Future和CompletionStage两个接口,提供了很是强大的Future的扩展功能,能够帮助咱们简化异步编程的复杂性,提供了函数式编程的能力,能够经过回调的方式处理计算结果,而且提供了转换和组合CompletableFuture的方法。java
public T get()编程
该方法为阻塞方法,会等待计算结果完成app
public T get(long timeout,TimeUnit unit)dom
有时间限制的阻塞方法异步
public T getNow(T valueIfAbsent)函数式编程
当即获取方法结果,若是没有计算结束则返回传的值异步编程
public T join()函数
和 get() 方法相似也是主动阻塞线程,等待计算结果。和get() 方法有细微的差异spa
public class ThreadUtil {
public static void sleep(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
e.printStackTrace();
throw new RuntimeException(e.getMessage());
}
}
}
复制代码
public static void main(String[] args) {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
ThreadUtil.sleep(200);
return 10 / 0;
});
// System.out.println(future.join());
// System.out.println(future.get());
System.out.println(future.getNow(10));
}
复制代码
public boolean complete(T value)线程
当即完成计算,并把结果设置为传的值,返回是否设置成功
若是 CompletableFuture 没有关联任何的Callback、异步任务等,若是调用get方法,那会一直阻塞下去,可使用complete方法主动完成计算
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future = new CompletableFuture<>();
// future.get();
future.complete(10);
}
复制代码
建立一个异步任务
public static <U> CompletableFuture<U> completedFuture(U value)
建立一个有初始值的CompletableFuture
public static CompletableFuture runAsync(Runnable runnable)
public static CompletableFuture runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
以上四个方法中,以 Async 结尾而且没有 Executor 参数的,会默认使用 ForkJoinPool.commonPool() 做为它的线程池执行异步代码。 以run开头的,由于以 Runable 类型为参数因此没有返回值。示例:
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> System.out.println("runAsync"));
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "supplyAsync");
System.out.println(future1.get());
System.out.println(future2.get());
}
复制代码
结果:
runAsync
null
supplyAsync
复制代码
当CompletableFuture的计算结果完成,或者抛出异常的时候,咱们能够执行特定的Action。主要是下面的方法:
参数类型为 BiConsumer<? super T, ? super Throwable> 会获取上一步计算的计算结果和异常信息。以Async结尾的方法可能会使用其它的线程去执行,若是使用相同的线程池,也可能会被同一个线程选中执行,如下皆相同。
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
ThreadUtil.sleep(100);
return 20;
}).whenCompleteAsync((v, e) -> {
System.out.println(v);
System.out.println(e);
});
System.out.println(future.get());
}
复制代码
public CompletableFuture exceptionally(Function<Throwable,? extends T> fn)
该方法是对异常状况的处理,当函数异常时应该的返回值
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
ThreadUtil.sleep(100);
return 10 / 0;
}).whenCompleteAsync((v, e) -> {
System.out.println(v);
System.out.println(e);
}).exceptionally((e) -> {
System.out.println(e.getMessage());
return 30;
});
System.out.println(future.get());
}
复制代码
handle 方法和whenComplete方法相似,只不过接收的是一个 BiFunction<? super T,Throwable,? extends U> fn 类型的参数,所以有 whenComplete 方法和 转换的功能 (thenApply)
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture
.supplyAsync(() -> 10 / 0)
.handle((t, e) -> {
System.out.println(e.getMessage());
return 10;
});
System.out.println(future.get());
}
复制代码
CompletableFuture 因为有回调,能够没必要等待一个计算完成而阻塞着调用县城,能够在一个结果计算完成以后紧接着执行某个Action。咱们能够将这些操做串联起来。
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture
.supplyAsync(() -> 1)
.thenApply((a) -> {
System.out.println(a);//1
return a * 10;
}).thenApply((a) -> {
System.out.println(a);//10
return a + 10;
}).thenApply((a) -> {
System.out.println(a);//20
return a - 5;
});
System.out.println(future.get());//15
}
复制代码
这些方法不是立刻执行的,也不会阻塞,而是前一个执行完成后继续执行下一个。
和 handle 方法的区别是,handle 会处理正常计算值和异常,不会抛出异常。而 thenApply 只会处理正常计算值,有异常则抛出。
单纯的去消费结果而不会返回新的值,因些计算结果为 Void;
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> future = CompletableFuture
.supplyAsync(() -> 1)
.thenAccept(System.out::println) //消费 上一级返回值 1
.thenAcceptAsync(System.out::println); //上一级没有返回值 输出null
System.out.println(future.get()); //消费函数没有返回值 输出null
}
复制代码
和 thenAccept 相比,参数类型多了一个 CompletionStage<? extends U> other,以上方法会接收上一个CompletionStage返回值,和当前的一个。
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture
.supplyAsync(() -> 1)
.thenAcceptBoth(CompletableFuture.supplyAsync(() -> 2), (a, b) -> {
System.out.println(a);
System.out.println(b);
}).get();
}
复制代码
public CompletableFuture runAfterBoth(CompletionStage<?> other, Runnable action)
runAfterBoth 和以上方法不一样,传一个 Runnable 类型的参数,不接收上一级的返回值
更完全的:
以上是完全的纯消费,彻底忽略计算结果
以上接收类型为 Function<? super T,? extends CompletionStage<U>> fn ,fn 接收上一级返回的结果,并返回一个新的 CompletableFuture
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture
.supplyAsync(() -> 1)
.thenApply((a) -> {
ThreadUtil.sleep(1000);
return a + 10;
})
.thenCompose((s) -> {
System.out.println(s); //11
return CompletableFuture.supplyAsync(() -> s * 5);
});
System.out.println(future.get());//55
}
复制代码
两个CompletionStage是并行执行的,它们之间并无前后依赖顺序,other并不会等待先前的CompletableFuture执行完毕后再执行。
public static void main(String[] args) throws ExecutionException, InterruptedException {
Random random = new Random();
CompletableFuture<Integer> future = CompletableFuture
.supplyAsync(() -> {
ThreadUtil.sleep(random.nextInt(1000));
System.out.println("supplyAsync");
return 2;
}).thenApply((a) -> {
ThreadUtil.sleep(random.nextInt(1000));
System.out.println("thenApply");
return a * 3;
})
.thenCombine(CompletableFuture.supplyAsync(() -> {
ThreadUtil.sleep(random.nextInt(1000));
System.out.println("thenCombineAsync");
return 10;
}), (a, b) -> {
System.out.println(a);
System.out.println(b);
return a + b;
});
System.out.println(future.get());
}
复制代码
thenCombine 和 supplyAsync 不必定哪一个先哪一个后,是并行执行的。
先看示例:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Random random = new Random();
CompletableFuture
.supplyAsync(() -> {
ThreadUtil.sleep(random.nextInt(1000));
return "A";
})
.acceptEither(CompletableFuture.supplyAsync(() -> {
ThreadUtil.sleep(random.nextInt(1000));
return "B";
}), System.out::println)
.get();
}
复制代码
以上代码有时输出A,有时输出B,哪一个Future先执行完就会根据它的结果计算。
acceptEither方法是当任意一个 CompletionStage 完成的时候,action 这个消费者就会被执行。这个方法返回 CompletableFuture
applyToEither 方法是当任意一个 CompletionStage 完成的时候,fn会被执行,它的返回值会看成新的CompletableFuture<U>的计算结果。
acceptEither 没有返回值,applyToEither 有返回值
这个方法的意思是把有方法都执行完才往下执行,没有返回值
public static void main(String[] args) throws ExecutionException, InterruptedException {
Random random = new Random();
CompletableFuture.allOf(
CompletableFuture.runAsync(() -> {
ThreadUtil.sleep(random.nextInt(1000));
System.out.println(1);
}),
CompletableFuture.runAsync(() -> {
ThreadUtil.sleep(random.nextInt(1000));
System.out.println(2);
}))
.get();
}
复制代码
有时输出1 2 有时输出 2 1
任务一个方法执行完都往下执行,返回一个Object类型的值
public static void main(String[] args) throws ExecutionException, InterruptedException {
Random random = new Random();
Object obj = CompletableFuture.anyOf(
CompletableFuture.supplyAsync(() -> {
ThreadUtil.sleep(random.nextInt(1000));
return 1;
}),
CompletableFuture.supplyAsync(() -> {
ThreadUtil.sleep(random.nextInt(1000));
return 2;
})).get();
System.out.println(obj);
}
复制代码
输出结果有时为1 有时间为 2