前言
如今大部分的CPU都是多核,咱们都知道想要提高咱们应用程序的运行效率,就必须得充分利用多核CPU的计算能力;Java早已经为咱们提供了多线程的API,可是实现方式略微麻烦,今天咱们就来看看Java8在这方面提供的改善。服务器
假设场景
如今你须要为在线教育平台提供一个查询用户详情的API,该接口须要返回用户的基本信息,标签信息,这两个信息存放在不一样位置,须要远程调用来获取这两个信息;为了模拟远程调用,咱们须要在代码里面延迟 1s;网络
public interface RemoteLoader { String load(); default void delay() { try { Thread.sleep(1000L); } catch (InterruptedException e) { e.printStackTrace(); } } } public class CustomerInfoService implements RemoteLoader { public String load() { this.delay(); return "基本信息"; } } public class LearnRecordService implements RemoteLoader { public String load() { this.delay(); return "学习信息"; } }
同步方式实现版本
若是咱们采用同步的方式来完成这个API接口,咱们的实现代码:多线程
@Test public void testSync() { long start = System.currentTimeMillis(); List<remoteloader> remoteLoaders = Arrays.asList(new CustomerInfoService(), new LearnRecordService()); List<string> customerDetail = remoteLoaders.stream().map(RemoteLoader::load).collect(toList()); System.out.println(customerDetail); long end = System.currentTimeMillis(); System.out.println("总共花费时间:" + (end - start)); }
不出所料,由于调用的两个接口都是延迟了 1s ,因此结果大于2秒 异步
Future实现的版本
接下来咱们把这个例子用Java7提供的Future
来实现异步的版本,看下效果如何呢?代码以下:ide
@Test public void testFuture() { long start = System.currentTimeMillis(); ExecutorService executorService = Executors.newFixedThreadPool(2); List<remoteloader> remoteLoaders = Arrays.asList(new CustomerInfoService(), new LearnRecordService()); List<future<string>> futures = remoteLoaders.stream() .map(remoteLoader -> executorService.submit(remoteLoader::load)) .collect(toList()); List<string> customerDetail = futures.stream() .map(future -> { try { return future.get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } return null; }) .filter(Objects::nonNull) .collect(toList()); System.out.println(customerDetail); long end = System.currentTimeMillis(); System.out.println("总共花费时间:" + (end - start)); }
此次咱们采用多线程的方式来改造了咱们这个例子,结果仍是比较满意的,时间大概花费了1s多一点 学习
> 注意:这里我分红了两个Stream,如何合在一块儿用同一个Stream,那么在用future.get()
的时候会致使阻塞,至关于提交一个任务执行完后才提交下一个任务,这样达不到异步的效果优化
这里咱们能够看到虽然Future
达到了咱们预期的效果,可是若是须要实现将两个异步的结果进行合并处理就稍微麻一些,这里就不细说,后面主要看下CompletableFuture
在这方面的改进this
Java8并行流
以上咱们用的是Java8以前提供的方法来实现,接下来咱们来看下Java8中提供的并行流来实习咱们这个例子效果怎样呢?线程
@Test public void testParallelStream() { long start = System.currentTimeMillis(); List<remoteloader> remoteLoaders = Arrays.asList(new CustomerInfoService(), new LearnRecordService()); List<string> customerDetail = remoteLoaders.parallelStream().map(RemoteLoader::load).collect(toList()); System.out.println(customerDetail); long end = System.currentTimeMillis(); System.out.println("总共花费时间:" + (end - start)); }
运行的结果仍是至关的满意,花费时间 1s 多点 code
和Java8以前的实现对比,咱们发现整个代码会更加的简洁;
接下来咱们把咱们的例子改变一下,查询用户详情的接口还须要返回视频观看记录,用户的标签信息,购买订单
public class WatchRecordService implements RemoteLoader { @Override public String load() { this.delay(); return "观看记录"; } } public class OrderService implements RemoteLoader { @Override public String load() { this.delay(); return "订单信息"; } } public class LabelService implements RemoteLoader { @Override public String load() { this.delay(); return "标签信息"; } }
咱们继续使用Java8提供的并行流来实现,看下运行的结果是否理想
@Test public void testParallelStream2() { long start = System.currentTimeMillis(); List<remoteloader> remoteLoaders = Arrays.asList( new CustomerInfoService(), new LearnRecordService(), new LabelService(), new OrderService(), new WatchRecordService()); List<string> customerDetail = remoteLoaders.parallelStream().map(RemoteLoader::load).collect(toList()); System.out.println(customerDetail); long end = System.currentTimeMillis(); System.out.println("总共花费时间:" + (end - start)); }
可是此次运行的结果不是太理想,花费时间超过了2秒
CompletableFuture
基本的用法
@Test public void testCompletableFuture() { CompletableFuture<string> future = new CompletableFuture<>(); new Thread(() -> { doSomething(); future.complete("Finish"); //任务执行完成后 设置返回的结果 }).start(); System.out.println(future.join()); //获取任务线程返回的结果 } private void doSomething() { System.out.println("doSomething..."); }
这种用法还有个问题,就是任务出现了异常,主线程会无感知,任务线程不会把异常给抛出来;这会致使主线程会一直等待,一般咱们也须要知道出现了什么异常,作出对应的响应;改进的方式是在任务中try-catch全部的异常,而后调用future.completeExceptionally(e)
,代码以下:
@Test public void testCompletableFuture() throws ExecutionException, InterruptedException { CompletableFuture<string> future = new CompletableFuture<>(); new Thread(() -> { try { doSomething(); future.complete("Finish"); } catch (Exception e) { future.completeExceptionally(e); } }).start(); System.out.println(future.get()); } private void doSomething() { System.out.println("doSomething..."); throw new RuntimeException("Test Exception"); }
从如今来看CompletableFuture
的使用过程须要处理的事情不少,不太简洁,你会以为看起来很麻烦;可是这只是表象,Java8其实对这个过程进行了封装,提供了不少简洁的操做方式;接下来咱们看下如何改造上面的代码
@Test public void testCompletableFuture2() throws ExecutionException, InterruptedException { CompletableFuture<string> future = CompletableFuture.supplyAsync(() -> { doSomething(); return "Finish"; }); System.out.println(future.get()); }
这里咱们采用了supplyAsync
,这下看起来简洁了许多,世界都明亮了; Java8不只提供容许任务返回结果的supplyAsync
,还提供了没有返回值的runAsync
;让咱们能够更加的关注业务的开发,不须要处理异常错误的管理
CompletableFuture异常处理
若是说主线程须要关心任务到底发生了什么异常,须要对其做出相应操做,这个时候就须要用到exceptionally
@Test public void testCompletableFuture2() throws ExecutionException, InterruptedException { CompletableFuture<string> future = CompletableFuture .supplyAsync(() -> { doSomething(); return "Finish"; }) .exceptionally(throwable -> "Throwable exception message:" + throwable.getMessage()); System.out.println(future.get()); }
使用CompletableFuture来完成咱们查询用户详情的API接口
@Test public void testCompletableFuture3() throws ExecutionException, InterruptedException { long start = System.currentTimeMillis(); List<remoteloader> remoteLoaders = Arrays.asList( new CustomerInfoService(), new LearnRecordService(), new LabelService(), new OrderService(), new WatchRecordService()); List<completablefuture<string>> completableFutures = remoteLoaders .stream() .map(loader -> CompletableFuture.supplyAsync(loader::load)) .collect(toList()); List<string> customerDetail = completableFutures .stream() .map(CompletableFuture::join) .collect(toList()); System.out.println(customerDetail); long end = System.currentTimeMillis(); System.out.println("总共花费时间:" + (end - start)); }
这里依然是采用的两个Stream来完成的,执行的结果以下:
这个结果不太满意,和并行流的结果差很少,消耗时间 2秒多点;在这种场景下咱们用CompletableFuture
作了这么多工做,可是效果不理想,难道就有没有其余的方式可让它在快一点吗?
为了解决这个问题,咱们必须深刻了解下并行流和CompletableFuture
的实现原理,它们底层使用的线程池的大小都是CPU的核数Runtime.getRuntime().availableProcessors()
;那么咱们来尝试一下修改线程池的大小,看看效果如何?
自定义线程池,优化CompletableFuture
使用并行流没法自定义线程池,可是CompletableFuture
能够
@Test public void testCompletableFuture4() throws ExecutionException, InterruptedException { long start = System.currentTimeMillis(); List<remoteloader> remoteLoaders = Arrays.asList( new CustomerInfoService(), new LearnRecordService(), new LabelService(), new OrderService(), new WatchRecordService()); ExecutorService executorService = Executors.newFixedThreadPool(Math.min(remoteLoaders.size(), 50)); List<completablefuture<string>> completableFutures = remoteLoaders .stream() .map(loader -> CompletableFuture.supplyAsync(loader::load, executorService)) .collect(toList()); List<string> customerDetail = completableFutures .stream() .map(CompletableFuture::join) .collect(toList()); System.out.println(customerDetail); long end = System.currentTimeMillis(); System.out.println("总共花费时间:" + (end - start)); }
咱们使用自定义线程池,设置最大的线程池数量50,来看下执行的结果
这下执行的结果比较满意了,1秒多点;理论上来讲这个结果能够一直持续,直到达到线程池的大小50
并行流和CompletableFuture
二者该如何选择
这二者如何选择主要看任务类型,建议
- 若是你的任务是计算密集型的,而且没有I/O操做的话,那么推荐你选择Stream的并行流,实现简单并行效率也是最高的
- 若是你的任务是有频繁的I/O或者网络链接等操做,那么推荐使用
CompletableFuture
,采用自定义线程池的方式,根据服务器的状况设置线程池的大小,尽量的让CPU忙碌起来
CompletableFuture
的其余经常使用方法
- thenApply、thenApplyAsync: 假如任务执行完成后,还须要后续的操做,好比返回结果的解析等等;能够经过这两个方法来完成
- thenCompose、thenComposeAsync: 容许你对两个异步操做进行流水线的操做,当第一个操做完成后,将其结果传入到第二个操做中
- thenCombine、thenCombineAsync:容许你把两个异步的操做整合;好比把第一个和第二个操做返回的结果作字符串的链接操做
总结
- Java8并行流的使用方式
- CompletableFuture的使用方式、异常处理机制,让咱们有机会管理任务执行中发送的异常
- Java8并行流和
CompletableFuture
二者该如何选择 CompletableFuture
的经常使用方法
原创不易 转载请注明出处:https://silently9527.cn/archives/48