前言html
回调,顾名思义,回过头来调用,详细的说来就是用户无需关心内部实现的具体逻辑,只须要在暴露出的回调函数中放入本身的业务逻辑便可。因为回调机制解耦了框架代码和业务代码,因此能够看作是对面向对象解耦的具体实践之一。因为本文的侧重点在于讲解后端回调,因此对于前端回调甚至于相似JSONP的回调函数类的,利用本章讲解的知识进行代入的时候,请斟酌一二,毕竟后端和前端仍是有必定的区别,所谓差之毫厘,可能谬以千里,慎之。因此本章对回调的讲解侧重于后端,请知悉。前端
回调定义java
说到回调,其实个人理解相似于函数指针的功能,怎么讲呢?由于一个方法,一旦附加了回调入参,那么用户在进行调用的时候,这个回调入参是能够用匿名方法直接替代的。回调的使用必须和方法的签名保持一致性,下面咱们来看一个JDK实现的例子:git
default boolean removeIf(Predicate<? super E> filter) { Objects.requireNonNull(filter); boolean removed = false; final Iterator<E> each = iterator(); while (each.hasNext()) { if (filter.test(each.next())) { each.remove(); removed = true; } } return removed; }
在JDK中,List结构有一个removeIf的方法,其实现方式如上所示。因为附带了具体的注释讲解,我这里就再也不进行过多的讲述。咱们须要着重关注的是其入参:Predicate,由于他就是一个函数式接口,入参为泛型E,出参为boolean,其实和Function<? super E, boolean>是等价的。因为List是一个公共的框架代码,里面不可能糅合业务代码,因此为了解耦框架代码和业务代码,JDK使用了内置的各类函数式接口做为方法的回调,将具体的业务实践抛出去,让用户本身实现,而它本身只接受用户返回的结果就好了:只要用户处理返回true(filter.test(each.next()返回true),那么我就删掉当前遍历的数据;若是用户处理返回false(filter.test(each.next()返回false),那么我就保留当前遍历的数据。是否是很是的nice?github
其实这种完美的协做关系,在JDK类库中随处可见,在其余常常用到的框架中也很常见,诸如Guava,Netty,实在是太多了(这也从侧面说明,利用函数式接口解耦框架和业务,是正确的作法),我撷取了部分片断以下:redis
//将map中的全部entry进行替换 void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) //将map中的entry进行遍历 void forEach(BiConsumer<? super K, ? super V> action) //map中的entry值若是有,则用新值从新创建映射关系 V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) //Deque遍历元素 void forEach(Consumer<? super T> action) //Deque按给定条件移除元素 boolean removeIf(Predicate<? super E> filter) //Guava中获取特定元素 <T> T get(Object key, final Callable<T> valueLoader) //Netty中设置监听 ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener)
那么,回过头来想一想,若是咱们封装本身的组件,想要封装的很JDK Style,该怎么作呢?若是上来直接理解Predicate,Function,Callable,Consumer,我想不少人是有困难的,那就听我慢慢道来吧。编程
咱们先假设以下一段代码,这段代码我相信不少人会很熟悉,不少人也封装过,这就是咱们的大名鼎鼎的RedisUtils封装类:后端
/** * key递增特定的num * @param key Redis中保存的key * @param num 递增的值 * @return key计算完毕后返回的结果值 */ public Long incrBy(String key,long num) { CallerInfo callerInfo = Profiler.registerInfo("sendCouponService.redis.incrBy", "sendCouponService", false, true); Long result; try { result = jrc.incrBy(key, num); } catch (Exception e) { logger.error("incrBy", null, "sendCouponService.redis.incrBy异常,key={},value={}", e, key, num); Profiler.functionError(callerInfo); throw new RuntimeException("sendCouponService.redis.incrBy异常,key=" + key, e); } finally { Profiler.registerInfoEnd(callerInfo); } return result; }
上面这段代码只是一个示例,其实还有上百个方法基本上都是这种封装结构,这种封装有问题吗?没问题!并且封装方式辛辣老道,一看就是高手所为,由于既加了监控,又作了异常处理,并且还有错误日志记录,一旦发生问题,咱们可以第一时间知道哪里出了问题,哪一个方法出了问题,而后设定对应的应对方法。api
这种封装方式,若是当作普通的Util来用,彻底没有问题,可是若是想封装成组件,则欠缺点什么,我列举以下:数据结构
1. 当前代码写死了用jrc操做,若是后期切换到jimdb,是否是还得为jimdb专门写一套呢?
2. 当前代码,上百个方法,其实不少地方都是重复的,惟有redis操做那块不一样,代码重复度特别高,一旦扩展新方法,基本上是剖解原有代码,而后拷贝现有方法,最后改为新方法。
3. 当前方法,包含的都是redis单操做,若是遇到那种涉及到多个操做组合的(好比先set,而后expire或者更复杂一点),须要添加新方法,本质上这种新方法其实和业务性有关了。
从上面列出的这几点来看,其实咱们能够彻底将其打形成一个兼容jrc操做和cluster操做,同时具备良好框架扩展性(策略模式+模板模式)和良好代码重复度控制(函数式接口回调)的框架。因为本章涉及内容为异步回调,因此这里咱们将讲解这种代码如何保持良好的代码重复度控制上。至于良好的框架扩展性,若是感兴趣的话,我会在后面的章节进行讲解。那么咱们开始进行优化吧。
首先,找出公共操做部分(白色)和非公共操做部分(黄色):
/** * key递增特定的num * @param key Redis中保存的key * @param num 递增的值 * @return key计算完毕后返回的结果值 */ public Long incrBy(String key,long num) { CallerInfo callerInfo = Profiler.registerInfo("sendCouponService.redis.incrBy", "sendCouponService", false, true); Long result;
try { result = jrc.incrBy(key, num);
} catch (Exception e) { logger.error("incrBy", null, "sendCouponService.redis.incrBy异常,key={},value={}", e, key, num);
Profiler.functionError(callerInfo);
return null;
} finally { Profiler.registerInfoEnd(callerInfo); } return result; }
经过上面的标记,咱们发现非公共操做部分,有两类:
1. ump提示语和日志提示语不一致
2. 操做方法不一致
标记出来了公共操做部分,以后咱们开始封装公共部分:
/** * 公共模板抽取 * * @param method * @param callable * @param <T> * @return */ public static <T> T invoke(String method) { CallerInfo info = Profiler.registerInfo(method, false, true); try { //TODO 这里放置不一样的redis操做方法 } catch (Exception e) { logger.error(method, e); AlarmUtil.alarm(method + e.getCause()); reutrn null; } finally { Profiler.registerInfoEnd(info); } }
可是这里有个问题,咱们虽然把公共模板抽取出来了,可是TODO标签里面的内容怎么办呢? 如何把不一样的redis操做方法传递进来呢?
其实在java中,咱们能够利用接口的方式,将具体的操做代理出去,由外部调用者来实现,听起来是否是感受又和IOC搭上了点关系,不错,你想的没错,这确实是控制反转依赖注入的一种作法,经过接口方式将具体的实践代理出去,这也是进行回调操做的原理。接下来看咱们的改造:
/** * redis操做接口 */ public interface RedisOperation<T>{ //调用redis方法,入参为空,出参为T泛型 T invoke(); } /** * redis操做公共模板 * @param method * @param redisOperation * @param <T> * @return */ public static <T> T invoke(String method,RedisOperation redisOperation) { CallerInfo info = Profiler.registerInfo(method, false, true); try { return redisOperation.invoke(); } catch (Exception e) { logger.error(method, e); AlarmUtil.alarm(method + e.getCause()); reutrn null; } finally { Profiler.registerInfoEnd(info); } }
这样,咱们就打造好了一个公共的redis操做模板,以后就能够像下面的方式来使用了:
@Override public Long incrby(String key, long val){ String method = "com.jd.marketing.util.RedisUtil.incrby"; RedisOperation<Long> process = () -> { return redisUtils.incrBy(key, val); }; return CacheHelper.invoke(method, process); }
以后的一百多个方法,你也可使用这样的方式来一一进行包装,以后你会发现原来RedisUtils封装完毕,代码写了2000行,可是用这种方式以后,代码只写了1000行,并且后续有新的联合操做过来,你只须要在以下代码段里面直接把级联操做添加进去便可:
RedisOperation<Long> process = () -> { //TODO other methods //TODO other methods return redisUtils.incrBy(key, val); };
是否是很方便快捷?在这里我须要因此下的是,因为RedisOperation里面的invoke方法是没有入参,带有一个出参结果的调用。因此在回调这里,我用了匿名表达式来()->{}来match这种操做。可是若是回调这里,一个入参,一个出参的话,那么个人匿名表达式须要这样写 param->{}, 多个入参,那就变成了这样 (param1, param2, param3)->{} 。因为这里并不是重点,我不想过多讲解,若是对这种使用方式不熟悉,能够彻底使用以下的方式来进行书写也行:
@Override public Long incrby(String key, long val){ String method = "com.jd.marketing.util.RedisUtil.incrby"; RedisOperation<Long> process = () -> incrByOperation(key, val); return CacheHelper.invoke(method, process); } private Long incrByOperation(String key, long val){ return redisUtils.incrBy(key, val); }
其实说到这里的时候,我就有必要提一下开头的埋下的线索了。其实以前演示的Netty的代码:
//Netty中设置监听
ChannelPromise addListener(GenericFutureListener<? extends Future<? super Void>> listener)
GenericFutureListener这个接口
就是按照上面的写法来作的,是否是豁然开朗呢?至于其调用方式,也和上面讲解的一致,只要符合接口里面方法的调用标准就行(入参和出参符合就行), 好比 future –> {}。
说到这里,咱们可能认为这样太麻烦了,本身定义接口,而后注入到框架中,最后用户本身实现调用方法,一长串。是的,你说的没错,这样确实太麻烦了,JDK因而专门用了一个 FunctionalInterface的annotation来帮咱们作了,因此在JDK中,若是你看到Consumer,Function,Supplier等,带有@FunctionalInterface标注的接口,那么就说明他是一个函数式接口,而这种接口是干什么的,具体的原理就是我上面讲的。下面咱们来梳理梳理这些接口吧。
先看一下咱们的RedisUtils使用JDK自带的函数式接口的最终封装效果:
从图示代码能够看出,总体封装变得简洁许多,并且咱们用了JDK内置的函数式接口,因此也无需写其余多余的代码,看上去很清爽,重复代码基本不见了。并且,因为JDK提供的其余的函数式接口有运算操做,好比Predicate.or, Predicate.and操做等,大大增强了封装的趣味性和乐趣。
下面我将JDK中涉及的经常使用的函数式接口列举一遍,而后来详细讲解讲解吧,列表以下:
Consumer, 提供void accept(T t)回调 Runnable, 提供void run()回调 Callable, 提供V call() throws Exception回调 Supplier, 提供T get()回调 Function, 提供R apply(T t)回调, 有andThen接续操做 Predicate, 提供boolean test(T t)回调, 等价于 Function<T, boolean> BiConsumer, 提供void accept(T t, U u)回调,注意带Bi的回调接口,代表入参都是双参数,好比BiPredicate
......
其实还有不少,我这里就不一一列举了。感兴趣的朋友能够在这里找到JDK提供的全部函数式接口。
接下来,咱们来说解其使用示范,以便于明白怎么样去使用它。
对于Consumer函数式接口,内部的void accept(T t)回调方法,代表了它只能回调有一个入参,没有返参的方法。示例以下:
/** * Consumer调用的例子 */ public void ConsumerSample() { LinkedHashMap linkedHashMap = new LinkedHashMap(); linkedHashMap.put("key", "val"); linkedHashMap.forEach((k, v) -> { System.out.println("key" + k + ",val" + v); }); }
对于Callable接口,其实和Supplier接口是同样的,只是有无Exception抛出的区别,示例以下:
/** * Callable调用的例子 */ public Boolean setnx(String key, String val){ String method = "com.jd.marketing.util.RedisUtil.setnx"; Callable<Boolean> process = () -> { Long rst = redisUtils.setnx(key, val); if(rst == null || rst == 0){ return false; } return true; }; return CacheHelper.invoke(method, process); }
对于Predicate<T>接口,等价于Function<T, Boolean>, 示例以下:
/** * Precidate调用的例子 */ public void PredicateSample() { List list = new ArrayList(); list.add("a") list.removeIf(item -> { return item.equals("a"); }); }
说明一下,Predicate的入参为一个参数,出参为boolean,很适合进行条件判断的场合。在JDK的List数据结构中,因为removeIf方法没法耦合进去业务代码,因此利用Predicate函数式接口将业务逻辑实现部分抛给了用户自行处理,用户处理完毕,只要返回给我true,我就删掉当前的item;返回给我false,我就保留当前的item。解耦作的很是漂亮。那么List的removeIf实现方式你以为是怎样实现的呢?若是我不看JDK代码的话,我以为实现方式以下:
public boolean removeIf(Predicate<T> predicate){ final Iterator<T> iterator = getIterator(); while (iterator.hasNext()) { T current = iterator.next(); boolean result = predicate.test(current); if(result){ iterator.remove(); return true; } } return false; }
可是实际你去看看List默认的removeIf实现,源码大概和我写的差很少。因此只要理解了函数式接口,咱们也能写出JDK Style的代码,酷吧。
CompletableFuture实现异步处理
好了,上面就是函数式接口的总体介绍和使用简介,不知道你看了以后,理解了多少呢?接下来咱们要讲解的异步,彻底基于上面的函数式接口回调,若是以前的都看懂了,下面的讲解你将豁然开朗;反之则要悟了。可是正确的方向都已经指出来了,因此入门应该是没有难度的。
CompletableFuture,很长的一个名字,我对他的印象停留在一次代码评审会上,当时有人提到了这个类,我只是简单的记录下来了,以后去JDK源码中搜索了一下,看看主要干什么的,也没有怎么想去看它。结果当我搜到这个类,而后看到Author的时候,我以为我发现了金矿同样,因而我决定深刻的研究下去,那个做者的名字就是:
/** * A {@link Future} that may be explicitly completed (setting its * value and status), and may be used as a {@link CompletionStage}, * supporting dependent functions and actions that trigger upon its * completion. * * <p>When two or more threads attempt to * {@link #complete complete}, * {@link #completeExceptionally completeExceptionally}, or * {@link #cancel cancel} * a CompletableFuture, only one of them succeeds. * * <p>In addition to these and related methods for directly * manipulating status and results, CompletableFuture implements * interface {@link CompletionStage} with the following policies: <ul> * * <li>Actions supplied for dependent completions of * <em>non-async</em> methods may be performed by the thread that * completes the current CompletableFuture, or by any other caller of * a completion method.</li> * * <li>All <em>async</em> methods without an explicit Executor * argument are performed using the {@link ForkJoinPool#commonPool()} * (unless it does not support a parallelism level of at least two, in * which case, a new Thread is created to run each task). To simplify * monitoring, debugging, and tracking, all generated asynchronous * tasks are instances of the marker interface {@link * AsynchronousCompletionTask}. </li> * * <li>All CompletionStage methods are implemented independently of * other public methods, so the behavior of one method is not impacted * by overrides of others in subclasses. </li> </ul> * * <p>CompletableFuture also implements {@link Future} with the following * policies: <ul> * * <li>Since (unlike {@link FutureTask}) this class has no direct * control over the computation that causes it to be completed, * cancellation is treated as just another form of exceptional * completion. Method {@link #cancel cancel} has the same effect as * {@code completeExceptionally(new CancellationException())}. Method * {@link #isCompletedExceptionally} can be used to determine if a * CompletableFuture completed in any exceptional fashion.</li> * * <li>In case of exceptional completion with a CompletionException, * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an * {@link ExecutionException} with the same cause as held in the * corresponding CompletionException. To simplify usage in most * contexts, this class also defines methods {@link #join()} and * {@link #getNow} that instead throw the CompletionException directly * in these cases.</li> </ul> * * @author Doug Lea * @since 1.8 */
Doug Lea,Java并发编程的大神级人物,整个JDK里面的并发编程包,几乎都是他的做品,很务实的一个老爷子,目前在纽约州立大学奥斯威戈分校执教。好比咱们异常熟悉的AtomicInteger类也是其做品:
/** * An {@code int} value that may be updated atomically. See the * {@link java.util.concurrent.atomic} package specification for * description of the properties of atomic variables. An * {@code AtomicInteger} is used in applications such as atomically * incremented counters, and cannot be used as a replacement for an * {@link java.lang.Integer}. However, this class does extend * {@code Number} to allow uniform access by tools and utilities that * deal with numerically-based classes. * * @since 1.5 * @author Doug Lea */ public class AtomicInteger extends Number implements java.io.Serializable { // ignore code }
想查阅老爷子的最新资料,建议到Wikipedia上查找,里面有他的博客连接等,我这里就再也不作过多介绍,回到正题上来,咱们继续谈谈CompletableFuture吧。我刚才贴的关于这个类的描述,都是英文的,并且特别长,咱们不妨贴出中文释义来,看看具体是个什么玩意儿:
继承自Future,带有明确的结束标记;同时继承自CompletionStage,支持多函数调用行为直至完成态。
当两个以上的线程对CompletableFuture进行complete调用,completeExceptionally调用或者cancel调用,只有一个会成功。
为了直观的保持相关方法的状态和结果,CompletableFuture按照以下原则继承并实现了CompletionStage接口:
1. 多个同步方法的级联调用,可能会被当前的CompletableFuture置为完成态,也可能会被级联函数中的任何一个方法置为完成态。
2. 异步方法的执行,默认使用ForkJoinPool来进行(若是当前的并行标记不支持多并发,那么将会为每一个任务开启一个新的线程来进行)。
为了简化监控,调试,代码跟踪等,全部的异步任务必须继承自AsynchronousCompletionTask。
3. 全部的CompletionStage方法都是独立的,overrid子类中的其余的方法并不会影响当前方法行为。
CompletableFuture同时也按照以下原则继承并实现了Future接口:
1. 因为此类没法控制完成态(一旦完成,直接返回给调用方),因此cancellation被当作是另外一种带有异常的完成状态. 在这种状况下cancel方法和CancellationException是等价的。
方法isCompletedExceptionally能够用来监控CompletableFuture在一些异常调用的场景下是否完成。
2. get方法和get(long, TimeUint)方法将会抛出ExecutionException异常,一旦计算过程当中有CompletionException的话。
为了简化使用,这个类同时也定义了join()方法和getNow()方法来避免CompletionException的抛出(在CompletionException抛出以前就返回告终果)。
因为没有找到中文文档,因此这里自行勉强解释了一番,有些差强人意。
在咱们平常生活中,咱们的不少行为其实都是要么有结果的,要么无结果的。好比说作蛋糕,作出来的蛋糕就是结果,那么通常咱们用Callable或者Supplier来表明这个行为,由于这两个函数式接口的执行,是须要有返回结果的。再好比说吃蛋糕,吃蛋糕这个行为,是无结果的。由于他仅仅表明咱们去干了一件事儿,因此会用Consumer或者Runnable来表明吃饭这个行为。由于这两个函数式接口的执行,是不返回结果的。有时候我发现家里没有作蛋糕的工具,因而我便去外面的蛋糕店委托蛋糕师傅给我作一个,那么这种委托行为,其实就是一种异步行为,会用Future来描述。由于Future神奇的地方在于,可让一个同步执行的方法编程异步的,就好似委托蛋糕师傅作蛋糕同样。这样咱们就能够在蛋糕师傅给咱们作蛋糕期间去作一些其余的事儿,好比听音乐等等。可是因为Future不具备事件完成告知的能力,因此得须要本身去一遍一遍的问师傅,作好了没有。而CompletableFuture则具备这种能力,因此总结起来以下:
那么上面描述的场景,咱们用代码封装一下吧:
public static void main(String... args) throws Exception { CompletableFuture .supplyAsync(() -> makeCake()) .thenAccept(cake -> eatCake(cake)); System.out.println("先回家听音乐,蛋糕作好后给我打电话,我来取..."); Thread.currentThread().join(); } private static Cake makeCake() { System.out.println("我是蛋糕房,开始为你制做蛋糕..."); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } Cake cake = new Cake(); cake.setName("草莓蛋糕"); cake.setShape("圆形"); cake.setPrice(new BigDecimal(99)); System.out.println("蛋糕制做完毕,请取回..."); return cake; } private static void eatCake(Cake cake) { System.out.println("这个蛋糕是" + cake.getName() + ",我喜欢,开吃..."); }
最后执行结果以下:
我是蛋糕房,开始为你制做蛋糕... 先回家听音乐,蛋糕作好后给我打电话,我来取... 蛋糕制做完毕,请取回... 这个蛋糕是草莓蛋糕,我喜欢,开吃...
因为CompletableFuture的api有50几个,数量很是多,咱们能够先将其划分为若干大类(摘自理解CompletableFuture,总结的很是好,直接拿来用):
建立类:用于CompletableFuture对象建立,好比:
状态取值类:用于判断当前状态和同步等待取值,好比:
控制类:可用于主动控制CompletableFuture完成行为,好比:
接续类:CompletableFuture最重要的特性,用于注入回调行为,好比:
上面的方法很是多,而大多具备类似性,咱们大可没必要立刻记忆。先来看看几个通常性的规律,即可辅助记忆(重要):
无参数
,而且无返回值
的,其实就是指定Runnable无参数
,而且有返回值
,其实就是指Supplier有参数
,可是无返回值
,其实就是指Consumer有参数
,可是有返回值
,其实就是指Function以上6条记住以后,就能够记住60%以上的API了。
先来看一下其具体的使用方式吧(网上有个外国人写了CompletableFuture的20个例子,我看有中文版了,放到这里,你们能够参考下)。
/** * CompletableFuture调用completedFuture方法,代表执行完毕 */ static void sample1() { CompletableFuture cf = CompletableFuture.completedFuture("message"); Assert.assertTrue(cf.isDone()); Assert.assertEquals("message", cf.getNow(null)); }
sample1代码,能够看出,若是想让一个ComopletableFuture执行完毕,最简单的方式就是调用其completedFuture方法便可。以后就能够用getNow对其结果进行获取,若是获取不到就返回默认值null。
/** * 两个方法串行执行,后一个方法依赖前一个方法的返回 */ static void sample2() { CompletableFuture cf = CompletableFuture .completedFuture("message") .thenApply(message -> { Assert.assertFalse(Thread.currentThread().isDaemon()); return message.toUpperCase(); }); Assert.assertEquals("MESSAGE", cf.getNow(null)); }
sample2代码,利用thenApply实现两个函数串行执行,后一个函数的执行以来前一个函数的返回结果。
/** * 两个方法并行执行,两个都执行完毕后,在进行汇总 */ static void sample3() { long start = System.currentTimeMillis(); CompletableFuture cf = CompletableFuture.runAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); CompletableFuture cf1 = CompletableFuture.runAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } }); CompletableFuture.allOf(cf, cf1).whenComplete((v,t)->{ System.out.println("都完成了"); }).join(); long end = System.currentTimeMillis(); System.out.println((end-start)); }
sample3最后的执行结果为:
都完成了
2087
能够看到,耗时为2087毫秒,若是是串行执行,须要耗时3000毫秒,可是并行执行,则以最长执行时间为准,其实这个特性在进行远程RPC/HTTP服务调用的时候,将会很是有用,咱们一下子再进行讲解如何用它来反哺业务。
/** * 方法执行取消 */ static void sample4(){ CompletableFuture cf = CompletableFuture.supplyAsync(()->{ try { System.out.println("开始执行函数..."); Thread.sleep(2000); System.out.println("执行函数完毕..."); } catch (InterruptedException e) { e.printStackTrace(); } return "ok"; }); CompletableFuture cf2 = cf.exceptionally(throwable -> { return throwable; }); cf2.cancel(true); if(cf2.isCompletedExceptionally()){ System.out.println("成功取消了函数的执行"); } cf2.join(); }
调用结果以下:
开始执行函数... 成功取消了函数的执行 Exception in thread "main" java.util.concurrent.CancellationException at java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2263) at com.jd.jsmartredis.article.Article.sample4(Article.java:108) at com.jd.jsmartredis.article.Article.main(Article.java:132)
能够看到咱们成功的将函数执行中断,同时因为cf2返回的会一个throwable的Exception,因此咱们的console界面将其也原封不动的打印了出来。
讲解了基本使用以后,如何使用其来反哺咱们的业务呢?咱们就以通用下单为例吧,来看看通用下单有哪些能够优化的点。
上图就是咱们在通用下单接口常常调用的接口,分为下单地址接口,商品信息接口,京豆接口,因为这三个接口没有依赖关系,因此能够并行的来执行。若是换作是目前的作法,那么确定是顺序执行,假如三个接口获取都耗时1s的话,那么三个接口获取完毕,咱们的耗时为3s。可是若是改为异步方式执行的话,那么将会简单不少,接下来,咱们开始改造吧。
public Result submitOrder(String pin, CartVO cartVO) { //获取下单地址 CompletableFuture addressFuture = CompletableFuture.supplyAsync(() -> { AddressResult addressResult = addressRPC.getAddressListByPin(pin); return addressResult; }); //获取商品信息 CompletableFuture goodsFuture = CompletableFuture.supplyAsync(() -> { GoodsResult goodsResult = goodsRPC.getGoodsInfoByPin(pin, cartVO); return goodsResult; }); //获取京豆信息 CompletableFuture beanFuture = CompletableFuture.supplyAsync(() -> { JinbeanResult jinbeanResult = JinbeanRPC.getJinbeanByPin(pin); return jinbeanResult; }); CompletableFuture.allOf(addressFuture, goodsFuture, beanFuture).whenComplete((v, throwable) -> { if (throwable == null) { logger.error("获取地址,商品,京豆信息失败", throwable); //TODO 尝试从新获取 } else { logger.error("获取地址,商品,京豆信息成功"); } }).join(); AddressResult addressResult = addressFuture.getNow(null); GoodsResult goodsResult = goodsFuture.getNow(null); JinbeanResult jinbeanResult = beanFuture.getNow(null); //TODO 后续处理 }
这样,咱们利用将普通的RPC执行编程了异步,并且附带了强大的错误处理,是否是很简单?
可是若是遇到以下图示的调用结构,CompletableFuture可否很轻松的应对呢?
因为业务变动,须要附带延保信息,为了后续从新计算价格,因此必须将延保商品获取出来,而后计算价格。其实这种既有同步,又有异步的作法,利用CompletableFuture来handle,也是轻松天然,代码以下:
public Result submitOrder(String pin, CartVO cartVO) { //获取下单地址 CompletableFuture addressFuture = CompletableFuture.supplyAsync(() -> { AddressResult addressResult = addressRPC.getAddressListByPin(pin); return addressResult; }); //获取商品信息 CompletableFuture goodsFuture = CompletableFuture.supplyAsync(() -> { GoodsResult goodsResult = goodsRPC.getGoodsInfoByPin(pin, cartVO); return goodsResult; }).thenApplyAsync((goodsResult, Map)->{ YanbaoResult yanbaoResult = yanbaoRPC.getYanbaoInfoByGoodID(goodsResult.getGoodId, pin); Map<String, Object> map = new HashMap<>(); map.put("good", goodsResult); map.put("yanbao",yanbaoResult); return map; }); //获取京豆信息 CompletableFuture beanFuture = CompletableFuture.supplyAsync(() -> { JinbeanResult jinbeanResult = JinbeanRPC.getJinbeanByPin(pin); return jinbeanResult; }); CompletableFuture.allOf(addressFuture, goodsFuture, beanFuture).whenComplete((v, throwable) -> { if (throwable == null) { logger.error("获取地址,商品-延保,京豆信息失败", throwable); //TODO 尝试从新获取 } else { logger.error("获取地址,商品-延保,京豆信息成功"); } }).join(); AddressResult addressResult = addressFuture.getNow(null); GoodsResult goodsResult = goodsFuture.getNow(null); JinbeanResult jinbeanResult = beanFuture.getNow(null); //TODO 后续处理 }
这样咱们就能够了,固然这种改造给咱们带来的好处也是显而易见的,咱们不须要针对全部的接口进行OPS优化,而是针对性能最差的接口进行OPS优化,只要提高了性能最差的接口,那么总体的性能就上去了。
洋洋洒洒写了这么多,但愿对你们有用,谢谢。
参考资料: