Java 8 CompletableFuture 教程

Java 8 有大量的新特性和加强如 Lambda 表达式StreamsCompletableFuture等。在本篇文章中我将详细解释清楚CompletableFuture以及它全部方法的使用。html

什么是CompletableFuture?

在Java中CompletableFuture用于异步编程,异步编程是编写非阻塞的代码,运行的任务在一个单独的线程,与主线程隔离,而且会通知主线程它的进度,成功或者失败。java

在这种方式中,主线程不会被阻塞,不须要一直等到子线程完成。主线程能够并行的执行其余任务。web

使用这种并行方式,能够极大的提升程序的性能。express

Future vs CompletableFuture

CompletableFuture 是 Future API的扩展。编程

Future 被用于做为一个异步计算结果的引用。提供一个 isDone() 方法来检查计算任务是否完成。当任务完成时,get() 方法用来接收计算任务的结果。api

Callbale和 Future 教程能够学习更多关于 Future 知识.缓存

Future API 是很是好的 Java 异步编程进阶,可是它缺少一些很是重要和有用的特性。oracle

Future 的局限性

  1. 不能手动完成
    当你写了一个函数,用于经过一个远程API获取一个电子商务产品最新价格。由于这个 API 太耗时,你把它容许在一个独立的线程中,而且从你的函数中返回一个 Future。如今假设这个API服务宕机了,这时你想经过该产品的最新缓存价格手工完成这个Future 。你会发现没法这样作。
  2. Future 的结果在非阻塞的状况下,不能执行更进一步的操做
    Future 不会通知你它已经完成了,它提供了一个阻塞的 get() 方法通知你结果。你没法给 Future 植入一个回调函数,当 Future 结果可用的时候,用该回调函数自动的调用 Future 的结果。
  3. 多个 Future 不能串联在一块儿组成链式调用
    有时候你须要执行一个长时间运行的计算任务,而且当计算任务完成的时候,你须要把它的计算结果发送给另一个长时间运行的计算任务等等。你会发现你没法使用 Future 建立这样的一个工做流。
  4. 不能组合多个 Future 的结果
    假设你有10个不一样的Future,你想并行的运行,而后在它们运行未完成后运行一些函数。你会发现你也没法使用 Future 这样作。
  5. 没有异常处理
    Future API 没有任务的异常处理结构竟然有如此多的限制,幸亏咱们有CompletableFuture,你可使用 CompletableFuture 达到以上全部目的。

CompletableFuture 实现了 FutureCompletionStage接口,而且提供了许多关于建立,链式调用和组合多个 Future 的便利方法集,并且有普遍的异常处理支持。异步

建立 CompletableFuture

1. 简单的例子
可使用以下无参构造函数简单的建立 CompletableFuture:async

CompletableFuture<String> completableFuture = new CompletableFuture<String>();

这是一个最简单的 CompletableFuture,想获取CompletableFuture 的结果可使用 CompletableFuture.get() 方法:

String result = completableFuture.get()

get() 方法会一直阻塞直到 Future 完成。所以,以上的调用将被永远阻塞,由于该Future一直不会完成。

你可使用 CompletableFuture.complete() 手工的完成一个 Future:

completableFuture.complete("Future's Result")

全部等待这个 Future 的客户端都将获得一个指定的结果,而且 completableFuture.complete() 以后的调用将被忽略。

2. 使用 runAsync() 运行异步计算
若是你想异步的运行一个后台任务而且不想改任务返回任务东西,这时候可使用 CompletableFuture.runAsync()方法,它持有一个Runnable 对象,并返回 CompletableFuture<Void>

// Run a task specified by a Runnable Object asynchronously.
CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() {
    @Override
    public void run() {
        // Simulate a long-running Job
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        System.out.println("I'll run in a separate thread than the main thread.");
    }
});

// Block and wait for the future to complete
future.get()

你也能够以 lambda 表达式的形式传入 Runnable 对象:

// Using Lambda Expression
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    // Simulate a long-running Job   
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    System.out.println("I'll run in a separate thread than the main thread.");
});

在本文中,我使用lambda表达式会比较频繁,若是之前你没有使用过,建议你也多使用lambda 表达式。

3. 使用 supplyAsync() 运行一个异步任务而且返回结果
当任务不须要返回任何东西的时候, CompletableFuture.runAsync() 很是有用。可是若是你的后台任务须要返回一些结果应该要怎么样?

CompletableFuture.supplyAsync() 就是你的选择。它持有supplier<T> 而且返回CompletableFuture<T>T 是经过调用 传入的supplier取得的值的类型。

// Run a task specified by a Supplier object asynchronously
CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
    @Override
    public String get() {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return "Result of the asynchronous computation";
    }
});

// Block and get the result of the Future
String result = future.get();
System.out.println(result);

Supplier<T> 是一个简单的函数式接口,表示supplier的结果。它有一个get()方法,该方法能够写入你的后台任务中,而且返回结果。

你可使用lambda表达式使得上面的示例更加简明:

// Using Lambda Expression
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return "Result of the asynchronous computation";
});
一个关于Executor 和Thread Pool笔记
你可能想知道,咱们知道 runAsync() supplyAsync()方法在单独的线程中执行他们的任务。可是咱们不会永远只建立一个线程。
CompletableFuture能够从全局的 ForkJoinPool.commonPool()得到一个线程中执行这些任务。
可是你也能够建立一个线程池并传给 runAsync() supplyAsync()方法来让他们从线程池中获取一个线程执行它们的任务。
CompletableFuture API 的全部方法都有两个变体-一个接受 Executor做为参数,另外一个不这样:
// Variations of runAsync() and supplyAsync() methods
static CompletableFuture<Void>  runAsync(Runnable runnable)
static CompletableFuture<Void>  runAsync(Runnable runnable, Executor executor)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

建立一个线程池,并传递给其中一个方法:

Executor executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return "Result of the asynchronous computation";
}, executor);

在 CompletableFuture 转换和运行

CompletableFuture.get()方法是阻塞的。它会一直等到Future完成而且在完成后返回结果。
可是,这是咱们想要的吗?对于构建异步系统,咱们应该附上一个回调给CompletableFuture,当Future完成的时候,自动的获取结果。
若是咱们不想等待结果返回,咱们能够把须要等待Future完成执行的逻辑写入到回调函数中。

可使用 thenApply(), thenAccept()thenRun()方法附上一个回调给CompletableFuture。

1. thenApply()
可使用 thenApply() 处理和改变CompletableFuture的结果。持有一个Function<R,T>做为参数。Function<R,T>是一个简单的函数式接口,接受一个T类型的参数,产出一个R类型的结果。

// Create a CompletableFuture
CompletableFuture<String> whatsYourNameFuture = CompletableFuture.supplyAsync(() -> {
   try {
       TimeUnit.SECONDS.sleep(1);
   } catch (InterruptedException e) {
       throw new IllegalStateException(e);
   }
   return "Rajeev";
});

// Attach a callback to the Future using thenApply()
CompletableFuture<String> greetingFuture = whatsYourNameFuture.thenApply(name -> {
   return "Hello " + name;
});

// Block and get the result of the future.
System.out.println(greetingFuture.get()); // Hello Rajeev

你也能够经过附加一系列的thenApply()在回调方法 在CompletableFuture写一个连续的转换。这样的话,结果中的一个 thenApply方法就会传递给该系列的另一个 thenApply方法。

CompletableFuture<String> welcomeText = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Rajeev";
}).thenApply(name -> {
    return "Hello " + name;
}).thenApply(greeting -> {
    return greeting + ", Welcome to the CalliCoder Blog";
});

System.out.println(welcomeText.get());
// Prints - Hello Rajeev, Welcome to the CalliCoder Blog

2. thenAccept() 和 thenRun()
若是你不想从你的回调函数中返回任何东西,仅仅想在Future完成后运行一些代码片断,你可使用thenAccept() thenRun()方法,这些方法常常在调用链的最末端的最后一个回调函数中使用。
CompletableFuture.thenAccept() 持有一个Consumer<T> ,返回一个CompletableFuture<Void>。它能够访问CompletableFuture的结果:

// thenAccept() example
CompletableFuture.supplyAsync(() -> {
    return ProductService.getProductDetail(productId);
}).thenAccept(product -> {
    System.out.println("Got product detail from remote service " + product.getName())
});

虽然thenAccept()能够访问CompletableFuture的结果,但thenRun()不能访Future的结果,它持有一个Runnable返回CompletableFuture<Void>:

// thenRun() example
CompletableFuture.supplyAsync(() -> {
    // Run some computation  
}).thenRun(() -> {
    // Computation Finished.
});
异步回调方法的笔记
CompletableFuture提供的全部回调方法都有两个变体:
`// thenApply() variants
<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)`
这些异步回调变体经过在独立的线程中执行回调任务帮助你进一步执行并行计算。
如下示例:
CompletableFuture.supplyAsync(() -> {
    try {
       TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
      throw new IllegalStateException(e);
    }
    return "Some Result"
}).thenApply(result -> {
    /* 
      Executed in the same thread where the supplyAsync() task is executed
      or in the main thread If the supplyAsync() task completes immediately (Remove sleep() call to verify)
    */
    return "Processed Result"
})

在以上示例中,在thenApply()中的任务和在supplyAsync()中的任务执行在相同的线程中。任何supplyAsync()当即执行完成,那就是执行在主线程中(尝试删除sleep测试下)。
为了控制执行回调任务的线程,你可使用异步回调。若是你使用thenApplyAsync()回调,将从ForkJoinPool.commonPool()获取不一样的线程执行。

CompletableFuture.supplyAsync(() -> {
    return "Some Result"
}).thenApplyAsync(result -> {
    // Executed in a different thread from ForkJoinPool.commonPool()
    return "Processed Result"
})

此外,若是你传入一个ExecutorthenApplyAsync()回调中,,任务将从Executor线程池获取一个线程执行。

Executor executor = Executors.newFixedThreadPool(2);
CompletableFuture.supplyAsync(() -> {
    return "Some result"
}).thenApplyAsync(result -> {
    // Executed in a thread obtained from the executor
    return "Processed Result"
}, executor);

组合两个CompletableFuture

1. 使用 thenCompose() 组合两个独立的future
假设你想从一个远程API中获取一个用户的详细信息,一旦用户信息可用,你想从另一个服务中获取他的贷方。
考虑下如下两个方法getUserDetail() getCreditRating()的实现:

CompletableFuture<User> getUsersDetail(String userId) {
    return CompletableFuture.supplyAsync(() -> {
        UserService.getUserDetails(userId);
    });    
}

CompletableFuture<Double> getCreditRating(User user) {
    return CompletableFuture.supplyAsync(() -> {
        CreditRatingService.getCreditRating(user);
    });
}

如今让咱们弄明白当使用了thenApply()后是否会达到咱们指望的结果-

CompletableFuture<CompletableFuture<Double>> result = getUserDetail(userId)
.thenApply(user -> getCreditRating(user));

在更早的示例中,Supplier函数传入thenApply将返回一个简单的值,可是在本例中,将返回一个CompletableFuture。以上示例的最终结果是一个嵌套的CompletableFuture。
若是你想获取最终的结果给最顶层future,使用 thenCompose()方法代替-

CompletableFuture<Double> result = getUserDetail(userId)
.thenCompose(user -> getCreditRating(user));

所以,规则就是-若是你的回调函数返回一个CompletableFuture,可是你想从CompletableFuture链中获取一个直接合并后的结果,这时候你可使用thenCompose()

2. 使用thenCombine()组合两个独立的 future
虽然thenCompose()被用于当一个future依赖另一个future的时候用来组合两个future。thenCombine()被用来当两个独立的Future都完成的时候,用来作一些事情。

System.out.println("Retrieving weight.");
CompletableFuture<Double> weightInKgFuture = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return 65.0;
});

System.out.println("Retrieving height.");
CompletableFuture<Double> heightInCmFuture = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return 177.8;
});

System.out.println("Calculating BMI.");
CompletableFuture<Double> combinedFuture = weightInKgFuture
        .thenCombine(heightInCmFuture, (weightInKg, heightInCm) -> {
    Double heightInMeter = heightInCm/100;
    return weightInKg/(heightInMeter*heightInMeter);
});

System.out.println("Your BMI is - " + combinedFuture.get());

当两个Future都完成的时候,传给``thenCombine()的回调函数将被调用。

组合多个CompletableFuture

咱们使用thenCompose() thenCombine()把两个CompletableFuture组合在一块儿。如今若是你想组合任意数量的CompletableFuture,应该怎么作?咱们可使用如下两个方法组合任意数量的CompletableFuture。

static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

1. CompletableFuture.allOf()
CompletableFuture.allOf的使用场景是当你一个列表的独立future,而且你想在它们都完成后并行的作一些事情。

假设你想下载一个网站的100个不一样的页面。你能够串行的作这个操做,可是这很是消耗时间。所以你想写一个函数,传入一个页面连接,返回一个CompletableFuture,异步的下载页面内容。

CompletableFuture<String> downloadWebPage(String pageLink) {
    return CompletableFuture.supplyAsync(() -> {
        // Code to download and return the web page's content
    });
}

如今,当全部的页面已经下载完毕,你想计算包含关键字CompletableFuture页面的数量。可使用CompletableFuture.allOf()达成目的。

List<String> webPageLinks = Arrays.asList(...)    // A list of 100 web page links

// Download contents of all the web pages asynchronously
List<CompletableFuture<String>> pageContentFutures = webPageLinks.stream()
        .map(webPageLink -> downloadWebPage(webPageLink))
        .collect(Collectors.toList());


// Create a combined Future using allOf()
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
        pageContentFutures.toArray(new CompletableFuture[pageContentFutures.size()])
);

使用CompletableFuture.allOf()的问题是它返回CompletableFuture<Void>。可是咱们能够经过写一些额外的代码来获取全部封装的CompletableFuture结果。

// When all the Futures are completed, call `future.join()` to get their results and collect the results in a list -
CompletableFuture<List<String>> allPageContentsFuture = allFutures.thenApply(v -> {
   return pageContentFutures.stream()
           .map(pageContentFuture -> pageContentFuture.join())
           .collect(Collectors.toList());
});

花一些时间理解下以上代码片断。当全部future完成的时候,咱们调用了future.join(),所以咱们不会在任何地方阻塞。

join()方法和get()方法很是相似,这惟一不一样的地方是若是最顶层的CompletableFuture完成的时候发生了异常,它会抛出一个未经检查的异常。

如今让咱们计算包含关键字页面的数量。

// Count the number of web pages having the "CompletableFuture" keyword.
CompletableFuture<Long> countFuture = allPageContentsFuture.thenApply(pageContents -> {
    return pageContents.stream()
            .filter(pageContent -> pageContent.contains("CompletableFuture"))
            .count();
});

System.out.println("Number of Web Pages having CompletableFuture keyword - " + 
        countFuture.get());

2. CompletableFuture.anyOf()

CompletableFuture.anyOf()和其名字介绍的同样,当任何一个CompletableFuture完成的时候【相同的结果类型】,返回一个新的CompletableFuture。如下示例:

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 1";
});

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 2";
});

CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(3);
    } catch (InterruptedException e) {
       throw new IllegalStateException(e);
    }
    return "Result of Future 3";
});

CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(future1, future2, future3);

System.out.println(anyOfFuture.get()); // Result of Future 2

在以上示例中,当三个中的任何一个CompletableFuture完成, anyOfFuture就会完成。由于future2的休眠时间最少,所以她最早完成,最终的结果将是future2的结果。

CompletableFuture.anyOf()传入一个Future可变参数,返回CompletableFuture<Object>。CompletableFuture.anyOf()的问题是若是你的CompletableFuture返回的结果是不一样类型的,这时候你讲会不知道你最终CompletableFuture是什么类型。

CompletableFuture 异常处理

咱们探寻了怎样建立CompletableFuture,转换它们,并组合多个CompletableFuture。如今让咱们弄明白当发生错误的时候咱们应该怎么作。

首先让咱们明白在一个回调链中错误是怎么传递的。思考下如下回调链:

CompletableFuture.supplyAsync(() -> {
    // Code which might throw an exception
    return "Some result";
}).thenApply(result -> {
    return "processed result";
}).thenApply(result -> {
    return "result after further processing";
}).thenAccept(result -> {
    // do something with the final result
});

若是在原始的supplyAsync()任务中发生一个错误,这时候没有任何thenApply会被调用而且future将以一个异常结束。若是在第一个thenApply发生错误,这时候第二个和第三个将不会被调用,一样的,future将以异常结束。

1. 使用 exceptionally() 回调处理异常
exceptionally()回调给你一个从原始Future中生成的错误恢复的机会。你能够在这里记录这个异常并返回一个默认值。

Integer age = -1;

CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
    if(age < 0) {
        throw new IllegalArgumentException("Age can not be negative");
    }
    if(age > 18) {
        return "Adult";
    } else {
        return "Child";
    }
}).exceptionally(ex -> {
    System.out.println("Oops! We have an exception - " + ex.getMessage());
    return "Unknown!";
});

System.out.println("Maturity : " + maturityFuture.get());

2. 使用 handle() 方法处理异常
API提供了一个更通用的方法 - handle()从异常恢复,不管一个异常是否发生它都会被调用。

Integer age = -1;

CompletableFuture<String> maturityFuture = CompletableFuture.supplyAsync(() -> {
    if(age < 0) {
        throw new IllegalArgumentException("Age can not be negative");
    }
    if(age > 18) {
        return "Adult";
    } else {
        return "Child";
    }
}).handle((res, ex) -> {
    if(ex != null) {
        System.out.println("Oops! We have an exception - " + ex.getMessage());
        return "Unknown!";
    }
    return res;
});

System.out.println("Maturity : " + maturityFuture.get());

若是异常发生,res参数将是 null,不然,ex将是 null。

相关文章
相关标签/搜索