kotlinx-coroutines-core 1.4.2版本出来了,没有了以前的实验方法警告,终于能够能够愉快的玩耍了,如今准备把有关flow的相关资料整理汇总一下java
A cold asynchronous data stream that sequentially emits values and completes normally or with an exception。
复制代码
意思是:按顺序发出值并正常完成或异常完成的Cold异步数据流。markdown
与rxjava做用相似,可能会在之后的开发中逐步代替rxjava,使整个开发生态更加趋向一体化异步
emptyFlow<String>()
复制代码
flowOf(1, 2, 3)
// 1, 2, 3
flowOf(listOf(1,2,3))
// [1, 2, 3]
复制代码
listOf(1, 2, 3).asFlow()
// 1, 2, 3
复制代码
fun flowBuilderFunction(): Int {
return 10
}
::flowBuilderFunction.asFlow()
// 10
复制代码
suspend fun flowBuilderFunction(): Int {
return 10
}
::flowBuilderFunction.asFlow()
// 10
复制代码
LongRange(1, 5).asFlow().collect { value -> println(value) }
复制代码
debounceasync
特性:函数
最后一个值不受影响,老是会被释放emit。 [timeout]能够传毫秒,也能够传Durationui
flow {
emit(1)
delay(3000)
emit(2)
delay(1000)
emit(3)
delay(1000)
emit(4)
}.debounce(2000)
// 结果:1 4
// 解释:
// 2和1的间隔大于2000,1被释放
// 3和2的间隔小于2000, 2被忽略
// 4和3的间隔小于2000, 3被忽略
// 4是最后一个值不受timeout值的影响, 4被释放
flow {
emit(1)
delay(3000)
emit(2)
delay(1000)
emit(3)
delay(1000)
emit(4)
}.debounce(2000.milliseconds)
// 结果:1 4
应用:可用于搜索框的反复输入内容筛选
复制代码
distinctUntilChangedspa
1.若是生产的值和上个发送的值相同,值就会被过滤掉code
flow {
emit(1)
emit(1)
emit(2)
emit(2)
emit(3)
emit(4)
}.distinctUntilChanged()
// 结果:1 2 3 4
// 解释:
// 第一个1被释放
// 第二个1因为和第一个1相同,被过滤掉
// 第一个2被释放
// 第二个2因为和第一个2相同,被过滤掉
// 第一个3被释放
// 第一个4被释放
复制代码
private class Person(val age: Int, val name: String)
flow {
emit(Person(20, "张三"))
emit(Person(21, "李四"))
emit(Person(21, "王五"))
emit(Person(22, "赵六"))
}.distinctUntilChanged{old, new -> old.age == new.age }
.collect{ value -> println(value.name) }
// 结果:张三 李四 赵六
// 解释:本例子定义若是年龄相同就认为是相同的值,因此王五被过滤掉了
复制代码
flow {
emit(Person(20, "张三"))
emit(Person(21, "李四"))
emit(Person(21, "王五"))
emit(Person(22, "赵六"))
}.distinctUntilChangedBy { person -> person.age }
// 结果:张三 李四 赵六
复制代码
transformorm
对每一个值进行转换协程
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.transform {
if (it % 2 == 0) {
emit(it * it)
}
}
// 结果:4 16
// 解释:
// 1 不是偶数,被忽略
// 2 是偶数,2的平方4
// 3 不是偶数,被忽略
// 4 是偶数,4的平方16
复制代码
onStart
第一个值被释放以前被执行
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.onStart { emit(1000) }
// 结果:1000 1 2 3 4
// 解释:
// 第一个值1被释放的时候调用了emit(10 00), 因此1000在1以前被释放
复制代码
onCompletion
最后一个值释放完成以后被执行
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.onCompletion { emit(1000) }
// 结果:1 2 3 4 1000
// 解释:
// 第一个值4被释放的时候调用了emit(100 0), 因此1000在4以后被释放
复制代码
drop
忽略最开始的[count]个值
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.drop(2)
// 结果:3 4
// 解释:
// 最开始释放的两个值(1,2)被忽略了
复制代码
dropWhile
判断第一个值若是知足(T) -> Boolean这个条件就忽略
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.dropWhile {
it % 2 == 0
}
// 结果:1 2 3 4
// 解释:
// 第一个值不是偶数,因此1被释放
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.dropWhile {
it % 2 != 0
}
// 结果:2 3 4
// 解释:
// 第一个值是偶数,因此1被忽略
复制代码
take
只释放前面[count]个值
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.take(2)
// 结果:1 2
// 解释:
// 前面两个值被释放
复制代码
takeWhile
判断第一个值若是知足(T) -> Boolean这个条件就释放
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.takeWhile { it%2 != 0 }
// 结果:1
// 解释:
// 第一个值知足是奇数条件
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.takeWhile { it%2 == 0 }
// 结果:无
// 解释:
// 第一个值不知足是奇数条件
复制代码
flowOn
能够切换CoroutineContext 说明:flowOn只影响该运算符以前的CoroutineContext,对它以后的CoroutineContext没有任何影响
buffer
将flow的多个任务分配到不一样的协程中去执行,加快执行的速度。
conflate
若是值的生产速度大于值的消耗速度,就忽略掉中间将来得及处理的值,只处理最新的值。
val flow1 = flow {
delay(2000)
emit(1)
delay(2000)
emit(2)
delay(2000)
emit(3)
delay(2000)
emit(4)
}.conflate()
flow1.collect { value ->
println(value)
delay(5000)
}
// 结果: 1 3 4
// 解释:
// 2000毫秒后生产了1这个值,交由collect 执行,花费了5000毫秒,当1这个值执行co llect完成后已经通过了7000毫秒。
// 这7000毫秒中,生产了2,可是collect还 没执行完成又生产了3,因此7000毫秒之后 会直接执行3的collect方法,忽略了2这 个值
// collect执行完3后,还有一个4,继续执 行。
复制代码
将原始的Flow<T>经过[transform]转换成Flow<Flow<T>>,而后将Flow<Flow<T>>释放的Flow<T>其中释放的值一个个释放。
flow {
delay(1000)
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
delay(1000)
emit(4)
}.flatMapConcat {
flow {
emit("$it 产生第一个flow值")
delay(2500)
emit("$it 产生第二个flow值")
}
}.collect { value ->
println(value)
}
// 结果
// I/System.out: 1 产生第一个flow值
// I/System.out: 1 产生第二个flow值
// I/System.out: 2 产生第一个flow值
// I/System.out: 2 产生第二个flow值
// I/System.out: 3 产生第一个flow值
// I/System.out: 3 产生第二个flow值
// I/System.out: 4 产生第一个flow值
// I/System.out: 4 产生第二个flow值
// 解释:
// 原始Flow<Int>经过flatMapConcat被转换成Flow<Flow<Int>>
// 原始Flow<Int>首先释放1,接着Flow<Flow<Int>> 就会释放 1产生第一个flow值 和 1产生第二个flow值 两个值
// Flow<Int>释放2,...
// Flow<Int>释放3,...
// Flow<Int>释放4,...
复制代码
flattenConcat
和flatMapConcat相似,只是少了一步Map操做。
flow {
delay(1000)
emit(flow {
emit("1 产生第一个flow值")
delay(2000)
emit("1 产生第二个flow值") })
delay(1000)
emit(flow {
emit("2 产生第一个flow值")
delay(2000)
emit("3 产生第二个flow值") })
delay(1000)
emit(flow {
emit("3 产生第一个flow值")
delay(2000)
emit("3 产生第二个flow值") })
delay(1000)
emit(flow {
emit("4 产生第一个flow值")
delay(2500)
emit("4 产生第二个flow值") })
}.flattenConcat()
// 结果
// I/System.out: 1 产生第一个flow值
// I/System.out: 1 产生第二个flow值
// I/System.out: 2 产生第一个flow值
// I/System.out: 2 产生第二个flow值
// I/System.out: 3 产生第一个flow值
// I/System.out: 3 产生第二个flow值
// I/System.out: 4 产生第一个flow值
// I/System.out: 4 产生第二个flow值
复制代码
flatMapMerge
将原始的Flow经过[transform]转换成Flow<Flow>,而后将Flow<Flow>释放的Flow其中释放的值一个个释放。 它与flatMapConcat的区别是:Flow<Flow>释放的Flow其中释放的值没有顺序性,谁先产生谁先释放。
flow {
delay(1000)
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
delay(1000)
emit(4)
}.flatMapMerge {
flow {
emit("$it 产生第一个flow值")
delay(2500)
emit("$it 产生第二个flow值")
}
}.collect { value ->
println(value)
}
复制代码
merge
将Iterable<Flow>合并成一个Flow
val flow1 = listOf(
flow {
emit(1)
delay(500)
emit(2)
},
flow {
emit(3)
delay(500)
emit(4)
},
flow {
emit(5)
delay(500)
emit(6)
}
)
flow1.merge().collect { value -> println("$value") }
// 结果: 1 3 5 2 4 6
// 解释:
// 按Iterable的顺序和耗时顺序依次释放值
复制代码
transformLatest
原始flow会触发transformLatest转换后的flow, 当原始flow有新的值释放后,transformLatest转换后的flow会被取消,接着触发新的转换后的flow
flatMapLatest
和transformLatest相似, 原始flow会触发transformLatest转换后的flow, 当原始flow有新的值释放后,transformLatest转换后的flow会被取消,接着触发新的转换后的flow
区别:flatMapLatest的transform转换成的是Flow, transformLatest的transform转换成的是Unit
mapLatest
和transformLatest相似, 原始flow会触发transformLatest转换后的flow, 当原始flow有新的值释放后,transformLatest转换后的flow会被取消,接着触发新的转换后的flow
区别:mapLatest的transform转换成的是T,flatMapLatest的transform转换成的是Flow,transformLatest的transform转换成的是Unit
filter
经过predicate进行过滤,知足条件则被释放
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.filter { it % 2 == 0 }
// 结果: 2 4
// 解释:
// 2和4知足it % 2 == 0,被释放
复制代码
filterNot
经过predicate进行过滤,不知足条件则被释放
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.filterNot { it % 2 == 0 }
// 结果: 1 3
// 解释:
// 1和3不知足it % 2 == 0,被释放
复制代码
filterIsInstance
若是是某个数据类型则被释放
flow {
emit(1)
emit("2")
emit("3")
emit(4)
}.filterIsInstance<String>()
// 结果: "2" "3"
// 解释:
// "2" "3"是String类型,被释放
复制代码
filterNotNull
若是数据是非空,则被释放
flow {
emit(1)
emit("2")
emit("3")
emit(null)
}.filterNotNull()
// 结果: 1 "2" "3"
复制代码
map
将一个值转换成另一个值
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.map { it * it }
// 结果: 1 4 9 16
// 解释:
// 将1,2,3,4转换成对应的平方数
复制代码
mapNotNull
将一个非空值转换成另一个值
withIndex
将值封装成IndexedValue对象
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.withIndex()
// 结果:
// I/System.out: IndexedValue(index=0, value=1)
// I/System.out: IndexedValue(index=1, value=2)
// I/System.out: IndexedValue(index=2, value=3)
// I/System.out: IndexedValue(index=3, value=4)
复制代码
onEach
每一个值释放的时候能够执行的一段代码
scan
有一个初始值,而后每一个值都和初始值进行运算,而后这个值做为后一个值的初始值
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.scan(100) { acc, value ->
acc * value
}
// 结果: 100 100 200 600 2400
// 解释:
// 初始值 100
// 1 100 * 1 = 100
// 2 100 * 2 = 200
// 3 200 * 3 = 600
// 4 600 * 4 = 2400
复制代码
runningReduce
和scan相似,可是没有初始值,最开始是它自己
flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.runningReduce { acc, value ->
acc * value
}
// 结果: 1 2 6 24
// 解释:
// 1 1
// 2 1 * 2 = 2
// 3 2 * 3 = 6
// 4 6 * 4 = 24
复制代码
zip
将两个Flow在回调函数中进行处理返回一个新的值 R 当两个flow的长度不等时只发送最短长度的事件
val nums = (1..4).asFlow()
val strs = flowOf("one", "two", "three")
nums.zip(strs) { a, b -> "$a -> $b" }
.collect { println(it) }
// 结果:
1 -> one
2 -> two
3 -> three
复制代码
combine
任意一个flow释放值且都有释放值后会调用combine后的代码块,且值为每一个flow的最新值。 和zip的区别: 组合两个流,在通过第一次发射之后,任意方有新数据来的时候就能够发射,另外一方有多是已经发射过的数据
val flow1 = flowOf(1, 2, 3, 4).onEach { delay(10) }
val flow2 = flowOf("a", "b", "c", "d").onEach { delay(20) }
flow1.combine(flow2) { first, second ->
"$first$second"
}.collect { println("$it") }
// 结果:1a 2a 2b 3b 4b 4c 4d
// 解释:
// 开始 --- flow1 释放 1,flow2 释放 a, 释放1a
// 10毫秒 --- flow1 释放 2,释放2a
// 20毫秒 --- flow2 释放 b,此时释放2b
// 30毫秒 --- flow1 释放 3,此时释放3b
// 40毫秒 --- flow1 释放 4,此时释放4b
// 40毫秒 --- flow2 释放 c,此时释放4c
// 60毫秒 --- flow2 释放 d,此时释放4d
复制代码
public fun <T> Flow<T>.retry( retries: Long = Long.MAX_VALUE, // 重试次数 predicate: suspend (cause: Throwable) -> Boolean = { true } ): Flow<T> 复制代码
public fun <T> Flow<T>.retryWhen( predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean ): Flow<T>
复制代码
Collect相关的末端操做符
collect
接收值
launchIn
scope.launch { flow.collect() }的缩写, 表明在某个协程上下文环境中去接收释放的值
val flow1 = flow {
delay(1000)
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
delay(1000)
emit(4)
}
flow1.onEach { println("$it") }
.launchIn(GlobalScope)
// 结果:1 2 3 4
复制代码
collectIndexed
和withIndex对应的,接收封装的IndexedValue
val flow1 = flow {
emit(1)
emit(2)
emit(3)
emit(4)
}.withIndex()
flow1.collectIndexed { index, value ->
println("index = $index, value = $value")
}
// 结果:
// I/System.out: index = 0, value = IndexedValue(index=0, value=1)
// I/System.out: index = 1, value = IndexedValue(index=1, value=2)
// I/System.out: index = 2, value = IndexedValue(index=2, value=3)
// I/System.out: index = 3, value = IndexedValue(index=3, value=4)
复制代码
collectLatest
collectLatest与collect的区别是,若是有新的值释放,上一个值的操做若是没执行完则将会被取消
val flow1 = flow {
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
delay(2000)
emit(4)
}
flow1.collectLatest {
println("正在计算收到的值 $it")
delay(1500)
println("收到的值 $it")
}
// 结果:
// I/System.out: 正在计算收到的值 1
// I/System.out: 正在计算收到的值 2
// I/System.out: 正在计算收到的值 3
// I/System.out: 收到的值 3
// I/System.out: 正在计算收到的值 4
// I/System.out: 收到的值 4
// 解释:
// 1间隔1000毫秒后释放2,2间隔1000毫秒后释放3,这间隔小于须要接收的时间1500毫秒,因此当2和3 到来后,以前的操做被取消了。
// 3和4 之间的间隔够长可以等待执行完毕,4是最后一个值也能执行
复制代码
Collection相关的末端操做符
toList
将释放的值转换成List
flow {
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
delay(2000)
emit(4)
}
println(flow1.toList())
// 结果:[1, 2, 3, 4]
复制代码
toSet
将释放的值转换成Set
flow {
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
delay(2000)
emit(4)
}
println(flow1.toSet())
// 结果:[1, 2, 3, 4]
复制代码
Count相关的末端操做符
count
1.计算释放值的个数
val flow1 = flow {
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
delay(2000)
emit(4)
}
println(flow1.count())
// 结果:4
2.计算知足某一条件的释放值的个数
val flow1 = flow {
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
delay(2000)
emit(4)
}
println(flow1.count { it % 2 == 0 })
// 结果:2
// 解释:
// 偶数有2个值 2 4
```
复制代码
Reduce相关的末端操做符
reduce
和runningReduce相似,可是只计算最后的结果。
val flow1 = flow {
emit(1)
emit(2)
emit(3)
emit(4)
}
println(flow1.reduce { acc, value -> acc * value })
// 结果:24
// 解释:计算最后的结果,1 * 2 * 3 * 4 = 24
复制代码
fold
和scan相似,有一个初始值,可是只计算最后的结果。
val flow1 = flow {
emit(1)
emit(2)
emit(3)
emit(4)
}
println(flow1.fold(100) { acc, value -> acc * value })
// 结果:2400
// 解释:计算最后的结果,100 * 1 * 2 * 3 * 4 = 2400
复制代码
single
只接收一个值的Flow 注意:多于1个或者没有值都会报错
val flow1 = flow {
emit(1)
}
println(flow1.single())
// 结果:1
复制代码
singleOrNull
接收一个值的Flow或者一个空值的Flow
first/firstOrNull
val flow1 = flow {
emit(1)
emit(2)
emit(3)
emit(4)
}
println(flow1.first())
// 结果:1
```
2. 接收第一个知足某个条件的值
val flow1 = flow {
emit(1)
emit(2)
emit(3)
emit(4)
}
println(flow1.first { it % 2 == 0})
// 结果:2
```
复制代码
能够经过 try catch 捕获错误异常
try {
flow {
for (i in 1..3) {
emit(i)
}
}.collect {
println("接收值 $it")
check(it <= 1) { "$it 大于1" }
}
} catch (e: Throwable) {
println("收到了异常: $e")
}
// 结果:
// I/System.out: 接收值 1
// I/System.out: 接收值 2
// I/System.out: 收到了异常: java.lang.IllegalStateException: 2 大于1
// 解释:
// 收到2的时候就抛出了异常,让后flow被取消,异常被捕获
复制代码
经过catch函数
catch函数可以捕获以前产生的异常,以后的异常没法捕获。
flow {
for (i in 1..3) {
emit(i)
}
}.map {
check(it <= 1) { "$it 大于1" }
it
}
.catch { e -> println("Caught $e") }
.collect()
// 结果:
// Caught java.lang.IllegalStateException: 2 大于1
复制代码
CoroutineScope.cancel
GlobalScope.launch {
val flow1 = flow {
for(i in 1..4){
emit(i)
}
}
flow1.collect { value ->
println("$value")
if (value >= 3) {
cancel()
}
}
}
// 结果:1 2 3
复制代码
流取消检测
在协程处于繁忙循环的状况下,必须明确检测是否取消。 能够添加 .onEach { currentCoroutineContext().ensureActive() }, 可是这里提供了一个现成的 cancellable 操做符来执行此操做:
(1..5).asFlow().cancellable().collect { value ->
if (value == 3) cancel()
println(value)
}
复制代码