简介: Reactive 编程模型有哪些价值?它的原理是什么?如何正确使用?本文做者将根据他学习和使用的经历,分享 Reactive 的概念、规范、价值和原理。欢迎同窗们共同探讨、斧正。html
Reactive 直接翻译的意思式反应式,反应性。咋一看,彷佛不太好懂。java
举个例子:在 Excel 里,C 单元格上设置函数 Sum(A+B),当你改变单元格 A 或者单元格 B 的数值时,单元格 C 的值同时也会发生变化。这种行为就是 Reactive。react
在计算机编程领域,Reactive 通常指的是 Reactive programming。指的是一种面向数据流并传播事件的异步编程范式(asynchronous programming paradigm)。算法
先举个例子你们感觉一下:spring
public static void main(String[] args) { FluxProcessor<Integer, Integer> publisher = UnicastProcessor.create(); publisher.doOnNext(event -> System.out.println("receive event: " + event)).subscribe(); publisher.onNext(1); // print 'receive event: 1' publisher.onNext(2); // print 'receive event: 2' }
代码 1编程
以上例代码(使用 Reactor 类库)为例,publisher 产生了数据流 (1,2),而且传播给了 OnNext 事件, 上例中 lambda 响应了该事件,输出了相应的信息。上例代码中生成数据流和注册/执行 lambda 是在同一线程中,但也能够在不一样线程中。缓存
注:若是上述代码执行逻辑有些疑惑,能够暂时将 lambda 理解成 callback 就能够了。网络
对于 Reactive 如今你应该大体有一点感受了,可是 Reactive 有什么价值,有哪些设计原则,估计你仍是有些模糊。这就是 Reactive Manifesto 要解决的疑问了。多线程
使用 Reactive 方式构建的系统具备如下特征:架构
即时响应性 (Responsive)
只要有可能, 系统就会及时地作出响应。即时响应是可用性和实用性的基石, 而更加剧要的是,即时响应意味着能够快速地检测到问题而且有效地对其进行处理。即时响应的系统专一于提供快速而一致的响应时间, 确立可靠的反馈上限, 以提供一致的服务质量。这种一致的行为转而将简化错误处理、 创建最终用户的信任并促使用户与系统做进一步的互动。
回弹性 (Resilient)
系统在出现失败时依然保持即时响应性。这不只适用于高可用的、 任务关键型系统——任何不具有回弹性的系统都将会在发生失败以后丢失即时响应性。回弹性是经过复制、 遏制、 隔离以及委托来实现的。失败的扩散被遏制在了每一个组件内部, 与其余组件相互隔离, 从而确保系统某部分的失败不会危及整个系统,并能独立恢复。每一个组件的恢复都被委托给了另外一个(外部的)组件, 此外,在必要时能够经过复制来保证高可用性。(所以)组件的客户端再也不承担组件失败的处理。
弹性 (Elastic)
系统在不断变化的工做负载之下依然保持即时响应性。反应式系统能够对输入(负载)的速率变化作出反应,好比经过增长或者减小被分配用于服务这些输入(负载)的资源。这意味着设计上并无争用点和中央瓶颈, 得以进行组件的分片或者复制, 并在它们之间分布输入(负载)。经过提供相关的实时性能指标, 反应式系统能支持预测式以及反应式的伸缩算法。这些系统能够在常规的硬件以及软件平台上实现成本高效的弹性。
消息驱动 (Message Driven)
反应式系统依赖异步的消息传递,从而确保了松耦合、隔离、位置透明的组件之间有着明确边界。这一边界还提供了将失败做为消息委托出去的手段。使用显式的消息传递,能够经过在系统中塑造并监视消息流队列, 并在必要时应用回压, 从而实现负载管理、 弹性以及流量控制。使用位置透明的消息传递做为通讯的手段, 使得跨集群或者在单个主机中使用相同的结构成分和语义来管理失败成为了可能。非阻塞的通讯使得接收者能够只在活动时才消耗资源, 从而减小系统开销。
注:
知道了 Reactive 的概念,特征和价值后,是否有相关的产品或者框架来帮助咱们构建 Reactive 式系统呢?在早些时候有一些类库 (Rxjava 1.x, Rx.Net) 可使用,可是规范并不统一,因此后来 Netfilx, Pivotal 等公司就制定了一套规范指导你们便于实现它(该规范也是受到早期产品的启发),这就是 Reactive Stream 的做用。
Reactive Stream 是一个使用非阻塞 back pressure(回压)实现异步流式数据处理的标准。目前已经在 JVM 和 JavaScript 语言中实现同一套语意的规范;以及尝试在各类涉及到序列化和反序列化的传输协议(TCP, UDP, HTTP and WebSockets)基础上,定义传输 reactive 数据流的网络协议。
The purpose of Reactive Streams is to provide a standard for asynchronous stream processing with non-blocking backpressure.
当遇到未预料数据流时,依然能够在可控资源消耗下保持系统的可用性。
控制在一个异步边界的流式数据交换。例如传递一个数据到另一个线程或者线程池,确保接收方没有 buffer(缓存)任意数量的数据。而 back pressure(回压)是解决这种场景的不可或缺的特性。
此标准只描述经过回压来实现异步流式数据交换的必要的行为和实体,最小接口,例以下方的 Publisher, Subscriber。Reactive Streams 只关注在这些组件之间的流式数据中转,并不关注流式数据自己的组装,分割,转换等行为, 例如 map, zip 等 operator。Reactive Streams 规范包括:
Publisher
产生一个数据流(可能包含无限数据), Subscriber 们能够根据它们的须要消费这些数据。
public interface Publisher<T> { public void subscribe(Subscriber<? super T> s); }
Subscriber
Publisher 建立的元素的接收者。监听指定的事件,例如 OnNext, OnComplete, OnError 等。
publicinterface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); }
Subscription
是 Publisher 和 Subscriber 一对一的协调对象。Subscriber 能够经过它来向 Publisher 取消数据发送或者 request 更多数据。
public interface Subscription { public void request(long n); public void cancel(); }
Processor
同时具有 Publisher 和 Subscriber 特征。代码1中 FluxProcessor 既能够发送数据(OnNext),也能够接收数据 (doOnNext)。
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}
Thread
Callback
Future
Reactive Extensions (Rx)
Coroutines
Reactive 的实现原理我的认为仍是回调,kotlin 协程实现原理一样也是回调。但实现回掉的方式不同。一个是经过事件传播, 一个是经过状态机。但 cooutine 编程的易用性明显强于 Rx,后面有空我会专门写篇文章介绍 kotlin coroutine 的实现原理。
有了 Reactive Stream 这个规范,就会有相应实现该规范的类库。Reactor 就是其中之一。
Reactor 是遵照 Reactive Stream 规范构建非阻塞应用的 Java 语言 Reactive 类库,已经在 spring 5 中集成,与他类似的类库有 RxJava2, RxJs, JDK9 Flow 等。
阿里内部的 Faas 系统目前使用 Reactor 来构建整个系统,包括函数应用和各类核心应用(逻辑架构)。根据咱们压测结果显示,使用 Reactive 方式构建的系统确实会有这些特色:
另外从原理上,我认为资源利用率和吞吐量也会高于非反应式的应用。
阿里内部的 Faas 系统主要作了两件事情:
涉及到 IO 的地方几乎全异步化。例如中间件(HSF, MetaQ 等提供异步 API)调用。
IO 线程模型变化。使用较少(通常 CPU 核数)线程处理全部的请求。
传统 Java 应用 IO 线程模型
参考 Netty 中 Reactor IO (worker thread pool) 模型,下方伪代码(kotlin)进行了简化。
// 非阻塞读取客户端请求数据(in), 读取成功后执行lambda. inChannel.read(in) { workerThreadPool.execute{ // 阻塞处理业务逻辑(process), 业务逻辑在worker线程池中执行,同步执行完后,再向客户端返回输出(out) val out = process(in) outChannel.write(out) } }
Reactive 应用 IO 线程模型
IO 线程也能够执行业务逻辑 (process),能够不须要 worker 线程池。
// 非阻塞读取客户端请求数据(in), 读取成功后执行lambda inChannel.read(in) { // IO线程执行业务逻辑(process), 而后向客户端返回输出(out). 这要求业务处理流程必须是非阻塞的. process(in){ out-> outChannel.write(out) { // this lambda is executed when the writing completes ... } } }
以 Reactive 方式构建的系统有不少值得学习和发挥价值的地方,但坦白讲 Reactive programing 方式目前接受程度并不高。特别是使用 Java 语言开发同窗,我我的也感同身受,由于这和 Java 面向命令控制流程的编程思惟方式有较大差别。因此这里以 Reactor (Java) 学习为例:
反应式的系统有不少优势,可是完整构建反应式的系统却并不容易。不只仅是语言上的差别,还有一些组件就不支持非阻塞式的调用方式,例如:JDBC。可是有一些开源组织正在推进这些技术进行革新,例如:R2DBC。另外,为了方便构建反应式系统,一些组织/我的适配了一些主流技术组件 reactor-core, reactor-netty, reactor-rabbimq, reactor-kafka 等,来方便完整构建反应式系统。
当你的系统从底层到上层,从系统内部到依赖外部都变成了反应式,这就造成了 Reactive 架构。
这种架构价值有多大?将来可期。
参考
https://www.reactivemanifesto.org/
https://www.reactive-streams.org/
https://kotlinlang.org/docs/tutorials/coroutines/async-programming.html
https://projectreactor.io/docs/core/release/reference/index.html