【译】kotlin 协程 Flow:给 RxJava 使用者的介绍

原文:Flow: an intro for an RxJava user
做者:Mohamed Ibrahim
译者:Fly_with24html


RxJava 多是我使用的最重要的库,Rx 一般是编写代码的另外一种范式,Kotlin 做为一种新的编程语言,使它能够轻松实现将协程驱动的 flow 实现为本身的 Rx 实现。 我可能在 Hello Kotlin Coroutines 中介绍了协程,这对于理解 flow 颇有必要java

Kotlin 具备一组扩展,以方便使用集合。 但它不是响应式的git

listOf("Madara", "Kakashi", "Naruto", "Jiraya", "Itachi")
    .map { it.length }
    .filter { it > 4 }
    .forEach {
        println(it)
    }
复制代码

在此示例中,若是您深刻研究 map 函数源代码,您将发现这里没有魔法,它只是列表的循环,进行了一些转换而后为您提供了一个新列表。 过滤器也同样。 这种机制称为 eager evaluation ,该函数在整个列表中进行操做并提供一个新列表。 可是若是咱们不须要建立这些临时列表以节省一些内存,那咱们可使用 Sequencesgithub

listOf("Madara", "Kakashi", "Naruto", "Jiraya", "Itachi")
	// 使用 Sequence
    .asSequence()
    .map { it.length }
    .filter { it > 4 }
    .forEach {
        println(it)
    }
复制代码

这里的区别就是先调用 asSequence 方法,而后使用咱们的操做,再次 查看 map 方法后,咱们发现了一些不一样之处,它只是 sequence 的修饰符,返回值类型也是 sequence 。 使用 sequence map 时,只能一项一项地进行操做。列表较大时,sequence 比普通集合要好得多。sequence 能够同步完成其工做,有没有办法异步使用那些转换运算符呢?答案是 flow编程

flow

若是咱们尝试获取列表并将其用做 flow ,并在流的末尾调用 collect {..},则会收到编译错误。 因为 flow 是基于协程构建的,所以默认状况下它具备异步功能,所以您能够在代码中使用协程时使用它bash

collect {…} 运算符,您能够将其想像为 Rxjava 中的 subscribe异步

流也是 cold stream ,这意味着,直到您调用操做符(如 collect)后,flow 才会被执行。 若是您重复调用 collect ,每次您将得到相同的结果编程语言

所以,Collections 扩展功能仅适用于小数据,sequence 能够节省您没必要要的工做(不建立临时列表),而使用 flow,您能够用协程的强大功能来编写代码。 所以,让咱们学习如何构建它函数

构建 flow

咱们看到 asFlow 方法,它是 Collections 上的扩展函数,可将其转换为 flow,咱们查看一下源码学习

public fun <T> Iterable<T>.asFlow(): Flow<T> = flow {
    forEach { value ->
        emit(value)
    }
}
复制代码

若是咱们要编写前面的示例在数据源中添加一些逻辑,则只需使用 flow{…} 或者 flowof()

转换操做符

flow 拥有一些列的用于转换的运算符,例如 mapfiltergroupByscan 等等

在由 Coroutines 提供支持的 flow 中,您能够天然地在您的操做符中使用异步代码,假设咱们想要作一些耗时的操做,这里使用延迟一秒钟表示。 使用 RxJava 时,您可使用 flatmap

这里想表达的是 flow 具备更简单的设计,而且与以其陡峭的学习曲线而闻名的 RxJava 相比易于学习,我在此使用 flow 将它简化一下

terminal 操做符

我已经提到 collect() 是 terminal operator,当您在调用它时获得结果,在 RxJava 中,您能够经过调用 subscribe() 来启动它,或者使用阻塞的方式,调用 blockingGet

flow 中的 terminal operator 是须要做用域操做的挂起函数,其余的 operator 例如

  • toList(),toSet -> 返回集合中的全部 item

  • first() -> 仅返回第一个发射

  • reduce(),fold() -> 使用特定操做获取结果

发射数据

为了发射数据,您须要使用一个挂起函数

//fire a coroutine
someScope.launch {
  //fire flow with a terminal operator
  flowSampleData().collect { }
}
复制代码

上面的花括号让人想起了回调,您可使用 launchIn 函数,处理结果可使用 onEach{...}

flowSampleData()
    .onEach {
     //handle emissions
    }
    .launchIn(someScope)
复制代码

取消

每次设置 RxJava 订阅时,咱们都必须取消这些订阅以免内存泄漏或过时的任务在后台运行,RxJava 提供对订阅的引用(disposable)来取消订阅,disposable().dispose() 。若是您在 CompositeDisposable 使用了多个对象,则调用 clear()dispose()

对于 flow 使用特定 scope 的协程则能够无需进行额外的工做来达到此目的

错误处理

RxJava 最有用的功能之一就是处理错误的方式,您可使用此 onError() 函数捕获工做流中的任何错误。 flow 有一个相似的称为 catch {…} ,若是不使用 catch {…} ,则您的代码可能会引起异常或应用崩溃。 您就能够选择使用常规 try catch 或使用 atch {…} 以声明方式进行编码

让咱们模拟一个错误

private fun flowOfAnimeCharacters() = flow {
    emit("Madara")
    emit("Kakashi")
    // 抛出异常
    throw IllegalStateException()
    emit("Jiraya")
    emit("Itachi")
    emit("Naruto")
}
复制代码

使用

runBlocking {
    flowOfAnimeCharacters()
        .map { stringToLength(it) }
        .filter { it > 4 }
        .collect {
            println(it)
        }
}
复制代码

若是咱们运行此代码,它将引起异常,而且如咱们所说,您有两个选项能够处理错误,即常规 try-catchcatch {…}。 这是两种状况下的修改代码

// 使用 try-catch
runBlocking {
    try {
        flowOfAnimeCharacters()
            .map { stringToLength(it) }
            .filter { it > 4 }
            .collect {
                println(it)
            }
    } catch (e: Exception) {
        println(e.stackTrace)
    } finally {
        println("Beat it")
    }
}
复制代码
// 使用 catch{}
runBlocking {
    flowOfAnimeCharacters()
        .map { stringToLength(it) }
        .filter { it > 4 }
         // catch
        .catch { println(it) }
        .collect {
            println(it)
        }
}
复制代码

使用 catch{} 须要注意的是 catch{} 操做符的放置顺序,它要放置在 terminal operator 以前,这样您才能够捕获想要的异常

恢复

若是错误中断了流,而且咱们打算使用完整备份或默认数据恢复流,在 Rxjava 中使用 onErrorResumeNext()onErrorReturn() ,在 flow 中,咱们仍是使用 catch {…},但咱们在其中调用了 emit() 来逐个生成备份,甚至咱们可使用 emitAll() 引入一个全新的 flow,例如若是中途出现了异常,咱们须要“ Minato” 和 “ Hashirama”

runBlocking {
    flowOfAnimeCharacters()
        .catch {
            emitAll(flowOf("Minato", "Hashirama"))
        }
        .collect {
            println(it)
        }
}
复制代码

那么获得的结果是

Madara
Kakashi
Minato
Hashirama
复制代码

flowOn()

默认状况下,flow 数据源将在调用者上下文中运行,若是要更改它,例如,要使 flow 在 IO 而不是 Main 上运行,则使用 flowOn(),并更改上游的上下文,上游是调用 flowOn 以前的所有操做符。 这是一个很好的文档示例

这里的 flowOn() 充当 RxJava 中的两个角色 [subscribeOn() — observeOn()],您能够编写流而后肯定将在哪一个上下文中进行操做

完成

当 flow 完成发射时,您可能须要执行一些操做,onCompletion {…} 能够解决这一问题,而且它肯定 flow 是正常完成仍是异常完成

已知数据源以下

private fun flowOfAnimeCharacters() = flow {
    emit("Madara")
    emit("Kakashi")
    throw IllegalStateException()
    emit("Jiraya")
    emit("Itachi")
    emit("Naruto")
}

复制代码

catch {…} 的工做就是捕获 IllegalStateException() 并从新开始新流程,这使咱们从源头上留下“ Madara”,“ Kakashi”,在后面留下“ Minato”,“ Hashirama”。 可是 onCompletion {…} 会显示错误吗?

答案是否认的,catch 捕获了全部错误,接下来是全新的事情,请记住 onCompletion {…}catch {…} 只是中介程序运算符。 它们的顺序很重要

总结

您可使用 Flow builders 构建 flow,其中最基本的是 flow{…}。 若是要开始该 flow,请调用诸如 collect {…} 之类的 terminal operator,而且因为 terminal operator 是挂起函数,所以须要使用协程构建器 launch {…} 的做用域,或者若是您想要以优雅的风格进行操做, 您能够结合使用 launchIn()onEach {…}。 使用 catch {…} 捕获上游错误,并根据须要提供回退流程。 onCompletion {..} 将在上游完成全部发射以后或发生错误时触发。 默认状况下,全部这些方法都适用于调用程序协程上下文,若是要更改上游上下文,请使用flowOn()

关于我

我是 Fly_with24

相关文章
相关标签/搜索