转载请注明原文地址:juejin.im/editor/post…react
原文标题:Reactive Streams and Kotlin Flowsgit
原文做者:Roman Elizarovgithub
原文发布于:2019-06-10编程
响应式扩展(简称为 ReactiveX 或者 Rx)这一律念,由 Erik Meijer 首次在 .NET 中提出,并于 2010 公之于众。它是一个新的异步数据流 API 的构建趋势,即采用包含发射元素(onNext)、流执行完成(onCompleted)、出现错误(onError)的回调,同时引入了像 map 和 filter 等的流式处理操做符,让流的数据处理变得和使用集合同样容易。并发
基于观察者的 Rx 提供了更优的性能比传统基于迭代的数据处理 API。此外,Rx 主张使用“冷流”这一思想。它在那时是一个十分新颖的思想,由于在当时命令式语言处于业界主流,大多数的数据处理 API 都是“热的”。在资源管理方面,“热流”有不少弊端(一旦你打开它,你必须不要忘记关闭它),然而“冷流”提供了一个优雅的解决方案(可参考文章:Cold flows, hot channels)。异步
当下 Rx 不断普及同时它被移植到了许多编程语言中,其中就包括 Java ,所以它就能够运用在最大的编程环境 JVM 中了。在 2013 年,Rx 移植到 Java 被称为 RxJava。于 2014 年发布了 1.0 稳定版本。编程语言
期间,在 JVM 平台还有另外两个项目 Akka 和 Project Reactor ,它们是致力于基于事件的异步系统,然而它们都面临一个共同的控制流问题 —— 背压。这样就促成了一次合做,为了提供一个标准的接口集合来解决 JVM 使用响应式数据流伴随的背压问题。它被称为“响应式流” 新方案,同时 Viktor Klang 发布了一个重大的会谈记录下了它的历史进程。响应式流的 1.0 版本于 2015 年发布。异步编程
响应式流是一个使人钦佩的工程。它为 JVM 世界带来了支持背压的异步事件流,不然它很难支持异步性。它是一个纯基于库的壮举,引入大量的必须严格遵循的契约。本文再也不详述这些,当你使用一些众所周知的专家构建的操做符,它就能够完美的运行,可是若是你本身去写响应式流的操做符,要遵照全部的这些契约是一个巨大的挑战。函数
在 2018 年 Kotlin 编程语言发布了协程 ,做为一个专门针对异步编程的语言广泛特征。在 Kotlin 中,“挂起”(suspension)这个概念,是一个与生俱来的流控制方案。把它与基于观察者的 Rx 的具备“冷流”思想相结合,就能够深刻理解 Kotlin Flows 了。post
咱们致力于 Kotlin Flows 的一系列工做就是以实现一个简单的设计为目标。这个设计就是,仅仅只须要不多的基本构建单元,就能够编写你本身的操做符。好比:想要为没一个值都延迟一秒 ?没问题,使用基本的 flow 构建者,以及 collect 函数,就能实现:
fun <T> Flow<T>.delayASecond() = flow {
collect { value -> // collect from the original flow
delay(1000) // delay 1 second
emit(value) // emit value to the resulting flow
}
}
复制代码
你并不会看到显示任何有关处理背压的代码,由于它已经自动的在幕后完成了,这一切都归功于 Kotlin 编译器提供了对“挂起”(suspension)的支持。
从无到有的设计 Kotlin Flows,使得咱们也有机会来减小以前响应式流有关的一些模板代码。好比:当订阅一个响应式流,你最终会持有一个 “订阅”(Subscription)对象的引用,你必须很当心的管理它,以便你能够取消这个订阅,不然你可能有泄漏它的风险。这个问题和并发性结构正在解决的问题是很是相似的,而在 Flow 的设计中你不用担忧这些,你并不会由于不当心致使泄漏一个订阅。
Kotlin Flow 没有任何订阅的概念。挂起和轻量级的协程前来救援。Flow 的 collect 操做符最像一个订阅,可是它仅仅是一个挂起的函数,归功于并发性结构,只要不是滥用它,调用它是很难引起泄漏的。
collect 操做符是基于挂起的设计,所以再也不须要每次都单独设置 onError 和 onCompleted 回调。(译者注:能够回想一下 Kotlin 的 suspend 函数,它的设计初衷就是不用回调,让异步代码能够像同步代码通常无二的编写。)想要在流正常完成后执行一些操做 ?那么就在 collect 正常完成后作就行了:
fun <T> Flow<T>.onCompleted(action: () -> Unit) = flow {
// reemit all values from the original flow
collect { value -> emit(value) }
// this code runs only after the normal completion
action()
}
复制代码
在咱们设计期间,经过研究现有的响应式流应用代码,给咱们带来了极大的好处。例如,咱们看到涉及执行上下文切换都有共同的代码模式(无处不在的 subcribeOn / observeOn),从而仅仅采用一个简单的 flowOn 操做符,来完成了一样的机制(详情可参考:Execution context of Kotlin Flows)。
为此,在核心库中,咱们还能够奢侈的不去实现全部能够想到的操做符。咱们仅仅挑选了最流行和最基本的来实现,由于 Kotlin 支持扩展函数(参见:Extension-oriented design),把它与 Flow 的简化设计相结合,能够很容易的建立自定义操做符,就如同使用内置的操做符通常无二。
Kotlin Flows 仍然是响应式流的理念。尽管它们是基于挂起的并且没有直接实现相关的接口,可是它这样的设计,可让它直截了当的与基于响应式流的系统集成起来。咱们提供了开箱便可用的 flow.asPublisher() 扩展函数来把 Flow 转换为响应式流的 Publisher 接口,以及 publisher.asFlow() 扩展来实现反向转换。