Java8 - 使用CompletableFuture(一)

Future 的使用 

自Java 5开始添加了Future,用来描述一个异步计算的结果。获取一个结果时方法较少,要么经过轮询isDone(),确认完成后调用get()获取值,要么调用get()设置一个超时时间。可是get()方法会阻塞调用线程,这种阻塞的方式显然和咱们的异步编程的初衷相违背。如:html

package com.common.future;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureRunnerTest {

    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newSingleThreadExecutor();

        //提交一个 Callable
        Future<Integer> f = es.submit(() -> {
            // 长时间的异步计算
            Thread.sleep(2000L);
            System.out.println("长时间的异步计算");
            return 100;
        });

        // 轮询
        while (true) {
            System.out.println("f.isDone");
            if (f.isDone()) {
                try {
                    System.out.println(f.get());
                    es.shutdown();
                    break;
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
            Thread.sleep(100L);
        }
    }
}

虽然Future以及相关使用方法提供了异步执行任务的能力,可是对于结果的获取倒是很不方便,只能经过阻塞或者轮询的方式获得任务的结果。阻塞的方式显然和咱们的异步编程的初衷相违背,轮询的方式又会耗费无谓的CPU资源,并且也不能及时地获得计算结果。java

Future 的局限性

  1. 不能手动完成
    当你写了一个函数,用于经过一个远程API获取一个电子商务产品最新价格。由于这个 API 太耗时,你把它容许在一个独立的线程中,而且从你的函数中返回一个 Future。如今假设这个API服务宕机了,这时你想经过该产品的最新缓存价格手工完成这个Future 。你会发现没法这样作。
  2. Future 的结果在非阻塞的状况下,不能执行更进一步的操做
    Future 不会通知你它已经完成了,它提供了一个阻塞的 get() 方法通知你结果。你没法给 Future 植入一个回调函数,当 Future 结果可用的时候,用该回调函数自动的调用 Future 的结果。
  3. 多个 Future 不能串联在一块儿组成链式调用
    有时候你须要执行一个长时间运行的计算任务,而且当计算任务完成的时候,你须要把它的计算结果发送给另一个长时间运行的计算任务等等。你会发现你没法使用 Future 建立这样的一个工做流。
  4. 不能组合多个 Future 的结果
    假设你有10个不一样的Future,你想并行的运行,而后在它们运行未完成后运行一些函数。你会发现你也没法使用 Future 这样作。
  5. 没有异常处理
    Future API 没有任务的异常处理结构竟然有如此多的限制,幸亏咱们有CompletableFuture,你可使用 CompletableFuture 达到以上全部目的。

CompletableFuture 实现了 Future 和 CompletionStage接口,而且提供了许多关于建立,链式调用和组合多个 Future 的便利方法集,并且有普遍的异常处理支持。web

 

建立CompletableFuture对象

在该类中提供了四个静态方法建立CompletableFuture对象:编程

  1. 使用new 建立CompletableFuture 
  2. 使用completedFuture方法运行一个成功的CompletableFuture
  3. 使用 runAsync() 运行异步计算
  4. 使用 supplyAsync() 运行一个异步任务而且返回结果
package com.common.future;

import java.util.concurrent.*;

public class CompletableFutureRunnerTest {

    // 建立一个固定大小的线程池子
    static ExecutorService executor = Executors.newFixedThreadPool(3, new ThreadFactory() {
        int count = 1;

        @Override
        public Thread newThread(Runnable runnable) {
            return new Thread(runnable, "custom-executor-" + count++);
        }
    });

    public static void sleep() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // 直接使用new 建立
        CompletableFuture newCompletable = new CompletableFuture();

        // 经过给定的值 建立一个 已经完成的 future
        CompletableFuture<String> cf = CompletableFuture.completedFuture("message");
        System.out.println(cf.isDone());
        System.out.println(cf.getNow(null));

        // 使用 runAsync() 运行异步计算
        CompletableFuture<Void> completableFuture01 = CompletableFuture.runAsync(() -> {
            sleep();
            System.out.println("runAsync...");
        });

        CompletableFuture<Void> completableFuture02 = CompletableFuture.runAsync(() -> {
            sleep();
            System.out.println("runAsync...");
        }, executor);

        // 使用 supplyAsync() 运行一个异步任务而且返回结果
        CompletableFuture<String> completableFuture03 = CompletableFuture.supplyAsync(() -> {
            sleep();
            System.out.println("supplyAsync03...");
            return "hello world";
        });

        System.out.println(completableFuture03.isDone());
        
        // Block and wait for the future to complete
        System.out.println(completableFuture03.get());

        CompletableFuture<String> completableFuture04 = CompletableFuture.supplyAsync(() -> {
            sleep();
            System.out.println("supplyAsync04...");
            return "hello world";
        }, executor);

        System.out.println(completableFuture04.isDone());
        System.out.println(completableFuture04.get());

    }
}

上面示例中的isDone() ,get() 方法都是 继承于 Future 接口中的,通常状况下,使用CompletableFuture 不须要调用isDone() 方法判断是否完成,也不须要调用get 方法获取异步执行的结果的。缓存

 

CompletableFuture 计算结果完成时的处理

当CompletableFuture的计算结果完成时,有以下三个方法进行处理:异步

CompletableFuture<String> completableFuture03 = CompletableFuture.supplyAsync(() -> {
    sleep();
    System.out.println("supplyAsync03...");
    return "hello world";
});

System.out.println(completableFuture03.isDone());
System.out.println(completableFuture03.get());

completableFuture03.whenComplete((t, ex) -> {
    if (t != null) {
        System.out.println(t);
    } else {
        ex.printStackTrace();
    }
});

completableFuture03.whenCompleteAsync((t, ex) -> {
    if (t != null) {
        System.out.println(t);
    } else {
        ex.printStackTrace();
    }
});

completableFuture03.whenCompleteAsync((t, ex) -> {
    if (t != null) {
        System.out.println(t);
    } else {
        ex.printStackTrace();
    }
}, executor);

若是抛出了异常,对异常的处理以下所示,ide

CompletableFuture<Integer> completableFuture05 = CompletableFuture.supplyAsync(() -> {
    sleep();
    return 1 / 0;
});

completableFuture05.whenComplete((t, ex) -> {
    if (t != null) {
        System.out.println(t);
    } else {
        ex.printStackTrace();
    }
});

这里首先判断 t 的值是否为空,若是为空直接打印异常的堆栈信息异步编程

java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
	at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.ArithmeticException: / by zero
	at com.common.future.CompletableFutureRunnerTest.lambda$main$4(CompletableFutureRunnerTest.java:93)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
	... 5 more

还有一个专门处理异常状况的方法,如 exceptionally,函数

CompletableFuture<Integer> completableFuture05 = CompletableFuture.supplyAsync(() -> {
    sleep();
    return 1 / 0;
});

completableFuture05.exceptionally((e) -> {
    e.printStackTrace();
    return 0;
}).whenComplete((t, e) -> {
    if (t != null) {
        System.out.println(t);
    } else {
        e.printStackTrace();
    }
}).join();

 

进行转换-thenApply 操做

public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);

这一组函数的功能是当原来的CompletableFuture计算完后,将结果传递给函数fn,将fn的结果做为新的CompletableFuture计算结果。所以它的功能至关于将CompletableFuture<T>转换成CompletableFuture<U>网站

不以Async结尾的方法由原来的线程计算,以Async结尾的方法由默认的线程池ForkJoinPool.commonPool()或者指定的线程池executor运行。看下面的例子,

package com.common.future;

import java.util.concurrent.CompletableFuture;

public class ThenApplyTest {

    public static void main(String[] args) {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 100);
        String f = future.thenApplyAsync(i -> i * 10).thenApply(i -> i.toString()).join();
        System.out.println(f);

        CompletableFuture.supplyAsync(() -> "hello world".substring(0, 5))
            .thenApply(String::length)
            .whenComplete((r, t) -> {
                if (t == null) {
                    System.out.println("the length = " + r);
                }
            });

    }
}

 

进行消费-thenAccept & thenRun操做

若是你不想从你的回调函数中返回任何东西,仅仅想在Future完成后运行一些代码片断,你可使用thenAccept() 和 thenRun()方法,这些方法常常在调用链的最末端的最后一个回调函数中使用。

public CompletableFuture<Void> thenAccept(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor);

public CompletableFuture<Void> thenRun(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor);

CompletableFuture.thenAccept() 持有一个Consumer<T> ,返回一个CompletableFuture<Void>。它能够访问CompletableFuture的结果:

// thenAccept() example
CompletableFuture.supplyAsync(() -> {
    return ProductService.getProductDetail(productId);
}).thenAccept(product -> {
    System.out.println("Got product detail from remote service " + product.getName())
});

thenAcceptBoth以及相关方法提供了相似的功能,当两个CompletionStage都正常完成计算的时候,就会执行提供的action,它用来组合另一个异步的结果。

//thenAccept
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action);
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action);
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor);

以下面的使用示例,

CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "first";
    }).thenAcceptBoth(CompletableFuture.completedFuture("second"), (first, second) -> System.out.println(first + " : " + second)).join();

虽然thenAccept()能够访问CompletableFuture的结果,但thenRun()不能访Future的结果,它持有一个Runnable返回CompletableFuture<Void>:

// thenRun() example
CompletableFuture.supplyAsync(() -> {
    // Run some computation  
}).thenRun(() -> {
    // Computation Finished.
});

 

组合两个CompletableFuture

1. 使用thenCompose()组合两个独立的future

假设你想从一个远程API中获取一个用户的详细信息,一旦用户信息可用,你想从另一个服务中获取他的贷款利率。
考虑下如下两个方法getUserDetail() getCreditRating()的实现:

static CompletableFuture<User> getUsersDetail(String userId) {
    return CompletableFuture.supplyAsync(() -> {
        return new User("12312121", "xiaoming", 12);
    });
}

static CompletableFuture<Double> getCreditRating(User user) {
    return CompletableFuture.supplyAsync(() -> {
        if (user.getUserId() == null || user.getUserId().equals("")) {
            return 0.0;
        } else {
            return 0.1;
        }
    });
}

如今让咱们弄明白当使用了thenApply()后是否会达到咱们指望的结果-

// 若是使用 thenApply ,则返回了一个 最终结果是一个嵌套的CompletableFuture。
CompletableFuture<CompletableFuture<Double>> res =
    getUsersDetail("testUserId").thenApply(user -> getCreditRating(user));

在更早的示例中,Supplier函数传入thenApply将返回一个简单的值,可是在本例中,将返回一个CompletableFuture。以上示例的最终结果是一个嵌套的CompletableFuture。
若是你想获取最终的结果给最顶层future,使用 thenCompose()方法代替-

// 若是你想获取最终的结果给最顶层future,使用 thenCompose()方法代替-
CompletableFuture<Double> result = getUsersDetail("testUserId")
    .thenCompose(user -> getCreditRating(user));

所以,规则就是-若是你的回调函数返回一个CompletableFuture,可是你想从CompletableFuture链中获取一个直接合并后的结果,这时候你可使用thenCompose()

2. 使用thenCombine()组合两个独立的 future

虽然thenCompose()被用于当一个future依赖另一个future的时候用来组合两个future。thenCombine()被用来当两个独立的Future都完成的时候,用来作一些事情。

System.out.println("Retrieving weight.");
CompletableFuture<Double> weightInKgFuture = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return 65.0;
});

System.out.println("Retrieving height.");
CompletableFuture<Double> heightInCmFuture = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return 177.8;
});

System.out.println("Calculating BMI.");
CompletableFuture<Double> combinedFuture = weightInKgFuture
    .thenCombine(heightInCmFuture, (weightInKg, heightInCm) -> {
        Double heightInMeter = heightInCm / 100;
        return weightInKg / (heightInMeter * heightInMeter);
    });

System.out.println("Your BMI is - " + combinedFuture.get());

当两个Future都完成的时候,传给``thenCombine()的回调函数将被调用。

 

组合多个CompletableFuture

咱们使用thenCompose() 和 thenCombine()把两个CompletableFuture组合在一块儿。如今若是你想组合任意数量的CompletableFuture,应该怎么作?咱们可使用如下两个方法组合任意数量的CompletableFuture。

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);

CompletableFuture.allOf()

CompletableFuture.allOf的使用场景是当你有一个列表的独立future,而且你想在它们都完成后并行的作一些事情。

假设你想下载一个网站的100个不一样的页面。你能够串行的作这个操做,可是这很是消耗时间。所以你想写一个函数,传入一个页面连接,返回一个CompletableFuture,异步的下载页面内容。

public static CompletableFuture<String> downloadWebPage(String pageLink) {
    return CompletableFuture.supplyAsync(() -> {
        // Code to download and return the web page's content
        try {
            String html = Jsoup.connect(pageLink).execute().body();
            return html;
        } catch (IOException e) {
            e.printStackTrace();
        }
        return "";
    });
}

如今,当全部的页面已经下载完毕,你想计算包含关键字CompletableFuture页面的数量。可使用CompletableFuture.allOf()达成目的。

List<String> webPageLinks = Arrays.asList("https://my.oschina.net/xinxingegeya/blog/674006",
    "https://my.oschina.net/xinxingegeya/blog/637773", "https://my.oschina.net/xinxingegeya/blog/2222079");

List<CompletableFuture<String>> pageContentFutures = webPageLinks.stream()
    .map(webPageLink -> downloadWebPage(webPageLink))
    .collect(Collectors.toList());

// Create a combined Future using allOf()
CompletableFuture<Void> allFutures = CompletableFuture.allOf(pageContentFutures.toArray(new CompletableFuture[0]));

使用CompletableFuture.allOf()的问题是它返回CompletableFuture<Void>。可是咱们能够经过写一些额外的代码来获取全部封装的CompletableFuture结果。

// When all the Futures are completed, call `future.join()` to get their results and collect the results in a list -
CompletableFuture<List<String>> allPageContentsFuture = allFutures.thenApply(v -> {
    return pageContentFutures.stream()
        .map(pageContentFuture -> pageContentFuture.join())
        .collect(Collectors.toList());
});

花一些时间理解下以上代码片断。当全部future完成的时候,咱们调用了future.join(),所以咱们不会在任何地方阻塞。

join()方法和get()方法很是相似,这惟一不一样的地方是若是最顶层的CompletableFuture完成的时候发生了异常,它会抛出一个未经检查的异常。

如今让咱们计算包含关键字页面的数量。

// Count the number of web pages having the "CompletableFuture" keyword.
CompletableFuture<Long> countFuture = allPageContentsFuture.thenApply(pageContents -> {
    return pageContents.stream()
        .filter(pageContent -> pageContent.contains("CompletableFuture"))
        .count();
});

System.out.println("Number of Web Pages having CompletableFuture keyword - " + countFuture.get());

完整的代码以下,

package common.future;

import org.jsoup.Jsoup;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

public class MoreCompletableFutureTest {

    public static CompletableFuture<String> downloadWebPage(String pageLink) {
        return CompletableFuture.supplyAsync(() -> {
            // Code to download and return the web page's content
            try {
                String html = Jsoup.connect(pageLink).execute().body();
                return html;
            } catch (IOException e) {
                e.printStackTrace();
            }
            return "";
        });
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        List<String> webPageLinks = Arrays.asList("https://my.oschina.net/xinxingegeya/blog/674006",
            "https://my.oschina.net/xinxingegeya/blog/637773", "https://my.oschina.net/xinxingegeya/blog/2222079");

        List<CompletableFuture<String>> pageContentFutures = webPageLinks.stream()
            .map(webPageLink -> downloadWebPage(webPageLink))
            .collect(Collectors.toList());

        // Create a combined Future using allOf()
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(pageContentFutures.toArray(new CompletableFuture[0]));

        // When all the Futures are completed, call `future.join()` to get their results and collect the results in a list -
        CompletableFuture<List<String>> allPageContentsFuture = allFutures.thenApply(v -> {
            return pageContentFutures.stream()
                .map(pageContentFuture -> pageContentFuture.join())
                .collect(Collectors.toList());
        });

        // Count the number of web pages having the "CompletableFuture" keyword.
        CompletableFuture<Long> countFuture = allPageContentsFuture.thenApply(pageContents -> {
            return pageContents.stream()
                .filter(pageContent -> pageContent.contains("CompletableFuture"))
                .count();
        });

        System.out.println("Number of Web Pages having CompletableFuture keyword - " + countFuture.get());

    }
}

CompletableFuture.anyOf()

CompletableFuture.anyOf()和其名字介绍的同样,当任何一个CompletableFuture完成的时候【相同的结果类型】,返回一个新的CompletableFuture。如下示例:

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 1";
});

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 2";
});

CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 3";
});

CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3);

System.out.println(anyOfFuture.get()); // Result of Future 2

在以上示例中,当三个中的任何一个CompletableFuture完成, anyOfFuture就会完成。由于future2的休眠时间最少,所以她最早完成,最终的结果将是future2的结果。

CompletableFuture.anyOf()传入一个Future可变参数,返回CompletableFuture<Object>。CompletableFuture.anyOf()的问题是若是你的CompletableFuture返回的结果是不一样类型的,这时候你讲会不知道你最终CompletableFuture是什么类型。

======END======

相关文章
相关标签/搜索