强大的CompletableFuture

引子

为了让程序更加高效,让CPU最大效率的工做,咱们会采用异步编程。首先想到的是开启一个新的线程去作某项工做。再进一步,为了让新线程能够返回一个值,告诉主线程事情作完了,因而乎Future粉墨登场。然而Future提供的方式是主线程主动问询新线程,要是有个回调函数就爽了。因此,为了知足Future的某些遗憾,强大的CompletableFuture随着Java8一块儿来了。算法

Future

传统多线程的却让程序更加高效,毕竟是异步,可让CPU充分工做,但这仅限于新开的线程无需你的主线程再费心了。好比你开启的新线程仅仅是为了计算1+...+n再打印结果。有时候你须要子线程返回计算结果,在主线程中进行进一步计算,就须要Future了。编程

看下面这个例子,主线程计算2+4+6+8+10;子线程计算1+3+5+7+9;最后须要在主线程中将两部分结果再相加。bash

public class OddNumber implements Callable<Integer> {
    @Override
    public Integer call() throws Exception {
        Thread.sleep(3000);
        int result = 1 + 3 + 5 + 7 + 9;
        return result;
    }
}
复制代码
public class FutureTest {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        OddNumber oddNumber = new OddNumber();
        Future<Integer> future = executor.submit(oddNumber);
        long startTime = System.currentTimeMillis();
        int evenNumber = 2 + 4 + 6 + 8 + 10;
        try {
            Thread.sleep(1000);
            System.out.println("0.开始了:"+ (System.currentTimeMillis()-startTime) +"秒");
            int oddNumberResult = future.get();//这时间会被阻塞
            System.out.println("1+2+...+9+10="+(evenNumber+oddNumberResult));
            System.out.println("1.开始了:"+ (System.currentTimeMillis()-startTime) +"秒");
        } catch (Exception e) {
            System.out.println(e);
        }
    }
}
输出结果:
0.开始了:1001秒
1+2+...+9+10=55
1.开始了:3002秒
复制代码

看一下Future接口,只有五个方法比较简单网络

//取消任务,若是已经完成或者已经取消,就返回失败
boolean cancel(boolean mayInterruptIfRunning);
//查看任务是否取消
boolean isCancelled();
//查看任务是否完成
boolean isDone();
//刚才用到了,查看结果,任务未完成就一直阻塞
V get() throws InterruptedException, ExecutionException;
//同上,可是加了一个过时时间,防止长时间阻塞,主线程也作不了事情
V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException;
复制代码

CompletableFuture

上面的看到Future的五个方法,不是很丰富,既然咱们的主线程叫作main,就应该以我为主,我更但愿子线程作完了事情主动通知我。为此,Java8带来了CompletableFuture,一个Future的实现类。其实CompletableFuture最迷人的地方并非极大丰富了Future的功能,而是完美结合了Java8流的新特性。多线程

实现回调,自动后续操做

提早说一下CompletableFuture实现回调的方法(之一):thenAccept()框架

public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
        return uniAcceptStage(null, action);
    }
复制代码

参数有个Consumer,用到了Java8新特性,行为参数化,就是参数不必定是基本类型或者类,也可以使是函数(行为),或者说一个方法(接口)。异步

public class OddNumberPlus implements Supplier<Integer> {
    @Override
    public Integer get() {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 1+3+5+7+9;
    }
}
复制代码
public class CompletableFutureTest {
    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();
        final int evenNumber = 2 + 4 + 6 + 8 + 10;
        CompletableFuture<Integer> oddNumber = CompletableFuture.supplyAsync(new OddNumberPlus());
        try {
            Thread.sleep(1000);
            System.out.println("0.开始了:"+ (System.currentTimeMillis()-startTime) +"秒");
            //看这里,实现回调
            oddNumber.thenAccept(oddNumberResult->
                        {
                            System.out.println("1.开始了:"+ (System.currentTimeMillis()-startTime) +"秒");
                            System.out.println("此时计算结果为:"+(evenNumber+oddNumberResult));
                        });
            oddNumber.get();
        } catch (Exception e) {
            System.out.println(e);
        }
    }
}
输出结果:
0.开始了:1006秒
1.开始了:3006秒
此时计算结果为:55
复制代码

值得一提的是,本例中并无显示的建立任务链接池,程序会默认选择一个任务链接池ForkJoinPool.commonPool()async

private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
复制代码

ForkJoinPool始自JDK7,叫作分支/合并框架。能够经过将一个任务递归分红不少分子任务,造成不一样的流,进行并行执行,同时还伴随着强大的工做窃取算法。极大的提升效率。固然,你也能够本身指定链接池。ide

CompletableFuture合并

Java8的确丰富了Future实现,CompletableFuture有不少方法可供你们使用,可是但从上面的例子来看,其实CompletableFuture能作的功能,貌似Future。毕竟你CompletableFuture用get()这个方法的时候还不是阻塞了,我Future蛮能够本身拿到返回值,再手动执行一些操做嘛(虽然说这样main方法必定很不爽)。那么接下来的事情,Future作起来就十分麻烦了。假设咱们main方法只作奇数合集加上偶数合集这一个操做,提早算这两个合集的操做异步交给两个子线程,咱们须要怎么作呢?没错,开启两个线程,等到两个线程都计算结束的时候,咱们进行最后的相加,问题在于,你怎么知道那个子线程最后结束的呢?(貌似能够作个轮询,不定的调用isDone()这个方法...)丰富的CompletableFuture功能为咱们提供了一个方法,用于等待两个子线程都结束了,再进行相加操做:异步编程

//asyncPool就是上面提到的默认线程池ForkJoinPool
    public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) {
        return biApplyStage(asyncPool, other, fn);
    }
复制代码

看个例子:

public class OddCombine implements Supplier<Integer> {
    @Override
    public Integer get() {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 1+3+5+7+9;
    }
}
复制代码
public class EvenCombine implements Supplier<Integer> {
    @Override
    public Integer get() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 2+4+6+8+10;
    }
}

复制代码
public class CompletableCombineTest {
    public static void main(String[] args) throws Exception{
        CompletableFuture<Integer> oddNumber = CompletableFuture.supplyAsync(new OddCombine());
        CompletableFuture<Integer> evenNumber = CompletableFuture.supplyAsync(new EvenCombine());
        long startTime = System.currentTimeMillis();
        CompletableFuture<Integer> resultFuturn = oddNumber.thenCombine(evenNumber,(odd,even)->{
            return odd + even;
        });
        System.out.println(resultFuturn.get());
        System.out.println("0.开始了:"+ (System.currentTimeMillis()-startTime) +"秒");
    }
}
输出结果:
55
0.开始了:3000秒
复制代码

这边模拟一个睡1秒,一个睡3秒,可是真正的网络请求时间是不定的。是否是很爽,最爽的还不是现象,而是以上操做已经利用了Java8流的概念。

两个子线程还不够,那么还有**anyOff()**函数,能够承受多个CompletableFuture,会等待全部任务都完成。

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
        return andTree(cfs, 0, cfs.length - 1);
    }
复制代码

与它长的很像的,有个方法,是当第一个执行结束的时候,就结束,后面任务再也不等了,能够看做充分条件。

public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
        return orTree(cfs, 0, cfs.length - 1);
    }
复制代码

在上面那个例子的基础上,把OddNumberPlus类时间调长一点:

public class OddNumberPlus implements Supplier<Integer> {
    @Override
    public Integer get() {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 1+3+5+7+9;
    }
}
复制代码
public class CompletableCombineTest {
    public static void main(String[] args) throws Exception{
        CompletableFuture<Integer> oddNumber = CompletableFuture.supplyAsync(new OddCombine());
        CompletableFuture<Integer> evenNumber = CompletableFuture.supplyAsync(new EvenCombine());
        CompletableFuture<Integer> testNumber = CompletableFuture.supplyAsync(new OddNumberPlus());
        long startTime = System.currentTimeMillis();
        CompletableFuture<Object> resultFuturn = CompletableFuture.anyOf(oddNumber,evenNumber,testNumber);
        System.out.println(resultFuturn.get());
        System.out.println("0.开始了:"+ (System.currentTimeMillis()-startTime) +"秒");
    }
}
输出结果:
30
0.开始了:1000秒
复制代码

小结

CompletableFuture的方法其实还有不少,经常使用的好比说runAsync(),相似于supplyAsync(),只是没有返回值;除了thenApply()能够加回调函数之外,还有thenApply();还有注入runAfterBoth()、runAfterEither(),这些见名知意。还有不少,能够点开CompletableFuture这个类的源码仔细看一看。见微知著,透过CompletableFuture,更加感受到Java8的强大,强大的流概念、行为参数化、高效的并行理念等等,不只让Java写起来更爽,还不断丰富Java整个生态。Java一直在进步,因此没有被时代淘汰,咱们Javaer也能够继续职业生涯,感谢Java,一块儿进步。

相关文章
相关标签/搜索