『译』Kotlin Flows 与 Reactive Streams —— 青出于蓝的历史

转载请注明原文地址:juejin.im/editor/post…react

原文标题Reactive Streams and Kotlin Flowsgit

原文做者Roman Elizarovgithub

原文发布于:2019-06-10编程

图片来自于Darron Birgenheier

响应式扩展(简称为 ReactiveX 或者 Rx)这一律念,由 Erik Meijer 首次在 .NET 中提出,并于 2010 公之于众。它是一个新的异步数据流 API 的构建趋势,即采用包含发射元素(onNext)、流执行完成(onCompleted)、出现错误(onError)的回调,同时引入了像 map 和 filter 等的流式处理操做符,让流的数据处理变得和使用集合同样容易。并发

基于观察者的 Rx 提供了更优的性能比传统基于迭代的数据处理 API。此外,Rx 主张使用“冷流”这一思想。它在那时是一个十分新颖的思想,由于在当时命令式语言处于业界主流,大多数的数据处理 API 都是“热的”。在资源管理方面,“热流”有不少弊端(一旦你打开它,你必须不要忘记关闭它),然而“冷流”提供了一个优雅的解决方案(可参考文章:Cold flows, hot channels)。异步

当下 Rx 不断普及同时它被移植到了许多编程语言中,其中就包括 Java ,所以它就能够运用在最大的编程环境 JVM 中了。在 2013 年,Rx 移植到 Java 被称为 RxJava。于 2014 年发布了 1.0 稳定版本。编程语言

响应式流(Reactive Streams)

期间,在 JVM 平台还有另外两个项目 AkkaProject Reactor ,它们是致力于基于事件的异步系统,然而它们都面临一个共同的控制流问题 —— 背压。这样就促成了一次合做,为了提供一个标准的接口集合来解决 JVM 使用响应式数据流伴随的背压问题。它被称为“响应式流” 新方案,同时 Viktor Klang 发布了一个重大的会谈记录下了它的历史进程。响应式流的 1.0 版本于 2015 年发布。异步编程

响应式流是一个使人钦佩的工程。它为 JVM 世界带来了支持背压的异步事件流,不然它很难支持异步性。它是一个纯基于库的壮举,引入大量的必须严格遵循的契约。本文再也不详述这些,当你使用一些众所周知的专家构建的操做符,它就能够完美的运行,可是若是你本身去写响应式流的操做符,要遵照全部的这些契约是一个巨大的挑战。函数

Kotlin Flows

在 2018 年 Kotlin 编程语言发布了协程 ,做为一个专门针对异步编程的语言广泛特征。在 Kotlin 中,“挂起”(suspension)这个概念,是一个与生俱来的流控制方案。把它与基于观察者的 Rx 的具备“冷流”思想相结合,就能够深刻理解 Kotlin Flows 了。post

咱们致力于 Kotlin Flows 的一系列工做就是以实现一个简单的设计为目标。这个设计就是,仅仅只须要不多的基本构建单元,就能够编写你本身的操做符。好比:想要为没一个值都延迟一秒 ?没问题,使用基本的 flow 构建者,以及 collect 函数,就能实现:

fun <T> Flow<T>.delayASecond() = flow {

    collect { value -> // collect from the original flow
        delay(1000) // delay 1 second
        emit(value) // emit value to the resulting flow
    }
}
复制代码

你并不会看到显示任何有关处理背压的代码,由于它已经自动的在幕后完成了,这一切都归功于 Kotlin 编译器提供了对“挂起”(suspension)的支持。

并发性结构

从无到有的设计 Kotlin Flows,使得咱们也有机会来减小以前响应式流有关的一些模板代码。好比:当订阅一个响应式流,你最终会持有一个 “订阅”(Subscription)对象的引用,你必须很当心的管理它,以便你能够取消这个订阅,不然你可能有泄漏它的风险。这个问题和并发性结构正在解决的问题是很是相似的,而在 Flow 的设计中你不用担忧这些,你并不会由于不当心致使泄漏一个订阅。

Kotlin Flow 没有任何订阅的概念。挂起和轻量级的协程前来救援。Flow 的 collect 操做符最像一个订阅,可是它仅仅是一个挂起的函数,归功于并发性结构,只要不是滥用它,调用它是很难引起泄漏的。

collect 操做符是基于挂起的设计,所以再也不须要每次都单独设置 onError 和 onCompleted 回调。(译者注:能够回想一下 Kotlin 的 suspend 函数,它的设计初衷就是不用回调,让异步代码能够像同步代码通常无二的编写。)想要在流正常完成后执行一些操做 ?那么就在 collect 正常完成后作就行了:

fun <T> Flow<T>.onCompleted(action: () -> Unit) = flow {
    // reemit all values from the original flow
    collect { value -> emit(value) }
    // this code runs only after the normal completion
    action()
}
复制代码

前事不忘后事之师

在咱们设计期间,经过研究现有的响应式流应用代码,给咱们带来了极大的好处。例如,咱们看到涉及执行上下文切换都有共同的代码模式(无处不在的 subcribeOn / observeOn),从而仅仅采用一个简单的 flowOn 操做符,来完成了一样的机制(详情可参考:Execution context of Kotlin Flows)。

为此,在核心库中,咱们还能够奢侈的不去实现全部能够想到的操做符。咱们仅仅挑选了最流行和最基本的来实现,由于 Kotlin 支持扩展函数(参见:Extension-oriented design),把它与 Flow 的简化设计相结合,能够很容易的建立自定义操做符,就如同使用内置的操做符通常无二。

集成

Kotlin Flows 仍然是响应式流的理念。尽管它们是基于挂起的并且没有直接实现相关的接口,可是它这样的设计,可让它直截了当的与基于响应式流的系统集成起来。咱们提供了开箱便可用的 flow.asPublisher() 扩展函数来把 Flow 转换为响应式流的 Publisher 接口,以及 publisher.asFlow() 扩展来实现反向转换。

相关文章
相关标签/搜索