Flow 库是在 Kotlin Coroutines 1.3.2 发布以后新增的库。bash
官方文档给予了一句话简单的介绍:异步
Flow — cold asynchronous stream with flow builder and comprehensive operator set (filter, map, etc);async
Flow 从文档的介绍来看,它有点相似 RxJava 的 Observable。由于 Observable 也有 Cold 、Hot 之分。函数
Flow 可以返回多个异步计算的值,例以下面的 flow builder :post
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.collect{
println(it)
}
复制代码
其中 Flow 接口,只有一个 collect 函数ui
public interface Flow<out T> {
@InternalCoroutinesApi
public suspend fun collect(collector: FlowCollector<T>)
}
复制代码
若是熟悉 RxJava 的话,则能够理解为 collect() 对应subscribe()
,而 emit() 对应onNext()
。spa
除了刚刚展现的 flow builder 能够用于建立 flow,还有其余的几种方式:线程
flowOf()code
flowOf(1,2,3,4,5)
.onEach {
delay(100)
}
.collect{
println(it)
}
复制代码
asFlow()cdn
listOf(1, 2, 3, 4, 5).asFlow()
.onEach {
delay(100)
}.collect {
println(it)
}
复制代码
channelFlow()
channelFlow {
for (i in 1..5) {
delay(100)
send(i)
}
}.collect{
println(it)
}
复制代码
最后的 channelFlow builder 跟 flow builder 是有必定差别的。
flow 是 Cold Stream。在没有切换线程的状况下,生产者和消费者是同步非阻塞的。 channel 是 Hot Stream。而 channelFlow 实现了生产者和消费者异步非阻塞模型。
下面的代码,展现了使用 flow builder 的状况,大体花费1秒:
fun main() = runBlocking {
val time = measureTimeMillis {
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.collect{
delay(100)
println(it)
}
}
print("cost $time")
}
复制代码
使用 channelFlow builder 的状况,大体花费700毫秒:
fun main() = runBlocking {
val time = measureTimeMillis{
channelFlow {
for (i in 1..5) {
delay(100)
send(i)
}
}.collect{
delay(100)
println(it)
}
}
print("cost $time")
}
复制代码
固然,flow 若是切换线程的话,花费的时间也是大体700毫秒,跟使用 channelFlow builder 效果差很少。
fun main() = runBlocking {
val time = measureTimeMillis{
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.flowOn(Dispatchers.IO)
.collect {
delay(100)
println(it)
}
}
print("cost $time")
}
复制代码
相比于 RxJava 须要使用 observeOn、subscribeOn 来切换线程,flow 会更加简单。只需使用 flowOn
,下面的例子中,展现了 flow builder 和 map 操做符都会受到 flowOn 的影响。
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.map {
it * it
}.flowOn(Dispatchers.IO)
.collect {
println(it)
}
复制代码
而 collect() 指定哪一个线程,则须要看整个 flow 处于哪一个 CoroutineScope 下。
例如,下面的代码 collect() 则是在 main 线程:
fun main() = runBlocking {
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}.map {
it * it
}.flowOn(Dispatchers.IO)
.collect {
println("${Thread.currentThread().name}: $it")
}
}
复制代码
执行结果:
main: 1
main: 4
main: 9
main: 16
main: 25
复制代码
值得注意的地方,不要使用 withContext() 来切换 flow 的线程。
若是 flow 是在一个挂起函数内被挂起了,那么 flow 是能够被取消的,不然不能取消。
fun main() = runBlocking {
withTimeoutOrNull(2500) {
flow {
for (i in 1..5) {
delay(1000)
emit(i)
}
}.collect {
println(it)
}
}
println("Done")
}
复制代码
执行结果:
1
2
Done
复制代码
Flow 的 API 有点相似于 Java Stream 的 API。它也一样拥有 Intermediate Operations、Terminal Operations。
Flow 的 Terminal 运算符能够是 suspend 函数,如 collect、single、reduce、toList 等;也能够是 launchIn 运算符,用于在指定 CoroutineScope 内使用 flow。
@ExperimentalCoroutinesApi // tentatively stable in 1.3.0
public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
collect() // tail-call
}
复制代码
整理一下 Flow 的 Terminal 运算符
该系列的相关文章: