微言异步回调

前言html

回调,顾名思义,回过头来调用,详细的说来就是用户无需关心内部实现的具体逻辑,只须要在暴露出的回调函数中放入本身的业务逻辑便可。因为回调机制解耦了框架代码和业务代码,因此能够看作是对面向对象解耦的具体实践之一。因为本文的侧重点在于讲解后端回调,因此对于前端回调甚至于相似JSONP的回调函数类的,利用本章讲解的知识进行代入的时候,请斟酌一二,毕竟后端和前端仍是有必定的区别,所谓差之毫厘,可能谬以千里,慎之。因此本章对回调的讲解侧重于后端,请知悉。前端

回调定义java

说到回调,其实个人理解相似于函数指针的功能,怎么讲呢?由于一个方法,一旦附加了回调入参,那么用户在进行调用的时候,这个回调入参是能够用匿名方法直接替代的。回调的使用必须和方法的签名保持一致性,下面咱们来看一个JDK实现的例子:git

    default boolean removeIf(Predicate<? super E> filter) {
        Objects.requireNonNull(filter);
        boolean removed = false;
        final Iterator<E> each = iterator();
        while (each.hasNext()) {
            if (filter.test(each.next())) {
                each.remove();
                removed = true;
            }
        }
        return removed;
    }

在JDK中,List结构有一个removeIf的方法,其实现方式如上所示。因为附带了具体的注释讲解,我这里就再也不进行过多的讲述。咱们须要着重关注的是其入参:Predicate,由于他就是一个函数式接口,入参为泛型E,出参为boolean,其实和Function<? super E, boolean>是等价的。因为List是一个公共的框架代码,里面不可能糅合业务代码,因此为了解耦框架代码和业务代码,JDK使用了内置的各类函数式接口做为方法的回调,将具体的业务实践抛出去,让用户本身实现,而它本身只接受用户返回的结果就好了:只要用户处理返回true(filter.test(each.next()返回true),那么我就删掉当前遍历的数据;若是用户处理返回false(filter.test(each.next()返回false),那么我就保留当前遍历的数据。是否是很是的nice?github

其实这种完美的协做关系,在JDK类库中随处可见,在其余常常用到的框架中也很常见,诸如Guava,Netty,实在是太多了(这也从侧面说明,利用函数式接口解耦框架和业务,是正确的作法),我撷取了部分片断以下:redis

//将map中的全部entry进行替换
void replaceAll(BiFunction<? super K, ? super V, ? extends V> function)
//将map中的entry进行遍历
void forEach(BiConsumer<? super K, ? super V> action)
//map中的entry值若是有,则用新值从新创建映射关系
V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction)

//Deque遍历元素
void forEach(Consumer<? super T> action)
//Deque按给定条件移除元素
boolean removeIf(Predicate<? super E> filter)

//Guava中获取特定元素
<T> T get(Object key, final Callable<T> valueLoader) 

//Netty中设置监听
ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener)

那么,回过头来想一想,若是咱们封装本身的组件,想要封装的很JDK Style,该怎么作呢?若是上来直接理解Predicate,Function,Callable,Consumer,我想不少人是有困难的,那就听我慢慢道来吧。编程

咱们先假设以下一段代码,这段代码我相信不少人会很熟悉,不少人也封装过,这就是咱们的大名鼎鼎的RedisUtils封装类:后端

  /**
     * key递增特定的num
     * @param key Redis中保存的key
     * @param num 递增的值
     * @return key计算完毕后返回的结果值
     */
    public Long incrBy(String key,long num) {
        CallerInfo callerInfo = Profiler.registerInfo("sendCouponService.redis.incrBy", "sendCouponService", false, true);
        Long result;
        try {
            result = jrc.incrBy(key, num);
        } catch (Exception e) {
            logger.error("incrBy", null, "sendCouponService.redis.incrBy异常,key={},value={}", e, key, num);
            Profiler.functionError(callerInfo);
            throw new RuntimeException("sendCouponService.redis.incrBy异常,key=" + key, e);
        } finally {
            Profiler.registerInfoEnd(callerInfo);
        }
        return result;
    }

上面这段代码只是一个示例,其实还有上百个方法基本上都是这种封装结构,这种封装有问题吗?没问题!并且封装方式辛辣老道,一看就是高手所为,由于既加了监控,又作了异常处理,并且还有错误日志记录,一旦发生问题,咱们可以第一时间知道哪里出了问题,哪一个方法出了问题,而后设定对应的应对方法。api

这种封装方式,若是当作普通的Util来用,彻底没有问题,可是若是想封装成组件,则欠缺点什么,我列举以下:数据结构

1. 当前代码写死了用jrc操做,若是后期切换到jimdb,是否是还得为jimdb专门写一套呢?

2. 当前代码,上百个方法,其实不少地方都是重复的,惟有redis操做那块不一样,代码重复度特别高,一旦扩展新方法,基本上是剖解原有代码,而后拷贝现有方法,最后改为新方法。

3. 当前方法,包含的都是redis单操做,若是遇到那种涉及到多个操做组合的(好比先set,而后expire或者更复杂一点),须要添加新方法,本质上这种新方法其实和业务性有关了。

从上面列出的这几点来看,其实咱们能够彻底将其打形成一个兼容jrc操做和cluster操做,同时具备良好框架扩展性(策略模式+模板模式)和良好代码重复度控制(函数式接口回调)的框架。因为本章涉及内容为异步回调,因此这里咱们将讲解这种代码如何保持良好的代码重复度控制上。至于良好的框架扩展性,若是感兴趣的话,我会在后面的章节进行讲解。那么咱们开始进行优化吧。

首先,找出公共操做部分(白色)和非公共操做部分(黄色):

/**
     * key递增特定的num
     * @param key Redis中保存的key
     * @param num 递增的值
     * @return key计算完毕后返回的结果值
     */
    public Long incrBy(String key,long num) {
        CallerInfo callerInfo = Profiler.registerInfo("sendCouponService.redis.incrBy", "sendCouponService", false, true);
        Long result;
        try {
            result = jrc.incrBy(key, num);
        } catch (Exception e) {
            logger.error("incrBy", null, "sendCouponService.redis.incrBy异常,key={},value={}", e, key, num);
            Profiler.functionError(callerInfo);
            return null;
        } finally {
            Profiler.registerInfoEnd(callerInfo);
        }
        return result;
    }

经过上面的标记,咱们发现非公共操做部分,有两类:

1. ump提示语和日志提示语不一致

2. 操做方法不一致

标记出来了公共操做部分,以后咱们开始封装公共部分:

/**
     * 公共模板抽取
     *
     * @param method
     * @param callable
     * @param <T>
     * @return
     */
    public static <T> T invoke(String method) {
        CallerInfo info = Profiler.registerInfo(method, false, true);
        try {
            //TODO 这里放置不一样的redis操做方法
        } catch (Exception e) {
            logger.error(method, e);
            AlarmUtil.alarm(method + e.getCause());
            reutrn null;
        } finally {
            Profiler.registerInfoEnd(info);
        }
    }

可是这里有个问题,咱们虽然把公共模板抽取出来了,可是TODO标签里面的内容怎么办呢? 如何把不一样的redis操做方法传递进来呢?

其实在java中,咱们能够利用接口的方式,将具体的操做代理出去,由外部调用者来实现,听起来是否是感受又和IOC搭上了点关系,不错,你想的没错,这确实是控制反转依赖注入的一种作法,经过接口方式将具体的实践代理出去,这也是进行回调操做的原理。接下来看咱们的改造:

    /**
     * redis操做接口
     */
    public interface RedisOperation<T>{
        //调用redis方法,入参为空,出参为T泛型
        T invoke();
    }

    /**
     *  redis操做公共模板
     * @param method
     * @param  redisOperation
     * @param <T>
     * @return
     */
    public static <T> T invoke(String method,RedisOperation redisOperation) {
        CallerInfo info = Profiler.registerInfo(method, false, true);
        try {
           return  redisOperation.invoke();
        } catch (Exception e) {
            logger.error(method, e);
            AlarmUtil.alarm(method + e.getCause());
            reutrn null;
        } finally {
            Profiler.registerInfoEnd(info);
        }
    }

这样,咱们就打造好了一个公共的redis操做模板,以后就能够像下面的方式来使用了:

    @Override
    public Long incrby(String key, long val){
        String method = "com.jd.marketing.util.RedisUtil.incrby";
        RedisOperation<Long> process = () -> {
            return redisUtils.incrBy(key, val);
        };
        return CacheHelper.invoke(method, process);
    }

以后的一百多个方法,你也可使用这样的方式来一一进行包装,以后你会发现原来RedisUtils封装完毕,代码写了2000行,可是用这种方式以后,代码只写了1000行,并且后续有新的联合操做过来,你只须要在以下代码段里面直接把级联操做添加进去便可:

 RedisOperation<Long> process = () -> {
           //TODO other methods
           //TODO other methods
            return redisUtils.incrBy(key, val);
};

是否是很方便快捷?在这里我须要因此下的是,因为RedisOperation里面的invoke方法是没有入参,带有一个出参结果的调用。因此在回调这里,我用了匿名表达式来()->{}来match这种操做。可是若是回调这里,一个入参,一个出参的话,那么个人匿名表达式须要这样写 param->{}, 多个入参,那就变成了这样 (param1, param2, param3)->{} 。因为这里并不是重点,我不想过多讲解,若是对这种使用方式不熟悉,能够彻底使用以下的方式来进行书写也行:

 @Override
    public Long incrby(String key, long val){
        String method = "com.jd.marketing.util.RedisUtil.incrby";
        RedisOperation<Long> process = () -> incrByOperation(key, val);
        return CacheHelper.invoke(method, process);
    }
   
   private Long incrByOperation(String key, long val){
       return redisUtils.incrBy(key, val);
   }

其实说到这里的时候,我就有必要提一下开头的埋下的线索了。其实以前演示的Netty的代码:

//Netty中设置监听
ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener)

GenericFutureListener这个接口

就是按照上面的写法来作的,是否是豁然开朗呢?至于其调用方式,也和上面讲解的一致,只要符合接口里面方法的调用标准就行(入参和出参符合就行), 好比 future –> {}。

说到这里,咱们可能认为这样太麻烦了,本身定义接口,而后注入到框架中,最后用户本身实现调用方法,一长串。是的,你说的没错,这样确实太麻烦了,JDK因而专门用了一个 FunctionalInterface的annotation来帮咱们作了,因此在JDK中,若是你看到Consumer,Function,Supplier等,带有@FunctionalInterface标注的接口,那么就说明他是一个函数式接口,而这种接口是干什么的,具体的原理就是我上面讲的。下面咱们来梳理梳理这些接口吧。

先看一下咱们的RedisUtils使用JDK自带的函数式接口的最终封装效果:

image

从图示代码能够看出,总体封装变得简洁许多,并且咱们用了JDK内置的函数式接口,因此也无需写其余多余的代码,看上去很清爽,重复代码基本不见了。并且,因为JDK提供的其余的函数式接口有运算操做,好比Predicate.or, Predicate.and操做等,大大增强了封装的趣味性和乐趣。

下面我将JDK中涉及的经常使用的函数式接口列举一遍,而后来详细讲解讲解吧,列表以下:

Consumer, 提供void accept(T t)回调

Runnable, 提供void run()回调

Callable, 提供V call() throws Exception回调

Supplier, 提供T get()回调

Function, 提供R apply(T t)回调, 有andThen接续操做

Predicate, 提供boolean test(T t)回调, 等价于 Function<T, boolean>

BiConsumer, 提供void accept(T t, U u)回调,注意带Bi的回调接口,代表入参都是双参数,好比BiPredicate    
......


其实还有不少,我这里就不一一列举了。感兴趣的朋友能够在这里找到JDK提供的全部函数式接口

接下来,咱们来说解其使用示范,以便于明白怎么样去使用它。

对于Consumer函数式接口,内部的void accept(T t)回调方法,代表了它只能回调有一个入参,没有返参的方法。示例以下:

/**
     * Consumer调用的例子
     */
    public void ConsumerSample() {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put("key", "val");
        linkedHashMap.forEach((k, v) -> {
            System.out.println("key" + k + ",val" + v);
        });
    }

对于Callable接口,其实和Supplier接口是同样的,只是有无Exception抛出的区别,示例以下:

 /**
     * Callable调用的例子
     */
    public Boolean setnx(String key, String val){
        String method = "com.jd.marketing.util.RedisUtil.setnx";
        Callable<Boolean> process = () -> {
            Long rst = redisUtils.setnx(key, val);
            if(rst == null || rst == 0){
                return false;
            }
            return true;
        };
        return CacheHelper.invoke(method, process);
    }

对于Predicate<T>接口,等价于Function<T, Boolean>, 示例以下:

 /**
     * Precidate调用的例子
     */
    public void PredicateSample() {
        List list = new ArrayList();
        list.add("a")
        list.removeIf(item -> {
            return item.equals("a");
        });
    }

说明一下,Predicate的入参为一个参数,出参为boolean,很适合进行条件判断的场合。在JDK的List数据结构中,因为removeIf方法没法耦合进去业务代码,因此利用Predicate函数式接口将业务逻辑实现部分抛给了用户自行处理,用户处理完毕,只要返回给我true,我就删掉当前的item;返回给我false,我就保留当前的item。解耦作的很是漂亮。那么List的removeIf实现方式你以为是怎样实现的呢?若是我不看JDK代码的话,我以为实现方式以下:

 public boolean removeIf(Predicate<T> predicate){
        final Iterator<T> iterator = getIterator();
        while (iterator.hasNext()) {
            T current = iterator.next();
            boolean result = predicate.test(current);
            if(result){
                iterator.remove();
                return true;
            }
        }
        return false;
    }

可是实际你去看看List默认的removeIf实现,源码大概和我写的差很少。因此只要理解了函数式接口,咱们也能写出JDK Style的代码,酷吧。

CompletableFuture实现异步处理

好了,上面就是函数式接口的总体介绍和使用简介,不知道你看了以后,理解了多少呢?接下来咱们要讲解的异步,彻底基于上面的函数式接口回调,若是以前的都看懂了,下面的讲解你将豁然开朗;反之则要悟了。可是正确的方向都已经指出来了,因此入门应该是没有难度的。

CompletableFuture,很长的一个名字,我对他的印象停留在一次代码评审会上,当时有人提到了这个类,我只是简单的记录下来了,以后去JDK源码中搜索了一下,看看主要干什么的,也没有怎么想去看它。结果当我搜到这个类,而后看到Author的时候,我以为我发现了金矿同样,因而我决定深刻的研究下去,那个做者的名字就是:

/**
 * A {@link Future} that may be explicitly completed (setting its
 * value and status), and may be used as a {@link CompletionStage},
 * supporting dependent functions and actions that trigger upon its
 * completion.
 *
 * <p>When two or more threads attempt to
 * {@link #complete complete},
 * {@link #completeExceptionally completeExceptionally}, or
 * {@link #cancel cancel}
 * a CompletableFuture, only one of them succeeds.
 *
 * <p>In addition to these and related methods for directly
 * manipulating status and results, CompletableFuture implements
 * interface {@link CompletionStage} with the following policies: <ul>
 *
 * <li>Actions supplied for dependent completions of
 * <em>non-async</em> methods may be performed by the thread that
 * completes the current CompletableFuture, or by any other caller of
 * a completion method.</li>
 *
 * <li>All <em>async</em> methods without an explicit Executor
 * argument are performed using the {@link ForkJoinPool#commonPool()}
 * (unless it does not support a parallelism level of at least two, in
 * which case, a new Thread is created to run each task).  To simplify
 * monitoring, debugging, and tracking, all generated asynchronous
 * tasks are instances of the marker interface {@link
 * AsynchronousCompletionTask}. </li>
 *
 * <li>All CompletionStage methods are implemented independently of
 * other public methods, so the behavior of one method is not impacted
 * by overrides of others in subclasses.  </li> </ul>
 *
 * <p>CompletableFuture also implements {@link Future} with the following
 * policies: <ul>
 *
 * <li>Since (unlike {@link FutureTask}) this class has no direct
 * control over the computation that causes it to be completed,
 * cancellation is treated as just another form of exceptional
 * completion.  Method {@link #cancel cancel} has the same effect as
 * {@code completeExceptionally(new CancellationException())}. Method
 * {@link #isCompletedExceptionally} can be used to determine if a
 * CompletableFuture completed in any exceptional fashion.</li>
 *
 * <li>In case of exceptional completion with a CompletionException,
 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an
 * {@link ExecutionException} with the same cause as held in the
 * corresponding CompletionException.  To simplify usage in most
 * contexts, this class also defines methods {@link #join()} and
 * {@link #getNow} that instead throw the CompletionException directly
 * in these cases.</li> </ul>
 *
 * @author Doug Lea
 * @since 1.8
 */

Doug Lea,Java并发编程的大神级人物,整个JDK里面的并发编程包,几乎都是他的做品,很务实的一个老爷子,目前在纽约州立大学奥斯威戈分校执教。好比咱们异常熟悉的AtomicInteger类也是其做品:

/**
 * An {@code int} value that may be updated atomically.  See the
 * {@link java.util.concurrent.atomic} package specification for
 * description of the properties of atomic variables. An
 * {@code AtomicInteger} is used in applications such as atomically
 * incremented counters, and cannot be used as a replacement for an
 * {@link java.lang.Integer}. However, this class does extend
 * {@code Number} to allow uniform access by tools and utilities that
 * deal with numerically-based classes.
 *
 * @since 1.5
 * @author Doug Lea
*/
public class AtomicInteger extends Number implements java.io.Serializable {
  // ignore code 
}

想查阅老爷子的最新资料,建议到Wikipedia上查找,里面有他的博客连接等,我这里就再也不作过多介绍,回到正题上来,咱们继续谈谈CompletableFuture吧。我刚才贴的关于这个类的描述,都是英文的,并且特别长,咱们不妨贴出中文释义来,看看具体是个什么玩意儿:

 继承自Future,带有明确的结束标记;同时继承自CompletionStage,支持多函数调用行为直至完成态。
 当两个以上的线程对CompletableFuture进行complete调用,completeExceptionally调用或者cancel调用,只有一个会成功。
 
 为了直观的保持相关方法的状态和结果,CompletableFuture按照以下原则继承并实现了CompletionStage接口:

 1. 多个同步方法的级联调用,可能会被当前的CompletableFuture置为完成态,也可能会被级联函数中的任何一个方法置为完成态。

 2. 异步方法的执行,默认使用ForkJoinPool来进行(若是当前的并行标记不支持多并发,那么将会为每一个任务开启一个新的线程来进行)。
    为了简化监控,调试,代码跟踪等,全部的异步任务必须继承自AsynchronousCompletionTask。

 3. 全部的CompletionStage方法都是独立的,overrid子类中的其余的方法并不会影响当前方法行为。

 CompletableFuture同时也按照以下原则继承并实现了Future接口:

 1. 因为此类没法控制完成态(一旦完成,直接返回给调用方),因此cancellation被当作是另外一种带有异常的完成状态. 在这种状况下cancel方法和CancellationException是等价的。
    方法isCompletedExceptionally能够用来监控CompletableFuture在一些异常调用的场景下是否完成。

 2. get方法和get(long, TimeUint)方法将会抛出ExecutionException异常,一旦计算过程当中有CompletionException的话。
    为了简化使用,这个类同时也定义了join()方法和getNow()方法来避免CompletionException的抛出(在CompletionException抛出以前就返回告终果)。
 

因为没有找到中文文档,因此这里自行勉强解释了一番,有些差强人意。

在咱们平常生活中,咱们的不少行为其实都是要么有结果的,要么无结果的。好比说作蛋糕,作出来的蛋糕就是结果,那么通常咱们用Callable或者Supplier来表明这个行为,由于这两个函数式接口的执行,是须要有返回结果的。再好比说吃蛋糕,吃蛋糕这个行为,是无结果的。由于他仅仅表明咱们去干了一件事儿,因此会用Consumer或者Runnable来表明吃饭这个行为。由于这两个函数式接口的执行,是不返回结果的。有时候我发现家里没有作蛋糕的工具,因而我便去外面的蛋糕店委托蛋糕师傅给我作一个,那么这种委托行为,其实就是一种异步行为,会用Future来描述。由于Future神奇的地方在于,可让一个同步执行的方法编程异步的,就好似委托蛋糕师傅作蛋糕同样。这样咱们就能够在蛋糕师傅给咱们作蛋糕期间去作一些其余的事儿,好比听音乐等等。可是因为Future不具备事件完成告知的能力,因此得须要本身去一遍一遍的问师傅,作好了没有。而CompletableFuture则具备这种能力,因此总结起来以下:

  • Callable,有结果的同步行为,好比作蛋糕
  • Runnable,无结果的同步行为,好比吃蛋糕
  • Future,异步封装Callable/Runnable,好比委托给蛋糕师傅(其余线程)去作
  • CompletableFuture,封装Future,使其拥有回调功能,好比让师傅主动告诉我蛋糕作好了

那么上面描述的场景,咱们用代码封装一下吧:

 public static void main(String... args) throws Exception {
        CompletableFuture
                .supplyAsync(() -> makeCake())
                .thenAccept(cake -> eatCake(cake));
        System.out.println("先回家听音乐,蛋糕作好后给我打电话,我来取...");
        Thread.currentThread().join();
    }

    private static Cake makeCake() {
        System.out.println("我是蛋糕房,开始为你制做蛋糕...");
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Cake cake = new Cake();
        cake.setName("草莓蛋糕");
        cake.setShape("圆形");
        cake.setPrice(new BigDecimal(99));
        System.out.println("蛋糕制做完毕,请取回...");
        return cake;
    }

    private static void eatCake(Cake cake) {
        System.out.println("这个蛋糕是" + cake.getName() + ",我喜欢,开吃...");
    }

最后执行结果以下:

我是蛋糕房,开始为你制做蛋糕...
先回家听音乐,蛋糕作好后给我打电话,我来取...
蛋糕制做完毕,请取回...
这个蛋糕是草莓蛋糕,我喜欢,开吃...

因为CompletableFuture的api有50几个,数量很是多,咱们能够先将其划分为若干大类(摘自理解CompletableFuture,总结的很是好,直接拿来用):

建立类:用于CompletableFuture对象建立,好比:

  • completedFuture
  • runAsync
  • supplyAsync
  • anyOf
  • allOf

状态取值类:用于判断当前状态和同步等待取值,好比:

  • join
  • get
  • getNow
  • isCancelled
  • isCompletedExceptionally
  • isDone

控制类:可用于主动控制CompletableFuture完成行为,好比:

  • complete
  • completeExceptionally
  • cancel

接续类:CompletableFuture最重要的特性,用于注入回调行为,好比:

  • thenApply, thenApplyAsync
  • thenAccept, thenAcceptAsync
  • thenRun, thenRunAsync
  • thenCombine, thenCombineAsync
  • thenAcceptBoth, thenAcceptBothAsync
  • runAfterBoth, runAfterBothAsync
  • applyToEither, applyToEitherAsync
  • acceptEither, acceptEitherAsync
  • runAfterEither, runAfterEitherAsync
  • thenCompose, thenComposeAsync
  • whenComplete, whenCompleteAsync
  • handle, handleAsync
  • exceptionally

上面的方法很是多,而大多具备类似性,咱们大可没必要立刻记忆。先来看看几个通常性的规律,即可辅助记忆(重要):

  1. 以Async后缀结尾的方法,均是异步方法,对应无Async则是同步方法。
  2. 以Async后缀结尾的方法,必定有两个重载方法。其一是采用内部forkjoin线程池执行异步,其二是指定一个Executor去运行。
  3. 以run开头的方法,其方法入参的lambda表达式必定是无参数,而且无返回值的,其实就是指定Runnable
  4. 以supply开头的方法,其方法入参的lambda表达式必定是无参数,而且有返回值,其实就是指Supplier
  5. 以Accept为开头或结尾的方法,其方法入参的lambda表达式必定是有参数,可是无返回值,其实就是指Consumer
  6. 以Apply为开头或者结尾的方法,其方法入参的lambda表达式必定是有参数,可是有返回值,其实就是指Function
  7. 带有either后缀的表示谁先完成则消费谁。

以上6条记住以后,就能够记住60%以上的API了。

先来看一下其具体的使用方式吧(网上有个外国人写了CompletableFuture的20个例子,我看有中文版了,放到这里,你们能够参考下)。

  /**
     * CompletableFuture调用completedFuture方法,代表执行完毕
     */
    static void sample1() {
        CompletableFuture cf = CompletableFuture.completedFuture("message");
        Assert.assertTrue(cf.isDone());
        Assert.assertEquals("message", cf.getNow(null));
    }

sample1代码,能够看出,若是想让一个ComopletableFuture执行完毕,最简单的方式就是调用其completedFuture方法便可。以后就能够用getNow对其结果进行获取,若是获取不到就返回默认值null。

 /**
     * 两个方法串行执行,后一个方法依赖前一个方法的返回
     */
    static void sample2() {
        CompletableFuture cf = CompletableFuture
                .completedFuture("message")
                .thenApply(message -> {
                    Assert.assertFalse(Thread.currentThread().isDaemon());
                    return message.toUpperCase();
                });
        Assert.assertEquals("MESSAGE", cf.getNow(null));
    }

sample2代码,利用thenApply实现两个函数串行执行,后一个函数的执行以来前一个函数的返回结果。

/**
     * 两个方法并行执行,两个都执行完毕后,在进行汇总
     */
    static void sample3() {

        long start = System.currentTimeMillis();

        CompletableFuture cf = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        CompletableFuture cf1 = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        CompletableFuture.allOf(cf, cf1).whenComplete((v,t)->{
            System.out.println("都完成了");
        }).join();
        long end = System.currentTimeMillis();
        System.out.println((end-start));
    }

sample3最后的执行结果为:

都完成了
2087

能够看到,耗时为2087毫秒,若是是串行执行,须要耗时3000毫秒,可是并行执行,则以最长执行时间为准,其实这个特性在进行远程RPC/HTTP服务调用的时候,将会很是有用,咱们一下子再进行讲解如何用它来反哺业务。

 /**
     * 方法执行取消
     */
    static void sample4(){
        CompletableFuture cf = CompletableFuture.supplyAsync(()->{
            try {
                System.out.println("开始执行函数...");
                Thread.sleep(2000);
                System.out.println("执行函数完毕...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "ok";
        });
        CompletableFuture cf2 = cf.exceptionally(throwable -> {
            return throwable;
        });
        cf2.cancel(true);
        if(cf2.isCompletedExceptionally()){
            System.out.println("成功取消了函数的执行");
        }
        cf2.join();

    }

调用结果以下:

开始执行函数...
成功取消了函数的执行
Exception in thread "main" java.util.concurrent.CancellationException
    at java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2263)
    at com.jd.jsmartredis.article.Article.sample4(Article.java:108)
    at com.jd.jsmartredis.article.Article.main(Article.java:132)

能够看到咱们成功的将函数执行中断,同时因为cf2返回的会一个throwable的Exception,因此咱们的console界面将其也原封不动的打印了出来。

讲解了基本使用以后,如何使用其来反哺咱们的业务呢?咱们就以通用下单为例吧,来看看通用下单有哪些能够优化的点。

image

上图就是咱们在通用下单接口常常调用的接口,分为下单地址接口,商品信息接口,京豆接口,因为这三个接口没有依赖关系,因此能够并行的来执行。若是换作是目前的作法,那么确定是顺序执行,假如三个接口获取都耗时1s的话,那么三个接口获取完毕,咱们的耗时为3s。可是若是改为异步方式执行的话,那么将会简单不少,接下来,咱们开始改造吧。

public Result submitOrder(String pin, CartVO cartVO) {

        //获取下单地址
        CompletableFuture addressFuture = CompletableFuture.supplyAsync(() -> {
            AddressResult addressResult = addressRPC.getAddressListByPin(pin);
            return addressResult;
        });

        //获取商品信息
        CompletableFuture goodsFuture = CompletableFuture.supplyAsync(() -> {
            GoodsResult goodsResult = goodsRPC.getGoodsInfoByPin(pin, cartVO);
            return goodsResult;
        });

        //获取京豆信息
        CompletableFuture beanFuture = CompletableFuture.supplyAsync(() -> {
            JinbeanResult jinbeanResult = JinbeanRPC.getJinbeanByPin(pin);
            return jinbeanResult;
        });

        CompletableFuture.allOf(addressFuture, goodsFuture, beanFuture).whenComplete((v, throwable) -> {
            if (throwable == null) {
                logger.error("获取地址,商品,京豆信息失败", throwable);
                //TODO 尝试从新获取
            } else {
                logger.error("获取地址,商品,京豆信息成功");
            }
        }).join();

        AddressResult addressResult = addressFuture.getNow(null);
        GoodsResult goodsResult = goodsFuture.getNow(null);
        JinbeanResult jinbeanResult = beanFuture.getNow(null);
        
        //TODO 后续处理
    }

这样,咱们利用将普通的RPC执行编程了异步,并且附带了强大的错误处理,是否是很简单?

可是若是遇到以下图示的调用结构,CompletableFuture可否很轻松的应对呢?

image

因为业务变动,须要附带延保信息,为了后续从新计算价格,因此必须将延保商品获取出来,而后计算价格。其实这种既有同步,又有异步的作法,利用CompletableFuture来handle,也是轻松天然,代码以下:

 public Result submitOrder(String pin, CartVO cartVO) {

        //获取下单地址
        CompletableFuture addressFuture = CompletableFuture.supplyAsync(() -> {
            AddressResult addressResult = addressRPC.getAddressListByPin(pin);
            return addressResult;
        });

        //获取商品信息
        CompletableFuture goodsFuture = CompletableFuture.supplyAsync(() -> {
            GoodsResult goodsResult = goodsRPC.getGoodsInfoByPin(pin, cartVO);
            return goodsResult;
        }).thenApplyAsync((goodsResult, Map)->{
            YanbaoResult yanbaoResult = yanbaoRPC.getYanbaoInfoByGoodID(goodsResult.getGoodId, pin);
            Map<String, Object> map = new HashMap<>();
            map.put("good", goodsResult);
            map.put("yanbao",yanbaoResult);
            return map;
        });

        //获取京豆信息
        CompletableFuture beanFuture = CompletableFuture.supplyAsync(() -> {
            JinbeanResult jinbeanResult = JinbeanRPC.getJinbeanByPin(pin);
            return jinbeanResult;
        });

        CompletableFuture.allOf(addressFuture, goodsFuture, beanFuture).whenComplete((v, throwable) -> {
            if (throwable == null) {
                logger.error("获取地址,商品-延保,京豆信息失败", throwable);
                //TODO 尝试从新获取
            } else {
                logger.error("获取地址,商品-延保,京豆信息成功");
            }
        }).join();

        AddressResult addressResult = addressFuture.getNow(null);
        GoodsResult goodsResult = goodsFuture.getNow(null);
        JinbeanResult jinbeanResult = beanFuture.getNow(null);

        //TODO 后续处理
    }

这样咱们就能够了,固然这种改造给咱们带来的好处也是显而易见的,咱们不须要针对全部的接口进行OPS优化,而是针对性能最差的接口进行OPS优化,只要提高了性能最差的接口,那么总体的性能就上去了。

洋洋洒洒写了这么多,但愿对你们有用,谢谢。

参考资料:

理解CompletableFuture

CompletableFuture 详解

JDK中CompletableFuture的源码

相关文章
相关标签/搜索