这是一篇译文,原文出处 戳这里。其实好久之前我就看完了这篇文章,只不过我的对响应式编程研究的不够深刻,羞于下笔翻译,在加上这类译文加了原创还有争议性,因此一直没有动力。恰逢今天交流群里两个大佬对响应式编程的话题辩得不可开交,趁印象还算深入,借机把这篇文章翻译一下。说道辩论的点,不妨也在这里抛出来:html
响应式编程在单机环境下是否鸡肋?java
结论是:没有结论,我以为只能抱着怀疑的眼光审视这个问题了。另外还聊到了 RSocket 这个最近在 SpringOne 大会上比较火爆的响应式"新“网络协议,github 地址戳这里,为何给”新“字打了个引号,仔细观察下 RSocket 的 commit log,其实三年前就有了。有兴趣的同窗自行翻阅,说不定就是今年这最后两三个月的热点技术哦。react
Java 圈子有一个怪事,那就是对 RxJava,Reactor,WebFlux 这些响应式编程的名词、框架永远处于渴望了解,感到新鲜,却又不甚了解,使用贫乏的状态。以前转载小马哥的那篇《Reactive Programming 一种技术,各自表述》时,就已经聊过这个关于名词之争的话题了,今天群里的讨论更是加深了个人映像。Java 圈子里面不少朋友一直对响应式编程处于一个了解名词,知道基本原理,而不是深度用户的状态(我也是之一)。可能真的和圈子有关,按石冲兄的说法,其实 Scala 圈子里面的那帮人,不知道比我们高到哪里去了(就响应式编程而言)。git
实在是很久没发文章了,向你们说声抱歉,之后的更新频率确定是没有之前那么勤了(说的好像之前很勤快似的),一部分缘由是在公司内网写的文章无法贴到公众号中和你们分享讨论,另外一部分是目前我也处于学习公司内部框架的阶段,不太方便提炼成文章,最后,最大的一部分缘由仍是我这段时间须要学(tou)习(lan)其(da)他(you)东(xi)西啦。好了,废话也说完了,下面是译文的正文部分。github
关于响应式编程(Reactive Programming),你可能有过这样的疑问:咱们已经有了 Java8 的 Stream, CompletableFuture, 以及 Optional,为何还必要存在 RxJava 和 Reactor?编程
回答这个问题并不难,若是在响应式编程中处理的问题很是简单,你的确不须要那些第三方类库的支持。 但随着复杂问题的出现,你写出了一堆难看的代码。而后这些代码变得愈来愈复杂,难以维护,而 RxJava 和 Reactor 具备许多方便的功能,能够解决你当下问题,并保障了将来一些可预见的需求。本文从响应式编程模型中抽象出了8个标准,这将有助于咱们理解标准特性与这些库之间的区别:缓存
咱们将会对如下这些类进行这些特性的对比:安全
让咱们开始吧~微信
这些类都是支持 Composable 特性的,使得各位使用者很便利地使用函数式编程的思想去思考问题,这也正是咱们拥趸它们的缘由。网络
CompletableFuture - 众多的 .then*()
方法使得咱们能够构建一个 pipeline, 用以传递空值,单一的值,以及异常.
Stream - 提供了许多链式操做的编程接口,支持在各个操做之间传递多个值。
Optional - 提供了一些中间操做 .map()
, .flatMap()
, .filter()
.
Observable, Flowable, Flux - 和 Stream 相同
CompletableFuture - 不具有惰性执行的特性,它本质上只是一个异步结果的容器。这些对象的建立是用来表示对应的工做,CompletableFuture 建立时,对应的工做已经开始执行了。但它并不知道任何工做细节,只关心结果。因此,没有办法从上至下执行整个 pipeline。当结果被设置给 CompletableFuture 时,下一个阶段才开始执行。
Stream - 全部的中间操做都是延迟执行的。全部的终止操做(terminal operations),会触发真正的计算(译者注:如 collect() 就是一个终止操做)。
Optional - 不具有惰性执行的特性,全部的操做会马上执行。
Observable, Flowable, Flux - 惰性执行,只有当订阅者出现时才会执行,不然不执行。
CompletableFuture - 能够复用,它仅仅是一个实际值的包装类。但须要注意的是,这个包装是可更改的。.obtrude*()
方法会修改它的内容,若是你肯定没有人会调用到这类方法,那么重用它仍是安全的。
Stream - 不能复用。Java Doc 注释道:
A stream should be operated on (invoking an intermediate or terminal stream operation) only once. A stream implementation may throw IllegalStateException if it detects that the stream is being reused. However, since some stream operations may return their receiver rather than a new stream object, it may not be possible to detect reuse in all cases.
(译者注:Stream 只能被调用一次。若是被校测到流被重复使用了,它会跑出抛出一个 IllegalStateException 异常。可是某些流操做会返回他们的接受者,而不是一个新的流对象,因此没法在全部状况下检测出是否能够重用)
Optional - 彻底可重用,由于它是不可变对象,并且全部操做都是马上执行的。
Observable, Flowable, Flux - 生而重用,专门设计成如此。当存在订阅者时,每一次执行都会从初始点开始完整地执行一边。
CompletableFuture - 这个类的要点在于它异步地把多个操做链接了起来。CompletableFuture
表明一项操做,它会跟一个 Executor
关联起来。若是不明确指定一个 Executor
,那么会默认使用公共的 ForkJoinPool
线程池来执行。这个线程池能够用 ForkJoinPool.commonPool()
获取到。默认设置下它会建立系统硬件支持的线程数同样多的线程(一般和 CPU 的核心数相等,若是你的 CPU 支持超线程(hyperthreading),那么会设置成两倍的线程数)。不过你也可使用 JVM 参数指定 ForkJoinPool 线程池的线程数,
-Djava.util.concurrent.ForkJoinPool.common.parallelism=?
复制代码
或者在建立 CompletableFuture
时提供一个指定的 Executor。
Stream - 不支持建立异步执行流程,可是可使用 stream.parallel()
等方式建立并行流。
Optional - 不支持,它只是一个容器。
Observable, Flowable, Flux - 专门设计用以构建异步系统,但默认状况下是同步的。subscribeOn
和 observeOn
容许你来控制订阅以及接收(这个线程会调用 observer 的 onNext
/ onError
/ onCompleted
方法)。
subscribeOn
方法使得你能够决定由哪一个 Scheduler
来执行 Observable.create
方法。即使你没有调用建立方法,系统内部也会作一样的事情。例如:
Observable
.fromCallable(() -> {
log.info("Reading on thread: " + currentThread().getName());
return readFile("input.txt");
})
.map(text -> {
log.info("Map on thread: " + currentThread().getName());
return text.length();
})
.subscribeOn(Schedulers.io()) // <-- setting scheduler
.subscribe(value -> {
log.info("Result on thread: " + currentThread().getName());
});
复制代码
输出:
Reading file on thread: RxIoScheduler-2
Map on thread: RxIoScheduler-2
Result on thread: RxIoScheduler-2
复制代码
相反的,observeOn()
控制在 observeOn()
以后,用哪一个 Scheduler
来运行下游的执行阶段。例如:
Observable
.fromCallable(() -> {
log.info("Reading on thread: " + currentThread().getName());
return readFile("input.txt");
})
.observeOn(Schedulers.computation()) // <-- setting scheduler
.map(text -> {
log.info("Map on thread: " + currentThread().getName());
return text.length();
})
.subscribeOn(Schedulers.io()) // <-- setting scheduler
.subscribe(value -> {
log.info("Result on thread: " + currentThread().getName());
});
复制代码
输出:
Reading file on thread: RxIoScheduler-2
Map on thread: RxComputationScheduler-1
Result on thread: RxComputationScheduler-1
复制代码
可缓存和可复用之间的区别是什么?假如咱们有 pipeline A
,重复使用它两次,来建立两个新的 pipeline B = A + X
以及 C = A + Y
CompletableFuture - 跟可重用的答案同样。
Stream - 不能缓存中间操做的结果,除非调用了终止操做。
Optional - 可缓存,全部操做马上执行,而且进行了缓存。
Observable, Flowable, Flux - 默认不可缓存的,可是能够调用 .cache()
把这些类变成可缓存的。例如:
Observable<Integer> work = Observable.fromCallable(() -> {
System.out.println("Doing some work");
return 10;
});
work.subscribe(System.out::println);
work.map(i -> i * 2).subscribe(System.out::println);
复制代码
输出:
Doing some work
10
Doing some work
20
复制代码
使用 .cache()
:
Observable<Integer> work = Observable.fromCallable(() -> {
System.out.println("Doing some work");
return 10;
}).cache(); // <- apply caching
work.subscribe(System.out::println);
work.map(i -> i * 2).subscribe(System.out::println);
复制代码
输出:
Doing some work
10
20
复制代码
Stream 和 Optional - 拉模型。调用不一样的方法(.get()
, .collect()
等)从 pipeline 拉取结果。拉模型一般和阻塞、同步关联,那也是公平的。当调用方法时,线程会一直阻塞,直到有数据到达。
CompletableFuture, Observable, Flowable, Flux - 推模型。当订阅一个 pipeline ,而且某些事件被执行后,你会获得通知。推模型一般和非阻塞、异步这些词关联在一块儿。当 pipeline 在某个线程上执行时,你能够作任何事情。你已经定义了一段待执行的代码,当通知到达的时候,这段代码就会在下个阶段被执行。
支持回压的前提是 pipeline 必须是推模型。
Backpressure(回压) 描述了 pipeline 中的一种场景:某些异步阶段的处理速度跟不上,须要告诉上游生产者放慢速度。直接失败是不能接受的,这会致使大量数据的丢失。
Stream & Optional - 不支持回压,由于它们是拉模型。
CompletableFuture - 不存在这个问题,由于它只产生 0 个或者 1 个结果。
Observable(RxJava 1), Flowable, Flux - 支持。经常使用策略以下:
Buffering - 缓冲全部的 onNext
的值,直到下游消费它们。
Drop Recent - 若是下游处理速率跟不上,丢弃最近的 onNext
值。
Use Latest - 若是下游处理速率跟不上,只提供最近的 onNext
值,以前的值会被覆盖。
None - onNext
事件直接被触发,不作缓冲和丢弃。
Exception - 若是下游处理跟不上的话,抛出异常。
Observable(RxJava 2) - 不支持。不少 RxJava 1 的使用者用 Observable
来处理不适用回压的事件,或者是使用 Observable
的时候没有配置任何策略,致使了不可预知的异常。因此,RxJava 2 明确地区分两种状况,提供支持回压的 Flowable
和不支持回压的 Observable
。
操做融合的内涵在于,它使得生命周期的不一样点上的执行阶段得以改变,从而消除类库的架构因素所形成的系统开销。全部这些优化都在内部被处理完毕,从而让外部用户以为这一切都是透明的。
只有 RxJava 2 和 Reactor 支持这个特性,但支持的方式不一样。总的来讲,有两种类型的优化:
Macro-fusion - 用一个操做替换 2 个或更多的相继的操做
Micro-fusion - 一个输出队列的结束操做,和在一个输入队列的开始操做,可以共享一个队列的实例。好比说,与其调用 request(1)
而后处理 onNext()`:
否则让订阅者直接从父 observable
拉取值。
一图胜千言
Stream
,CompletableFuture
和 Optional
这些类的建立,都是为了解决特定的问题。 而且他们很是适合用于解决这些问题。 若是它们知足你的需求,你能够立马使用它们。
然而,不一样的问题具备不一样的复杂度,而且某些问题只有新技术才能很好的解决,新技术的出现也是为了解决那些高复杂度的问题。 RxJava 和 Reactor 是通用的工具,它们帮助你以声明方式来解决问题,而不是使用那些不够专业的工具,生搬硬套的使用其余的工具来解决响应式编程的问题,只会让你的解决方案变成一种 hack 行为。
欢迎关注个人微信公众号:「Kirito的技术分享」,关于文章的任何疑问都会获得回复,带来更多 Java 相关的技术分享。