java 8 CompletableFuture (1)

组合式异步编程java

 

package com.example.demo.future;

import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class FutureTest {

    /**
     * 运程数据
     *
     * @return
     */
    private static String data() {
        try {

            Thread.sleep(1020); //拟处理远程处理时间
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return String.format("{id:%s,orderNo:'%s'}", 
               Math.round(Math.random() * 10000), UUID.randomUUID().toString());
    }

    /**
     * 获取远程的订单
     *
     * @return
     */
    Future<String> getRemoteOrder() {
        ExecutorService executorService = Executors.newCachedThreadPool();
        return executorService.submit(FutureTest::data);
    }

    /**
     * 第二种实现
     *
     * @return
     */
    Future<String> getRemoteOrderII() {
        CompletableFuture<String> future = new CompletableFuture<>();
        new Thread(() -> {
            try {
                future.complete(data());  //正常获取值
            } catch (Exception e) {
                future.completeExceptionally(e);    //以异常抛出结束
            }
        }).start();
        return future;
    }

    /**
     * 第三种实现
     * 这种方法彻底等价于第二种 ,低层会交由 ForkJoinPool 去执行。你也能够使用本身的 Executor
     *
     * @return
     */
    Future<String> getRemoteOrderIII() {
        return CompletableFuture.supplyAsync(FutureTest::data);
    }

    @Test
    public void test1() throws InterruptedException, ExecutionException, TimeoutException {

        // 传统方法
        Future<String> userDataFuture = getRemoteOrder();
        userDataFuture.isDone();  //是否请求完
        //   orderFuture.get();   //禁止这样写,防止线程非正常结束后,这里将会无限阻塞
        String i = userDataFuture.get(10, TimeUnit.SECONDS); //最多阻塞10秒,就算未完成,也返回结果


        //CompletableFuture 方法
        Future<String> orderFuture = getRemoteOrderII();
        orderFuture.get(5, TimeUnit.SECONDS);  //最多阻塞10秒,就算未完成,也返回结果


    }

    /**
     * 并发测试1
     */
    @Test
    public void test2() {
        //顺序进行
        long beginTime = System.currentTimeMillis();

        List<String> orders = IntStream.range(1, 5)
                .mapToObj(i -> data())
                .collect(Collectors.toList());

        long endTime = System.currentTimeMillis();

        System.out.println("串行发费时间 :" + (endTime - beginTime) + " ms"); //4090 ms

        //并行
        beginTime = System.currentTimeMillis();

        orders = IntStream.range(1, 5)
                .parallel()
                .mapToObj(i -> data())
                .collect(Collectors.toList());

        endTime = System.currentTimeMillis();

        System.out.println("并行发费时间 :" + (endTime - beginTime) + " ms"); //1027 ms


        //并行,第二种方式
        beginTime = System.currentTimeMillis();

        List<CompletableFuture<String>> futures = IntStream.range(1, 5)
                .mapToObj(i -> CompletableFuture.supplyAsync(FutureTest::data))
                .collect(Collectors.toList());

        orders = futures.stream().map(CompletableFuture::join) 
 //CompletableFuture.join 与 CompletableFuture.get 同样,只是join不会抛出任何检测到的异常
                .collect(Collectors.toList());

        endTime = System.currentTimeMillis();


        System.out.println("并行II发费时间 :" + (endTime - beginTime) + " ms"); //1023 ms
    }

   

}

 

1.并行第二种式第一种都是使用ForkJoin,因此速度都差很少。但第二种能够设置你本身的线程执行器,改变线程池的数量
  2.线程池大小与处理器的利用率之比能够使用下面的公式进行估算:
  N(threads) = N(cpu) * U(cpu) * (1+W/C)
   N(cpu): 是处理器的核的数目,能够 经过Runtime.getRuntime().availableProcessors() 获得
   U(cpu):  是指望的cpu利用率(该值应该介于 0 - 1之间)
   W/C :    是等待时间 与计算时间的比率
相关文章
相关标签/搜索