CompletableFuture 的同步与异步

CompletableFuture 类声明了 CompletionStage 接口,CompletionStage 接口实际上提供了同步或异步运行计算的舞台。java

所谓异步调用其实就是实现一个可无需等待被调用函数的返回值而让操做继续运行的方法。在 Java 语言中,简单的讲就是另启一个线程来完成调用中的部分计算,使调用继续运行或返回,而不须要等待计算结果。但调用者仍须要获取线程的计算结果。dom

CompletableFuture 简单的异步运算场景

CompletableFuture 提供了以下的异步方法,异步

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {
    return asyncSupplyStage(screenExecutor(executor), supplier);
}

public static CompletableFuture<Void> runAsync(Runnable runnable) {
    return asyncRunStage(asyncPool, runnable);
}

public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) {
    return asyncRunStage(screenExecutor(executor), runnable);
}

supplyAsync 返回带有任务结果的CompletableFuture,而runAsync返回CompletableFuture<Void>。async

Executor参数能够手动指定线程池,不然默认ForkJoinPool.commonPool()系统级公共线程池。ide

注意:ForkJoinPool.commonPool() 是 Daemon Thread(守护线程) 函数

只要当前JVM实例中尚存在任何一个非守护线程(用户线程)没有结束,守护线程就所有工做;this

只有当用户线程结束时,JVM推出,守护线程随着JVM一同结束工做。spa

@Test
public void test() throws InterruptedException {
    CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
        System.out.println("runAsync=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon());
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });

    System.out.println("done=" + cf.isDone());
    TimeUnit.SECONDS.sleep(4);
    System.out.println("done=" + cf.isDone());
}

输出,线程

done=false
runAsync=ForkJoinPool.commonPool-worker-1|true
done=true

在这段代码中,runAsync 是异步执行的 ,经过 Thread.currentThread().isDaemon() 打印的结果就能够知道是Daemon线程异步执行的。code

 

CompletableFuture 同步执行示例

CompletableFuture中不带Async的同步方法以下,

public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public CompletableFuture<Void> thenAccept(Consumer<? super T> action);
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action);
public CompletableFuture<Void> thenRun(Runnable action);

这些方法都是同步执行的

@Test
public void testSync11() {
    CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApply(s -> {
        randomSleep();
        System.out.println("thenApply=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon());
        return s.toUpperCase();
    });
    // gotNow 若是成功就返回结果
    System.out.println(cf.getNow(null));
    // 一直等待成功,而后返回结果
    System.out.println(cf.join());
}

输出以下,

thenApply=main|false
MESSAGE
MESSAGE

首先经过 completedFuture 方法获取一个结果已经完成的Future,而后执行同步方法thenApply,由main线程执行,会阻塞当前的main线程 ,最后getNow方法获取到结果。

 

CompletableFuture 异步执行示例

CompletableFuture中异步执行的方法都是带Async 结尾的,能够制定执行异步任务的线程池,也能够不指定,若是不指定,默认使用ForkJoinPool.commonPool() 线程池。

public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);

public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor);

public CompletableFuture<Void> thenRunAsync(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor);

如下使用的两个方法都是异步执行任务的方法

@Test
public void testAsync1() {
    CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
        randomSleep();
        System.out.println("supplyAsync=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon());
        return "message";
    }).thenApplyAsync(s -> {
        randomSleep();
        System.out.println("thenApplyAsync=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon());
        return s.toUpperCase();
    });
    // gotNow 若是成功就返回结果
    System.out.println(cf.getNow(null));
    // 一直等待成功,而后返回结果
    System.out.println(cf.join());
}

输出以下,

null
supplyAsync=ForkJoinPool.commonPool-worker-1|true
thenApplyAsync=ForkJoinPool.commonPool-worker-1|true
MESSAGE

当执行 cf.gotNow 方法的时候,异步任务尚未执行完成,因此返回 null 。执行 cf.join 方法,阻塞一直等到异步任务结果返回。

 

thenApply 是由哪一个线程执行的

thenApply 不带async结尾,是一个同步方法,但可能仍是由执行任务的线程池来执行,或者是当前main线程来执行。

@Test
public void testAsync125() {
    CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
        //没有sleep
        System.out.println("supplyAsync=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon());
        return "message";
    }).thenApply(s -> {
        // thenApplyAsync=main|false 使用调用者线程来进行处理
        System.out.println("thenApply=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon());
        return s.toUpperCase();
    });
    // gotNow 若是成功就返回结果
    System.out.println(cf.getNow(null));
    // 一直等待成功,而后返回结果
    System.out.println(cf.join());
}

@Test
public void testAsync126() throws InterruptedException {
    CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
        randomSleep();
        System.out.println("supplyAsync=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon());
        return "message";
    });

    TimeUnit.SECONDS.sleep(2);

    // 使用调用者线程 当前线程main 来进行处理thenApply 转换操做
    cf = cf.thenApply(s -> {
        System.out.println("thenApply=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon());
        return s.toUpperCase();
    });
    // gotNow 若是成功就返回结果
    System.out.println(cf.getNow(null));
    // 一直等待成功,而后返回结果
    System.out.println(cf.join());
}

@Test
public void testAsync124() {
    CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
        randomSleep();
        System.out.println("supplyAsync=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon());
        return "message";
    }).thenApply(s -> {
        System.out.println("thenApply=" + Thread.currentThread().getName() + "|" + Thread.currentThread().isDaemon());
        return s.toUpperCase();
    });
    // gotNow 若是成功就返回结果
    System.out.println(cf.getNow(null));
    // 一直等待成功,而后返回结果
    System.out.println(cf.join());
}

输出以下,

supplyAsync=ForkJoinPool.commonPool-worker-1|true
thenApplyAsync=main|false
MESSAGE
MESSAGE

//////
supplyAsync=ForkJoinPool.commonPool-worker-1|true
thenApply=main|false
MESSAGE
MESSAGE

//////
null
supplyAsync=ForkJoinPool.commonPool-worker-1|true
thenApply=ForkJoinPool.commonPool-worker-1|true
MESSAGE

在testAsync125方法中,thenApply 回调方法是由当前main线程执行的;

在testAsync126方法中,thenApply 回调方法是由当前main线程执行的;

在testAsync124方法中,thenApply 方法是由执行任务的线程池的线程来执行的,thenApply 虽然是一个同步方法,但其调用是经过 ForkJoinPool.commonPool 线程池异步执行的。

因此要注意的是 若是在thenApply 方法中执行比较耗时的操做,会阻塞调用者线程或者主线程。

 

CompletableFuture allOf 方法同步执行效果

When we need to execute multiple Futures in parallel, we usually want to wait for all of them to execute and then process their combined results.

The CompletableFuture.allOf static method allows to wait for completion of all of the Futures provided as a var-arg:

CompletableFuture<String> future1
    = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Hello";
});

CompletableFuture<String> future2
    = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Beautiful";
});

CompletableFuture<String> future3
    = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(4);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "World";
});

System.out.println("f1=" + future1.isDone());
System.out.println("f2=" + future2.isDone());
System.out.println("f3=" + future3.isDone());
CompletableFuture<Void> combinedFuture
    = CompletableFuture.allOf(future1, future2, future3);

System.out.println("========");
System.out.println("f1=" + future1.isDone());
System.out.println("f2=" + future2.isDone());
System.out.println("f3=" + future3.isDone());

// 等待全部的future 执行完成
combinedFuture.join();

System.out.println("========");
System.out.println("f1=" + future1.isDone());
System.out.println("f2=" + future2.isDone());
System.out.println("f3=" + future3.isDone());
f1=false
f2=false
f3=false
========
f1=false
f2=false
f3=false
========
f1=true
f2=true
f3=true

经过 combinedFuture.join()  方法等待全部的异步任务执行完成。当其全部的CompletableFuture均完成结果时,combinedFuture就会处于完成状态

Notice that the return type of the CompletableFuture.allOf() is a CompletableFuture<Void>. The limitation of this method is that it does not return the combined results of all Futures. Instead you have to manually get results from Futures. Fortunately, CompletableFuture.join() method and Java 8 Streams API makes it simple:

String combined = Stream.of(future1, future2, future3)
    .map(CompletableFuture::join)
    .collect(Collectors.joining(" "));

System.out.println(combined);

更简化后完整连贯的代码,

@Test
public void testAllOf2() {
    CompletableFuture<String> future1
        = CompletableFuture.supplyAsync(() -> "Hello");

    CompletableFuture<String> future2
        = CompletableFuture.supplyAsync(() -> "Beautiful");

    CompletableFuture<String> future3
        = CompletableFuture.supplyAsync(() -> "World");

    CompletableFuture.allOf(future1, future2, future3)
        .thenApply((v) -> Stream.of(future1, future2, future3)
            .map(CompletableFuture::join)
            .collect(Collectors.joining(" ")))
        .thenAccept(System.out::println);

}

========END========

相关文章
相关标签/搜索