关于Collections和Sequence请看:关于kotlin中的Collections、Sequence、Channel和Flow (一) html
Channel
是一个和 BlockingQueue
很是类似的概念。其中一个不一样是它代替了阻塞的 put
操做并提供了挂起的 send
,还替代了阻塞的 take
操做并提供了挂起的 receive
。 Channel
是并发安全的,它能够用来链接协程,实现不一样协程的通讯。android
val channel = Channel<Int>()
//producer
launch(Dispatchers.IO) {
var i = 0
while (true) {
channel.send(i++)
delay(1000)
}
}
//consumer
launch {
while (true) {
println(channel.receive())
}
}
复制代码
既然咱们说 Channel
实际上就是一个队列,队列不该该有缓冲区吗,那么这个缓冲区一旦满了,而且也一直没有人调用 receive
取走元素的话,send
就挂起了。那么接下来咱们看下 Channel
的缓冲区的定义:git
public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
when (capacity) {
RENDEZVOUS -> RendezvousChannel()
UNLIMITED -> LinkedListChannel()
CONFLATED -> ConflatedChannel()
else -> ArrayChannel(capacity)
}
复制代码
RENDEZVOUS
就是 0,这个词本意就是描述“不见不散”的场景,因此你不来 receive,我这 send 就一直搁这儿挂起等着。换句话说,咱们开头的例子里面,若是 consumer 不 receive,producer 里面的第一个 send 就给挂起了。github
UNLIMITED
比较好理解,无限制,从它给出的实现 LinkedListChannel
来看,这一点也与咱们的 LinkedBlockingQueue
相似。编程
CONFLATED
,这个词是合并的意思,这个类型的 Channel
有一个元素大小的缓冲区,但每次有新元素过来,都会用新的替换旧的。设计模式
BUFFERED
了,它接收一个值做为缓冲区容量的大小,默认64。缓存
但Channel
是热流,即便没有消费者,它的生产操做也会执行。若是你不接收,那么你可能再也接收不到。 由于刚才说了channel
相似于BlockingQueue
, 它的send()
和receive()
其实也是入队出队的操做,假定有多个消费者那它们就会竞争:安全
val channel = Channel<Int>()
//producer
launch(Dispatchers.IO) {
var i = 0
while (true) {
channel.send(i++)
delay(1000)
}
}
//consumer 1
launch {
while (true) {
println("~~~"+channel.receive())
}
}
//consumer 2
launch {
while (true) {
println("!!!"+channel.receive())
}
}
部分输出:
~~~0
~~~1
!!!2
~~~3
!!!4
~~~5
!!!6
~~~7
!!!8
~~~9
复制代码
发现基本是交替获取到值。那若是想全都接收怎么办呢: 使用BroadcastChannel
:markdown
val channel = BroadcastChannel<Int>(Channel.BUFFERED)
//producer
launch(Dispatchers.IO) {
var i = 0
while (true) {
channel.send(i++)
delay(1000)
}
}
//consumer 1
launch {
while (true) {
println("~~~"+channel.openSubscription().receive())
}
}
//consumer 2
launch {
while (true) {
println("!!!"+channel.openSubscription().receive())
}
}
部分输出:
~~~1
!!!1
~~~2
!!!2
~~~3
!!!3
~~~4
!!!4
复制代码
还有一点要注意的是,channel
须要手动关闭。网络
上面说到 sequence
没法享受更上层的协程框架概念下的各类能力,还有一点 sequence
显然不是线程安全的,而 Channel
能够在并发场景下使用。
launch {
val channel = produce(Dispatchers.Unconfined) {
send(1)
send(2)
}
for (item in channel) {
println("got : $item")
}
}
复制代码
但Channel
即便没有人“消费”,值依旧会生产,这会形成必定的浪费。
那么能不能Sequence + Channel
搞一下?
Flow
是在 Kotlin Coroutines 1.2.0 alpha
以后新增的一套API,也叫作异步流,是 Kotlin
协程与响应式编程模型结合的产物。
响应式编程基于观察者模式,是一种面向数据流和变化传播的声明式编程方式。换个说法就是:响应式编程是使用异步数据流进行编程。【响应式编程】
异步挂起函数可以返回单一值,那么咱们如何返回多个异步计算的值呢?而这个就是
Kotlin Flow
解决的问题。
channel的【操做符】在kotlin 1.4标记为弃用,将来是要移除掉的
Flow
有多种构建方式,如下是最简单的方式:
viewModelScope.launch{
//构建 flow
val testFlow= flow {
emit(1)
}
//消费Flow
testFlow.collect {
println(it)
}
}
复制代码
一个 Flow
建立出来以后,不消费则不生产,屡次消费则屡次生产,生产和消费老是相对应的。 所谓冷数据流,就是只有消费时才会生产的数据流,这一点与Channel
相反:Channel
的发送端并不依赖于接收端。
收集器是具备单一挂起功能的流接口收集,它是终端操做符:
public interface Flow<out T> {
public suspend fun collect(collector: FlowCollector<T>)
}
复制代码
发射器是FlowCollector,具备一个称为emit的单个挂起函数
public interface FlowCollector<in T> {
public suspend fun emit(value: T)
}
复制代码
在其内部,收集器和发射器的整个机制只是调用两边的函数,
而suspend关键字则为其增长魔力。
fun main() = runBlocking {
flow {
emit("Context")
println(" emit on ${Thread.currentThread().name}")
}
.flowOn(Dispatchers.IO)
.map {
println(" map on ${Thread.currentThread().name}")
it + " Preservation"
}
.flowOn(Dispatchers.Default)
.collect { value ->
println(" collect on ${Thread.currentThread().name}")
println(value)
}
}
复制代码
输出:
emit on DefaultDispatcher-worker-2
map on DefaultDispatcher-worker-1
collect on main
Context Preservation
复制代码
Flow
从不捕获或处理下游⬇️流中发生的异常,它们仅使用catch
运算符捕获上游⬆️发生的异常。
flow {
emit(1)
throw ArithmeticException("Div 0")
}.catch { t: Throwable ->
println("caught error: $t")
}
复制代码
若是你想在流的开始和结尾处进行一些操做。 onCompletion 用起来比较相似于 try ... catch ... finally
中的 finally
,不管前面是否存在异常,它都会被调用,参数 t
则是前面未捕获的异常。
flow {
emit(1)
}.onStart {
println("smart")
}.onCompletion { t: Throwable ->
println("caught error: $t")
}.collect {
println(it)
}
复制代码
输出:
smart
1
end
复制代码
Flow
的设计初衷是但愿确保流操做中异常透明。所以禁止🚫在flow
的构建中 try catch
:
Wrong:
flow {
try {
emit(1)
throw ArithmeticException("Div 0")
} catch (t: Throwable){
println("caught error: $t")
} finally {
println("finally.")
}
}
复制代码
前面的例子当中,咱们用 collect 消费 Flow 的数据。collect
是最基本的末端操做符,除了 collect
以外,还有其余常见的末端操做符,大致分为三种类:
toList
、toSet
等。reduce
、fold
等操做,以及得到单个元素的操做包括 single
、singleOrNull
、first
等。collect()
和 launchIn()
等。实际上,识别是否为末端操做符,还有一个简单方法,因为 Flow
的消费端必定须要运行在协程当中,所以末端操做符都是挂起函数。
Flow 没有提供取消操做,Flow 的消费依赖于collect
这样的末端操做符,而它们又必须在协程当中调用,所以 Flow 的取消主要依赖于末端操做符所在的协程的状态。
val job = launch {
val intFlow = flow {
(1..3).forEach {
delay(1000)
emit(it)
}
}
intFlow.collect { println(it) }
}
delay(2500)
job.cancel()
复制代码
flow { ... }
是基础的建立方式,还有其余构建器使流的声明更简单:
flowOf
构建器定义了一个发射固定,集的流。.asFlow()
扩展函数,能够将各类集合与序列转换为流。在flow { ... }
中没法随意切换调度器,这是由于 emit
函数不是线程安全的:
flow {
withContext(Dispatchers.IO){ //error
emit(2)
}
emit(1)
}.collect {
println(it)
}
复制代码
想要在生成元素时切换调度器,就须使用
channelFlow
函数来建立 Flow
:
channelFlow {
send(1)
withContext(Dispatchers.IO) {
send(2)
}
}
复制代码
上面咱们说flow
是冷流,只有collect 以后才触发"生产",那我就想要一个"热"流咋整呢? **SharedFlow**
就是解决这个问题。在SharedFlow
以前一般是使用BroadcastChannel
而后asFlow
去实现,但这种实现方式不够优雅,和Channel过于耦合。所以在Coroutine
1.4时推出了SharedFlow. 它是一个**“热”流**,且能够有多个订阅者。
简单使用:
val broadcasts=MutableSharedFlow<String>()
viewModelScope.launch{
broadcasts.emit("Hello")
broadcasts.emit("SharedFlow")
}
lifecycleScope.launch{
broadcasts.collect {
print(it)
}
}
复制代码
StateFlow
是SharedFlow
的一个比较特殊的变种,而SharedFlow
又是Kotlin
数据流当中比较特殊的一种类型。StateFlow 与 LiveData 是最接近的,由于:
- 它始终是有值的。
- 它的值是惟一的。
- 它容许被多个观察者共用 (所以是共享的数据流)。
- 它永远只会把最新的值重现给订阅者,这与活跃观察者的数量是无关的。
当暴露 UI 的状态给视图时,应该使用
StateFlow
。这是一种安全和高效的观察者,专门用于容纳 UI 状态。
简单来讲就是相似LiveData
,可是更好用!
StateFlow
仅在值已更新且不相同值时返回。简单来讲,假定两个值x和y,其中x是最初发出的值,y是要发出的值,若是(x == y)
不执行任何操做,(x !=y)
则仅在此状况下才发出新值。 简单使用:
val stateFlow = MutableStateFlow(UIState.Loading)//初始状态
stateFlow.value = UIState.Error
launch {
stateFlow.collect {
...
}
}
复制代码
更多信息参阅:StateFlow和SharedFlow
只要是响应式编程,就必定会有背压问题,咱们先来看看背压到底是什么: 生产者生产数据的速度超过了消费者消费的速度致使的问题。 但得益于suspend功能,能够在Kotlin流程中实现透明的背压管理。 当流的收集器不堪重负时,它能够简单地挂起发射器,并在准备好接受更多元素时将其resume。 但为了保证数据不丢失,咱们也会考虑添加缓存来缓解问题:
flow {
List(100) {
emit(it)
}
}.buffer()
复制代码
咱们也能够为 buffer 指定一个容量。不过,若是咱们只是单纯地添加缓存,而不是从根本上解决问题就始终会形成数据积压。 (就像咱们板球的聊天室消息缓存池)。
问题产生的根本缘由是生产和消费速率的不匹配,除直接优化消费者的性能之外,咱们也能够采起一些取舍的手段。
第一种是 conflate
新数据会覆盖老数据,例如:
flow {
List(100) {
emit(it)
}
}.conflate()
.collect { value ->
println("Collecting $value")
delay(100)
println("$value collected")
}
复制代码
咱们快速地发送了 100 个元素,最后接收到的只有两个,固然这个结果每次都不必定同样:
Collecting 1
1 collected
Collecting 99
99 collected
复制代码
第二种是 collectLatest
。顾名思义,只处理最新的数据,这看上去彷佛与 conflate
没有区别,其实区别大了:它并不会直接用新数据覆盖老数据,而是每个都会被处理,只不过若是前一个还没被处理完后一个就来了的话,处理前一个数据的逻辑就会被取消。 仍是前面的例子,咱们稍做修改:
flow {
List(100) {
emit(it)
}
}.collectLatest { value ->
println("Collecting $value")
delay(100)
println("$value collected")
}
复制代码
结果:
Collecting 0
Collecting 1
...
Collecting 97
Collecting 98
Collecting 99
▶ 100ms later
99 collected
复制代码
除 collectLatest
以外还有 mapLatest
、flatMapLatest
等等,都是这个做用。
近年来flow
是谷歌大力支持技术,像Room
,DataStore
, Paging3
等都支持了Flow
,那你还等什么呢,学起来,用起来。
能够发送多个值,UI 状态彻底由数据驱动,好比Follow
按钮就能够改造一下,先Loading
后展现结果:
@WorkerThread
fun getObservableUserEvents(userId: String?):Flow<Result<ObservableUserEvents>{
return flow {
emit(Result.Loading)
if (userId == null) {
emit(sessionRepository.getSessions())
}
}
}
复制代码
我要给某一网络请求增长重试机制:
override suspend fun getTrendsList() = flow<Result<xxx>> {
...
emit(Result.Success(result))
}.retry(2).catch { e ->
emit(Result.Error(e))
}
复制代码
搜索有多个tab,都要监听搜索的触发,可是一次预期是触发一个tab的搜索。在ViewPager里,旁边的Fragment是onPause,此时依旧能够收到livedata
回调,可是使用lifecycleScope
和flow
便可解决这个问题,由于launchWhenResumed
不在Resume
时会挂起:
lifecycleScope.launchWhenResumed{
searchRequestFlow.collect{request->
doSearch(request)
}
}
复制代码
而在Lifecycle 2.4.0
以后提供了一个新的API repeatOnLifecycle
,能够指定生命周期状态,而且在离开状态时不是简单的挂起,而是取消协程,当生命周期恢复时:
lifecycleScope.launch {
lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) {
viewModel.navigationActions.collect {
...
}
}
}
复制代码
官方倾向于使用 repeatOnLifecycle API
收集数据流,而不是在 launchWhenX API
内部进行收集。因为后面的 API
会挂起协程,而不是在 Lifecycle
处于 STOPPED
状态时取消。上游数据流会在后台保持活跃状态,并可能会发出新的项并耗用资源。
在以前,咱们会使用LiveData
发送Event
去和UI
交互,或者执行某段逻辑,可是有些时候页面重建致使LiveData从新绑定,此时会当即收到回调致使触发逻辑。为了解决这个问题,在Flow
和Channel
都还没稳定的时候,谷歌的示例使用封装的Event
来判断事件是否处理过:
open class Event<out T>(private val content: T) {
var hasBeenHandled = false
private set // Allow external read but not write
/** * Returns the content and prevents its use again. */
fun getContentIfNotHandled(): T? {
return if (hasBeenHandled) {
null
} else {
hasBeenHandled = true
content
}
}
/** * Returns the content, even if it's already been handled. */
fun peekContent(): T = content
}
复制代码
但这只是个简单的封装,只能有一个观察者,想要应用在复杂场景还得设计个Manager
来管理多个观察者。可是使用SharedFlow
则不会有这个问题,毕竟LiveData
原本就不也是用来干这活的,人家设计来是和UI
来绑定的。
由于SharedFlow
是热流,事件被广播给未知数量的订阅者。在没有订阅者的状况下,任何发布的事件都会当即删除。它是一种用于必须当即处理或根本不处理的事件的设计模式。 使用示例:
val scrollToEvent: SharedFlow<ScheduleScrollEvent> =
loadSessionsResult.combineTransform(currentEventIndex) { result, currentEventIndex ->
emit(ScheduleScrollEvent(currentEventIndex))
}.shareIn(viewModelScope, WhileViewSubscribed, replay = 0)
复制代码
针对事件发送有些时候也可使用Channel
,这个看业务场景:Channel
会每一个事件都传递给单个订阅者。一旦Channel
缓冲区满了,会尝试在没有订阅者的状况下暂停事件发布,等待订阅者出现。默认状况下永远不会删除已发布的事件。(不过经过设置也能够无缓存或者仅缓存一个)
使用示例:
// SIDE EFFECTS: Navigation actions
private val _navigationActions = Channel<NavigationAction>(capacity = Channel.CONFLATED)
val navigationActions = _navigationActions.receiveAsFlow()
复制代码
搜索监听输入框,输入时执行搜索,这里要进行debounce,避免发出过多的sug请求:
val query=MutableStateFlow<String?>(null)
fun onTextChanged(text:String){
query.value=text
}
launch{
query.debounce(100).collect{text->
text?.let{
doSearch(text)
}
}
}
复制代码
同时请求缓存和网络,网络先到则更新缓存,并取消协程,缓存先到则数据发送到UI后继续执行,直到网络数据返回。
listOf(
async { dataSource.getCacheData() },
async { dataSource.getRemoteData() })
.map { deferred ->
flow { emit(deferred.await()) }
}.merge().onEach { result ->
//网络数据
if (result.requestType == RequestType.NETWORK) {
if (isActive) {
_source.postValue(result)
}
if (result is Result.Success) {
result.data?.let { newData ->
//更新缓存
dataSource.flushCache(newData)
}
cancel()
}
} else {
//缓存数据
if (result is Result.Success) {
if (isActive) {
_source.postValue(result)
}
}
}
}.onCompletion {
isPreLoading.set(false)
}.launchIn(this)
复制代码
每次各取一个,一旦其中一个流完成,结果流就完成,并在剩余流上调用cancel。
val flow = flowOf("4K显示器", "2K显示器", "1080P显示器")
val flow2 = flowOf("小明", "小陈", "小红", "小十一郎")
flow.zip(flow2) { i, s -> i + " 发给了 "+s }.collect {
println(it)
}
4K显示器 发给了 小明
2K显示器 发给了 小陈
1080P显示器 发给了 小红
复制代码
经过组合每一个流的最近发射的值,使用转换函数生成其值:
val flow = flowOf("Tom", "Jack", "Lucifer")
val flow2 = flowOf("小明", "小陈", "小红", "小十一郎")
flow.combine(flow2) { i, s -> i + " 和 " + s + "握了手" }.collect {
println(it)
}
Tom 和 小明握了手
Jack 和 小明握了手
Jack 和 小陈握了手
Lucifer 和 小陈握了手
Lucifer 和 小红握了手
Lucifer 和 小十一郎握了手
复制代码
若是咱们对 第一个flow的发射加一个延迟:
val flow = flowOf("Tom", "Jack", "Lucifer").onEach { delay(10) }
val flow2 = flowOf("小明", "小陈", "小红", "小十一郎")
flow.combine(flow2) { i, s -> i + " 和 " + s + "握了手" }.collect {
println(it)
}
Tom 和 小十一郎握了手
Jack 和 小十一郎握了手
Lucifer 和 小十一郎握了手
复制代码
因为第一个流加了延迟,当数据发射时,第二个流已经发送完毕了,那么对于第二个流来讲,最新值就是“小十一郎”。因此结果就成了上面那样。
Flow
操做符虽然比RxJava
少些,但知足大部分场景,其余操做符剩余的你们自行研究吧~ 更多操做符请查阅: Kotlin Flow
SharedFlow
支持replay
,支持多观察者,再加上协程特性,结合Lifecycle
,一个FlowEventBus
不就出来了?!
【Kotlin】就几行代码?! 用SharedFlow写个FlowEventBus
协程 Flow 最佳实践 | 基于 Android 开发者峰会应用
哇 你都看到这了,点个赞再走呗。~