组合式异步编程java
package com.example.demo.future; import org.junit.jupiter.api.Test; import java.util.List; import java.util.UUID; import java.util.concurrent.*; import java.util.stream.Collectors; import java.util.stream.IntStream; public class FutureTest { /** * 运程数据 * * @return */ private static String data() { try { Thread.sleep(1020); //拟处理远程处理时间 } catch (InterruptedException e) { e.printStackTrace(); } return String.format("{id:%s,orderNo:'%s'}", Math.round(Math.random() * 10000), UUID.randomUUID().toString()); } /** * 获取远程的订单 * * @return */ Future<String> getRemoteOrder() { ExecutorService executorService = Executors.newCachedThreadPool(); return executorService.submit(FutureTest::data); } /** * 第二种实现 * * @return */ Future<String> getRemoteOrderII() { CompletableFuture<String> future = new CompletableFuture<>(); new Thread(() -> { try { future.complete(data()); //正常获取值 } catch (Exception e) { future.completeExceptionally(e); //以异常抛出结束 } }).start(); return future; } /** * 第三种实现 * 这种方法彻底等价于第二种 ,低层会交由 ForkJoinPool 去执行。你也能够使用本身的 Executor * * @return */ Future<String> getRemoteOrderIII() { return CompletableFuture.supplyAsync(FutureTest::data); } @Test public void test1() throws InterruptedException, ExecutionException, TimeoutException { // 传统方法 Future<String> userDataFuture = getRemoteOrder(); userDataFuture.isDone(); //是否请求完 // orderFuture.get(); //禁止这样写,防止线程非正常结束后,这里将会无限阻塞 String i = userDataFuture.get(10, TimeUnit.SECONDS); //最多阻塞10秒,就算未完成,也返回结果 //CompletableFuture 方法 Future<String> orderFuture = getRemoteOrderII(); orderFuture.get(5, TimeUnit.SECONDS); //最多阻塞10秒,就算未完成,也返回结果 } /** * 并发测试1 */ @Test public void test2() { //顺序进行 long beginTime = System.currentTimeMillis(); List<String> orders = IntStream.range(1, 5) .mapToObj(i -> data()) .collect(Collectors.toList()); long endTime = System.currentTimeMillis(); System.out.println("串行发费时间 :" + (endTime - beginTime) + " ms"); //4090 ms //并行 beginTime = System.currentTimeMillis(); orders = IntStream.range(1, 5) .parallel() .mapToObj(i -> data()) .collect(Collectors.toList()); endTime = System.currentTimeMillis(); System.out.println("并行发费时间 :" + (endTime - beginTime) + " ms"); //1027 ms //并行,第二种方式 beginTime = System.currentTimeMillis(); List<CompletableFuture<String>> futures = IntStream.range(1, 5) .mapToObj(i -> CompletableFuture.supplyAsync(FutureTest::data)) .collect(Collectors.toList()); orders = futures.stream().map(CompletableFuture::join) //CompletableFuture.join 与 CompletableFuture.get 同样,只是join不会抛出任何检测到的异常 .collect(Collectors.toList()); endTime = System.currentTimeMillis(); System.out.println("并行II发费时间 :" + (endTime - beginTime) + " ms"); //1023 ms } }
1.并行第二种式第一种都是使用ForkJoin,因此速度都差很少。但第二种能够设置你本身的线程执行器,改变线程池的数量 2.线程池大小与处理器的利用率之比能够使用下面的公式进行估算: N(threads) = N(cpu) * U(cpu) * (1+W/C) N(cpu): 是处理器的核的数目,能够 经过Runtime.getRuntime().availableProcessors() 获得 U(cpu): 是指望的cpu利用率(该值应该介于 0 - 1之间) W/C : 是等待时间 与计算时间的比率