原文连接 BennyHuo 破解 Kotlin 协程(11)-Flow 篇git
Flow
就是 Kotlin 协程与响应式编程模型结合的产物,你会发现它与 RxJava 很是像,两者之间也有相互转换的 API,使用起来很是方便。github
随着 RxJava 的流行,响应式编程模型逐步深刻人心。Flow
就是 Kotlin 协程与响应式编程模型结合的产物。编程
本文基于 Kotlinx.coroutines1.3.3,因为部分功能尚处于实验阶段,后续也可能会发生细微的调整。缓存
介绍 Flow
以前,咱们先来回顾下序列生成器:安全
代码清单1: 序列生成器bash
val ints = sequence {
(1..3).forEach {
yield(it)
}
}
复制代码
每次访问 ints
的下一个元素的时候它就执行内部的逻辑直到遇到 yield
,若是我但愿在元素之间加个延时呢?数据结构
代码清单2:序列生成器中不能调用其余挂起函数并发
val ints = sequence {
(1..3).forEach {
yield(it)
delay(1000) // ERROR!
}
}
复制代码
受 RestrictsSuspension
注解的约束,delay
不能在 SequenceScope
的扩展成员当中被调用,于是不能在序列生成器的协程体内调用了。框架
假设序列生成器不受这个限制,调用 delay
会致使后续的执行流程的线程发生变化,外部的调用者发如今访问 ints
的下一个元素的时候竟然还会有切换线程的反作用,这个是否是算一个“惊喜”呢?不只如此,我想经过指定调度器来限定序列建立所在的线程,一样是不能够的,咱们甚至没有办法为它设置协程上下文。异步
既然序列生成器有这么多限制,那咱们是时候须要认识一下 Flow
了。它的 API 与序列生成器极为类似:
代码清单3:建立 Flow
val intFlow = flow {
(1..3).forEach {
emit(it)
delay(100)
}
}
复制代码
新元素经过 emit
函数提供,Flow 的执行体内部也能够调用其余挂起函数,这样咱们就能够在每次提供一个新元素后再延时 100ms 了。
Flow 也能够设定它运行时所使用的调度器:
intFlow.flowOn(Dispatchers.IO)
复制代码
经过 flowOn
设置的调度器只对它以前的操做有影响,所以这里意味着 intFlow 的构造逻辑会在 IO
调度器上执行。
最终消费 intFlow
须要调用 collect
函数,这个函数也是一个挂起函数,咱们启动一个协程来消费 intFlow
:
代码清单4: 消费 Flow
GlobalScope.launch(myDispatcher) {
intFlow.flowOn(Dispatchers.IO)
.collect { println(it) }
}.join()
复制代码
为了区分调度器,咱们为协程设置了一个自定义的调度器,它会将协程调度到名叫 MyThread
的线程上,结果以下:
[MyThread] 1
[MyThread] 2
[MyThread] 3
复制代码
RxJava 也是一个基于响应式编程模型的异步框架,它提供了两个切换调度器的 API 分别是 subscribeOn
和 observeOn
:
代码清单5:RxJava 的调度器切换
Observable.create<Int> {
(1..3).forEach { e ->
it.onNext(e)
}
it.onComplete()
}.subscribeOn(Schedulers.io())
.observeOn(Schedulers.from(myExecutor))
.subscribe {
println(it)
}
复制代码
其中 subscribeOn
指定的调度器影响前面的逻辑,observeOn
影响的是后面的逻辑,所以 it.onNext(e)
执行在它的 io
这个调度器上,而最后的 println(it)
执行在经过 myExecutor
建立出来的调度器上。
Flow 的调度器 API 中看似只有 flowOn
与 subscribeOn
对应,其实否则, collect
所在协程的调度器则与 observeOn
指定的调度器对应。
在 RxJava 的学习和使用过程当中, subscribeOn
和 observeOn
常常容易被混淆;而在 Flow 当中 collect
所在的协程天然就是观察者,它想运行在什么调度器上它本身指定便可,很是容易区分。
一个 Flow 建立出来以后,不消费则不生产,屡次消费则屡次生产,生产和消费老是相对应的。
代码清单6:Flow 能够被重复消费
GlobalScope.launch(dispatcher) {
intFlow.collect { println(it) }
intFlow.collect { println(it) }
}.join()
复制代码
intFlow
就是本节最开始咱们建立的 Flow,消费它会输出 1,2,3,重复消费它会重复输出 1,2,3。
这一点其实相似于咱们前面提到的 sequence
和 RxJava 例子,它们也都有本身的消费端。咱们建立一个序列而后去迭代它,每次迭代都会建立一个新的迭代器从头开始迭代;RxJava 的 Observable
也是如此,每次调用它的 subscribe
都会从新消费一次。
所谓冷数据流,就是只有消费时才会生产的数据流,这一点与 Channel
正对应:Channel
的发送端并不依赖于接收端。
说明 RxJava 也存在热数据流,能够经过必定的手段实现冷热数据流的转化。不过相比之下,冷数据流的应用场景更为丰富。
Flow 的异常处理也比较直接,直接调用 catch
函数便可:
代码清单7:捕获 Flow 的异常
flow {
emit(1)
throw ArithmeticException("Div 0")
}.catch { t: Throwable ->
println("caught error: $t")
}
复制代码
咱们在 Flow 的参数中抛了一个异常,在 catch
函数中就能够直接捕获到这个异常。若是没有调用 catch
函数,未捕获异常会在消费时抛出。请注意,catch
函数只能捕获它的上游的异常。
若是咱们想要在流完成时执行逻辑,可使用 onCompletion
:
代码清单8:订阅流的完成
flow {
emit(1)
throw ArithmeticException("Div 0")
}.catch { t: Throwable ->
println("caught error: $t")
}.onCompletion { t: Throwable? ->
println("finally.")
}
复制代码
onCompletion
用起来比较相似于 try ... catch ... finally
中的 finally
,不管前面是否存在异常,它都会被调用,参数 t 则是前面未捕获的异常。
Flow 的设计初衷是但愿确保流操做中异常透明。所以,如下写法是违反 Flow 的设计原则的:
代码清单9:命令式的异常处理(不推荐)
flow {
try {
emit(1)
throw ArithmeticException("Div 0")
} catch (t: Throwable){
println("caught error: $t")
} finally {
println("finally.")
}
}
复制代码
在流操做内部使用 try ... catch ... finally
这样的写法后续可能被禁用。
在 RxJava 当中还有 onErrorReturn
相似的操做:
代码清单10:RxJava 从异常中恢复
val observable = Observable.create<Int> {
...
}.onErrorReturn {
println(t)
10
}
复制代码
捕获异常后,返回 10 做为下一个值。
咱们在 Flow 当中也能够模拟这样的操做:
代码清单11:Flow 从异常中恢复
flow {
emit(1)
throw ArithmeticException("Div 0")
}.catch { t: Throwable ->
println("caught error: $t")
emit(10)
}
复制代码
这里咱们可使用 emit
从新生产新元素出来。细心的读者必定会发现,emit
定义在 FlowCollector
当中,所以只要遇到 Receiver 为 FlowCollector
的函数,咱们就能够生产新元素。
说明 onCompletion 预计在协程框架的 1.4 版本中会被从新设计,以后它的做用相似于 RxJava 中 Subscriber 的 onComplete,即做为整个 Flow 的完成回调使用,回调的参数也将包含整个 Flow 的未捕获异常,参见 GitHub Issue:Breaking change: Experimental Flow.onCompletion contract for cause #1732。
前面的例子当中,咱们用 collect
消费 Flow 的数据。collect
是最基本的末端操做符,功能与 RxJava 的 subscribe
相似。除了 collect
以外,还有其余常见的末端操做符,大致分为两类:
toList
、toSet
等。reduce
、fold
等操做,以及得到单个元素的操做包括 single
、singleOrNull
、first
等。实际上,识别是否为末端操做符,还有一个简单方法,因为 Flow 的消费端必定须要运行在协程当中,所以末端操做符都是挂起函数。
咱们除了能够在 collect
处消费 Flow 的元素之外,还能够经过 onEach
来作到这一点。这样消费的具体操做就不须要与末端操做符放到一块儿,collect
函数能够放到其余任意位置调用,例如:
代码清单12:分离 Flow 的消费和触发
fun createFlow() = flow<Int> {
(1..3).forEach {
emit(it)
delay(100)
}
}.onEach { println(it) }
fun main(){
GlobalScope.launch {
createFlow().collect()
}
}
复制代码
由此,咱们又能够衍生出一种新的消费 Flow 的写法:
代码清单13:使用协程做用域直接触发 Flow
fun main(){
createFlow().launchIn(GlobalScope)
}
复制代码
其中 launchIn
函数只接收一个 CoroutineScope
类型的参数。
Flow 没有提供取消操做,缘由很简单:不须要。
咱们前面已经介绍了 Flow 的消费依赖于 collect
这样的末端操做符,而它们又必须在协程当中调用,所以 Flow 的取消主要依赖于末端操做符所在的协程的状态。
代码清单14:Flow 的取消
val job = GlobalScope.launch {
val intFlow = flow {
(1..3).forEach {
delay(1000)
emit(it)
}
}
intFlow.collect { println(it) }
}
delay(2500)
job.cancelAndJoin()
复制代码
每隔 1000ms 生产一个元素,2500ms 之后协程被取消,所以最后一个元素生产前 Flow 就已经被取消,输出为:
1
▶ 1000ms later
2
复制代码
如此看来,想要取消 Flow 只须要取消它所在的协程便可。
咱们已经知道了 flow { ... }
这种形式的建立方式,不过在这当中没法随意切换调度器,这是由于 emit
函数不是线程安全的:
代码清单15:不能在 Flow 中直接切换调度器
flow { // BAD!!
emit(1)
withContext(Dispatchers.IO){
emit(2)
}
}
复制代码
想要在生成元素时切换调度器,就必须使用 channelFlow
函数来建立 Flow:
channelFlow {
send(1)
withContext(Dispatchers.IO) {
send(2)
}
}
复制代码
此外,咱们也能够经过集合框架来建立 Flow:
listOf(1, 2, 3, 4).asFlow()
setOf(1, 2, 3, 4).asFlow()
flowOf(1, 2, 3, 4)
复制代码
只要是响应式编程,就必定会有背压问题,咱们先来看看背压到底是什么。
背压问题在生产者的生产速率高于消费者的处理速率的状况下出现。为了保证数据不丢失,咱们也会考虑添加缓存来缓解问题:
代码清单16:为 Flow 添加缓冲
flow {
List(100) {
emit(it)
}
}.buffer()
复制代码
咱们也能够为 buffer
指定一个容量。不过,若是咱们只是单纯地添加缓存,而不是从根本上解决问题就始终会形成数据积压。
问题产生的根本缘由是生产和消费速率的不匹配,除直接优化消费者的性能之外,咱们也能够采起一些取舍的手段。
第一种是 conflate
。与 Channel
的 Conflate
模式一致,新数据会覆盖老数据,例如:
代码清单17:使用 conflate 解决背压问题
flow {
List(100) {
emit(it)
}
}.conflate()
.collect { value ->
println("Collecting $value")
delay(100)
println("$value collected")
}
复制代码
咱们快速地发送了 100 个元素,最后接收到的只有两个,固然这个结果每次都不必定同样:
Collecting 1
1 collected
Collecting 99
99 collected
复制代码
第二种是 collectLatest
。顾名思义,只处理最新的数据,这看上去彷佛与 conflate
没有区别,其实区别大了:它并不会直接用新数据覆盖老数据,而是每个都会被处理,只不过若是前一个还没被处理完后一个就来了的话,处理前一个数据的逻辑就会被取消。
仍是前面的例子,咱们稍做修改:
代码清单18:使用 collectLatest 解决背压问题
flow {
List(100) {
emit(it)
}
}.collectLatest { value ->
println("Collecting $value")
delay(100)
println("$value collected")
}
复制代码
运行结果以下:
Collecting 0
Collecting 1
...
Collecting 97
Collecting 98
Collecting 99
▶ 100ms later
99 collected
复制代码
前面的 Collecting
输出了 0 ~ 99 的全部结果,而 collected
却只有 99,由于后面的数据到达时,处理上一个数据的操做正好被挂起了(请注意delay(100)
)。
除 collectLatest
以外还有 mapLatest
、flatMapLatest
等等,都是这个做用。
咱们已经对集合框架的变换很是熟悉了,Flow
看上去极其相似于这样的数据结构,这一点与 RxJava 的 Observable
的表现也基本一致。
例如咱们可使用 map 来变换 Flow 的数据:
代码清单19:Flow 的元素变换
flow {
List(5){ emit(it) }
}.map {
it * 2
}
复制代码
也能够映射成其余 Flow:
代码清单20:Flow 的嵌套
flow {
List(5){ emit(it) }
}.map {
flow { List(it) { emit(it) } }
}
复制代码
这实际上获得的是一个数据类型为 Flow
的 Flow
,若是但愿将它们拼接起来,可使用 flattenConcat
:
代码清单21:拼接 Flow
flow {
List(5){ emit(it) }
}.map {
flow { List(it) { emit(it) } }
}.flattenConcat()
.collect { println(it) }
复制代码
拼接的操做中 flattenConcat
是按顺序拼接的,结果的顺序仍然是生产时的顺序;还有一个是 flattenMerge
,它会并发拼接,所以结果不会保证顺序。
多数状况下,咱们能够经过构造合适的 Flow 来实现多路复用的效果。
上一篇文章破解 Kotlin 协程(10) - Select 篇中对 await 的复用咱们能够用 Flow 实现以下:
代码清单22:使用 Flow 实现对 await 的多路复用
coroutineScope {
val login = "..."
listOf(::getUserFromApi, ::getUserFromLocal) ... ①
.map { function ->
function.call(login) ... ②
}
.map { deferred ->
flow { emit(deferred.await()) } ... ③
}
.merge() ... ④
.onEach { user ->
println("Result: $user")
}.launchIn(this)
}
复制代码
这其中,① 处用建立了两个函数引用组成的 List;② 处调用它们获得 deferred;③ 处比较关键,对于每个 deferred 咱们建立一个单独的 Flow,并在 Flow 内部发送 deferred.await() 返回的结果,即返回的 User 对象;如今咱们有了两个 Flow 实例,咱们须要将它们整合成一个 Flow 进行处理,调用 merge 函数便可。
图1:使用 merge 合并 Flow
一样的,对 Channel 的读取复用的场景也可使用 Flow 来完成。对照破解 Kotlin 协程(10) - Select 篇,咱们给出 Flow 的实现版本:
代码清单23:使用 Flow 实现对 Channel 的复用
val channels = List(10) { Channel<Int>() }
...
val result = channels.map {
it.consumeAsFlow()
}
.merge()
.first()
复制代码
这比 select
的版本看上去要更简洁明了,每一个 Channel 都经过 consumeAsFlow
函数被映射成 Flow,再 merge 成一个 Flow,取第一个元素。
Flow
是协程当中比较重要的异步工具,它的用法与其余相似的响应式编程框架很是相近,你们能够采起类比的学习方式去了解它的功能。
中文官网:www.kotlincn.net/
中文官方博客:www.kotliner.cn/
公众号:Kotlin
知乎专栏:Kotlin
CSDN:Kotlin中文社区
掘金:Kotlin中文社区
简书:Kotlin中文社区