关于kotlin中的Collections、Sequence、Channel和Flow (二)

关于Collections和Sequence请看关于kotlin中的Collections、Sequence、Channel和Flow (一) html

Channel

image.png

简介

Channel 是一个和 BlockingQueue 很是类似的概念。其中一个不一样是它代替了阻塞的 put 操做并提供了挂起的 send,还替代了阻塞的 take 操做并提供了挂起的 receiveChannel 是并发安全的,它能够用来链接协程,实现不一样协程的通讯。android

简单使用

val channel = Channel<Int>()
//producer
launch(Dispatchers.IO) {
    var i = 0
    while (true) {
        channel.send(i++)
        delay(1000)
    }
}

//consumer
launch {
    while (true) {
        println(channel.receive())
    }
}
复制代码

既然咱们说 Channel 实际上就是一个队列,队列不该该有缓冲区吗,那么这个缓冲区一旦满了,而且也一直没有人调用 receive 取走元素的话,send 就挂起了。那么接下来咱们看下 Channel 的缓冲区的定义:git

public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
    when (capacity) {
        RENDEZVOUS -> RendezvousChannel()
        UNLIMITED -> LinkedListChannel()
        CONFLATED -> ConflatedChannel()
        else -> ArrayChannel(capacity)
    }
复制代码
  • RENDEZVOUS 就是 0,这个词本意就是描述“不见不散”的场景,因此你不来 receive,我这 send 就一直搁这儿挂起等着。换句话说,咱们开头的例子里面,若是 consumer 不 receive,producer 里面的第一个 send 就给挂起了。github

  • UNLIMITED 比较好理解,无限制,从它给出的实现 LinkedListChannel 来看,这一点也与咱们的 LinkedBlockingQueue 相似。编程

  • CONFLATED,这个词是合并的意思,这个类型的 Channel 有一个元素大小的缓冲区,但每次有新元素过来,都会用新的替换旧的。设计模式

  • BUFFERED 了,它接收一个值做为缓冲区容量的大小,默认64。缓存

Channel是热流,即便没有消费者,它的生产操做也会执行。若是你不接收,那么你可能再也接收不到。 由于刚才说了channel 相似于BlockingQueue , 它的send()receive() 其实也是入队出队的操做,假定有多个消费者那它们就会竞争:安全

val channel = Channel<Int>()
//producer
launch(Dispatchers.IO) {
    var i = 0
    while (true) {
        channel.send(i++)
        delay(1000)
    }
}

//consumer 1
launch {
    while (true) {
        println("~~~"+channel.receive())
    }

}
//consumer 2
launch {
    while (true) {
        println("!!!"+channel.receive())
    }
}

部分输出:
~~~0
~~~1
!!!2
~~~3
!!!4
~~~5
!!!6
~~~7
!!!8
~~~9
复制代码

发现基本是交替获取到值。那若是想全都接收怎么办呢: 使用BroadcastChannel :markdown

val channel = BroadcastChannel<Int>(Channel.BUFFERED)
//producer
launch(Dispatchers.IO) {
    var i = 0
    while (true) {
        channel.send(i++)
        delay(1000)
    }
}
//consumer 1
launch {
    while (true) {
        println("~~~"+channel.openSubscription().receive())
    }

}
//consumer 2
launch {
    while (true) {
        println("!!!"+channel.openSubscription().receive())
    }
}

部分输出:
~~~1
!!!1
~~~2
!!!2
~~~3
!!!3
~~~4
!!!4
复制代码

还有一点要注意的是,channel须要手动关闭。网络

Channel 版本的序列生成器

上面说到 sequence 没法享受更上层的协程框架概念下的各类能力,还有一点 sequence 显然不是线程安全的,而 Channel 能够在并发场景下使用。

launch {
    val channel = produce(Dispatchers.Unconfined) {
        send(1)
        send(2)
    }

    for (item in channel) {
        println("got : $item")
    }
}
复制代码

Channel 即便没有人“消费”,值依旧会生产,这会形成必定的浪费。
那么能不能Sequence + Channel 搞一下?

image.png

Flow

image.png

简介

Flow 是在 Kotlin Coroutines 1.2.0 alpha 以后新增的一套API,也叫作异步流,是 Kotlin 协程与响应式编程模型结合的产物。

什么是响应式编程

响应式编程基于观察者模式,是一种面向数据流和变化传播的声明式编程方式。换个说法就是:响应式编程是使用异步数据流进行编程。【响应式编程】

Flow 解决了什么

异步挂起函数可以返回单一值,那么咱们如何返回多个异步计算的值呢?而这个就是Kotlin Flow解决的问题。

和Channel对比

  • Flow是“冷”🥶的 ,和Sequence同样,只有遇到末端操做才会执行,但又不同↓
  • Flow是响应式的,由生产者回调给消费者 (sequence是消费端通知生产端)
  • 它基于协程构建,所以提供告终构化并发和取消的全部好处。
  • 丰富的操做符

channel的【操做符】在kotlin 1.4标记为弃用,将来是要移除掉的

如何使用

Flow有多种构建方式,如下是最简单的方式:

viewModelScope.launch{
    //构建 flow
    val testFlow= flow {
         emit(1)
    }

    //消费Flow
    testFlow.collect { 
       println(it)
    }
    
}
复制代码

怎么就是冷流了

一个 Flow 建立出来以后,不消费则不生产,屡次消费则屡次生产,生产和消费老是相对应的。 所谓冷数据流,就是只有消费时才会生产的数据流,这一点与Channel 相反:Channel 的发送端并不依赖于接收端。

image.png 收集器是具备单一挂起功能的流接口收集,它是终端操做符:

public interface Flow<out T> {    
    public suspend fun collect(collector: FlowCollector<T>)
}
复制代码

发射器是FlowCollector,具备一个称为emit的单个挂起函数

public interface FlowCollector<in T> {  
    public suspend fun emit(value: T)
}
复制代码

在其内部,收集器和发射器的整个机制只是调用两边的函数

而suspend关键字则为其增长魔力。

线程切换

  • 使用flowOn()来切换流的执行线程,flowOn 指定的调度器影响前面的逻辑。
fun main() = runBlocking {
   flow {
        emit("Context")
        println(" emit on ${Thread.currentThread().name}")
    }
    .flowOn(Dispatchers.IO)
    .map {
        println(" map on ${Thread.currentThread().name}")
        it + " Preservation"
     }
     .flowOn(Dispatchers.Default)
     .collect { value ->
        println(" collect on ${Thread.currentThread().name}")
        println(value)
      }
}
复制代码

输出:

emit on DefaultDispatcher-worker-2
 map on DefaultDispatcher-worker-1
 collect on main
 Context Preservation
复制代码

异常处理

Flow从不捕获或处理下游⬇️流中发生的异常,它们仅使用catch运算符捕获上游⬆️发生的异常。

flow {
  emit(1)
  throw ArithmeticException("Div 0")
}.catch { t: Throwable ->
  println("caught error: $t")
}
复制代码

开始和结束

若是你想在流的开始和结尾处进行一些操做。 onCompletion 用起来比较相似于 try ... catch ... finally 中的 finally,不管前面是否存在异常,它都会被调用,参数 t 则是前面未捕获的异常。

flow {
    emit(1)
}.onStart {
    println("smart")
}.onCompletion {  t: Throwable ->
  println("caught error: $t")
}.collect {
    println(it)
}
复制代码

输出:

smart
1
end
复制代码

Flow 的设计初衷是但愿确保流操做中异常透明。所以禁止🚫在flow的构建中 try catch:

Wrong

flow { 
  try {
    emit(1)
    throw ArithmeticException("Div 0")
  } catch (t: Throwable){
    println("caught error: $t")
  } finally {
    println("finally.")
  }
}
复制代码

末端操做符

前面的例子当中,咱们用 collect 消费 Flow 的数据。collect 是最基本的末端操做符,除了 collect 以外,还有其余常见的末端操做符,大致分为三种类:

  1. 集合类型转换操做,包括 toListtoSet 等。
  2. 聚合操做,包括将 Flow 规约到单值的 reducefold 等操做,以及得到单个元素的操做包括 singlesingleOrNullfirst 等。
  3. 无操做 collect()launchIn()等。

实际上,识别是否为末端操做符,还有一个简单方法,因为 Flow 的消费端必定须要运行在协程当中,所以末端操做符都是挂起函数。

Flow 的取消

Flow 没有提供取消操做,Flow 的消费依赖于collect 这样的末端操做符,而它们又必须在协程当中调用,所以 Flow 的取消主要依赖于末端操做符所在的协程的状态

val job = launch {
    val intFlow = flow {
        (1..3).forEach {
            delay(1000)
            emit(it)
        }
    }

    intFlow.collect { println(it) }
}

delay(2500)
job.cancel()
复制代码

其余 Flow 的建立方式

flow { ... } 是基础的建立方式,还有其余构建器使流的声明更简单:

  • flowOf 构建器定义了一个发射固定,集的流。
  • 使用 .asFlow() 扩展函数,能够将各类集合与序列转换为流。

flow { ... }中没法随意切换调度器,这是由于 emit 函数不是线程安全的:

flow {
    withContext(Dispatchers.IO){  //error
        emit(2)
    }
    emit(1)
}.collect {
    println(it)
}
复制代码

image.png 想要在生成元素时切换调度器,就须使用channelFlow 函数来建立 Flow

channelFlow {
  send(1)
  withContext(Dispatchers.IO) {
    send(2)
  }
}
复制代码

SharedFlow

上面咱们说flow是冷流,只有collect 以后才触发"生产",那我就想要一个"热"流咋整呢? **SharedFlow**就是解决这个问题。在SharedFlow以前一般是使用BroadcastChannel而后asFlow 去实现,但这种实现方式不够优雅,和Channel过于耦合。所以在Coroutine 1.4时推出了SharedFlow. 它是一个**“热”流**,且能够有多个订阅者

简单使用:

val broadcasts=MutableSharedFlow<String>()

viewModelScope.launch{
      broadcasts.emit("Hello")
      broadcasts.emit("SharedFlow")
}


lifecycleScope.launch{
    broadcasts.collect { 
       print(it)
    }
}
复制代码

StateFlow

StateFlowSharedFlow 的一个比较特殊的变种,而 SharedFlow又是 Kotlin 数据流当中比较特殊的一种类型。StateFlow 与 LiveData 是最接近的,由于:

  • 它始终是有值的。
  • 它的值是惟一的。
  • 它容许被多个观察者共用 (所以是共享的数据流)。
  • 它永远只会把最新的值重现给订阅者,这与活跃观察者的数量是无关的。

当暴露 UI 的状态给视图时,应该使用 StateFlow。这是一种安全和高效的观察者,专门用于容纳 UI 状态。

简单来讲就是相似LiveData,可是更好用!

StateFlow仅在值已更新且不相同值时返回。简单来讲,假定两个值x和y,其中x是最初发出的值,y是要发出的值,若是(x == y)不执行任何操做,(x !=y)则仅在此状况下才发出新值。 简单使用:

val stateFlow = MutableStateFlow(UIState.Loading)//初始状态

stateFlow.value = UIState.Error

launch {
    stateFlow.collect {
       ...
    }
}
复制代码

更多信息参阅:StateFlow和SharedFlow

背压

只要是响应式编程,就必定会有背压问题,咱们先来看看背压到底是什么: 生产者生产数据的速度超过了消费者消费的速度致使的问题。 但得益于suspend功能,能够在Kotlin流程中实现透明的背压管理。 当流的收集器不堪重负时,它能够简单地挂起发射器,并在准备好接受更多元素时将其resume。 但为了保证数据不丢失,咱们也会考虑添加缓存来缓解问题:

flow {
  List(100) {
    emit(it)
  }
}.buffer()
复制代码

咱们也能够为 buffer 指定一个容量。不过,若是咱们只是单纯地添加缓存,而不是从根本上解决问题就始终会形成数据积压。 (就像咱们板球的聊天室消息缓存池)。

问题产生的根本缘由是生产和消费速率的不匹配,除直接优化消费者的性能之外,咱们也能够采起一些取舍的手段。

第一种是 conflate 新数据会覆盖老数据,例如:

flow {
  List(100) {
    emit(it)
  }
}.conflate()
.collect { value ->
  println("Collecting $value")
  delay(100) 
  println("$value collected")
}
复制代码

咱们快速地发送了 100 个元素,最后接收到的只有两个,固然这个结果每次都不必定同样:

Collecting 1
1 collected
Collecting 99
99 collected
复制代码

第二种是 collectLatest。顾名思义,只处理最新的数据,这看上去彷佛与 conflate 没有区别,其实区别大了:它并不会直接用新数据覆盖老数据,而是每个都会被处理,只不过若是前一个还没被处理完后一个就来了的话,处理前一个数据的逻辑就会被取消。 仍是前面的例子,咱们稍做修改:

flow {
  List(100) {
    emit(it)
  }
}.collectLatest { value ->
  println("Collecting $value")
  delay(100)
  println("$value collected")
}
复制代码

结果:

Collecting 0
Collecting 1
...
Collecting 97
Collecting 98
Collecting 99
▶ 100ms later
99 collected
复制代码

collectLatest 以外还有 mapLatestflatMapLatest 等等,都是这个做用。

在项目中的实战

近年来flow是谷歌大力支持技术,像RoomDataStore, Paging3 等都支持了Flow ,那你还等什么呢,学起来,用起来。

image.png

普通suspend请求改造

能够发送多个值,UI 状态彻底由数据驱动,好比Follow按钮就能够改造一下,先Loading后展现结果:

@WorkerThread
 fun getObservableUserEvents(userId: String?):Flow<Result<ObservableUserEvents>{
    return flow {
        emit(Result.Loading)
        if (userId == null) {
            emit(sessionRepository.getSessions())
        }
    }
}
复制代码

重试机制

我要给某一网络请求增长重试机制:

override suspend fun getTrendsList() = flow<Result<xxx>> {
   ...
   emit(Result.Success(result))
}.retry(2).catch { e ->
   emit(Result.Error(e))
}
复制代码

当lifecycleScope赶上flow

搜索有多个tab,都要监听搜索的触发,可是一次预期是触发一个tab的搜索。在ViewPager里,旁边的Fragment是onPause,此时依旧能够收到livedata回调,可是使用lifecycleScopeflow 便可解决这个问题,由于launchWhenResumed不在Resume时会挂起:

lifecycleScope.launchWhenResumed{
    searchRequestFlow.collect{request->
        doSearch(request)
    }
}
复制代码

而在Lifecycle 2.4.0以后提供了一个新的API repeatOnLifecycle,能够指定生命周期状态,而且在离开状态时不是简单的挂起,而是取消协程,当生命周期恢复时:

lifecycleScope.launch {
    lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) {
        viewModel.navigationActions.collect {
            ...
        }
    }
}
复制代码

官方倾向于使用 repeatOnLifecycle API 收集数据流,而不是在 launchWhenX API 内部进行收集。因为后面的 API 会挂起协程,而不是在 Lifecycle 处于 STOPPED 状态时取消。上游数据流会在后台保持活跃状态,并可能会发出新的项并耗用资源。

"数据倒灌"?不存在的

在以前,咱们会使用LiveData发送Event去和UI交互,或者执行某段逻辑,可是有些时候页面重建致使LiveData从新绑定,此时会当即收到回调致使触发逻辑。为了解决这个问题,在FlowChannel都还没稳定的时候,谷歌的示例使用封装的Event来判断事件是否处理过:

open class Event<out T>(private val content: T) {

    var hasBeenHandled = false
        private set // Allow external read but not write

    /** * Returns the content and prevents its use again. */
    fun getContentIfNotHandled(): T? {
        return if (hasBeenHandled) {
            null
        } else {
            hasBeenHandled = true
            content
        }
    }

    /** * Returns the content, even if it's already been handled. */
    fun peekContent(): T = content
}
复制代码

但这只是个简单的封装,只能有一个观察者,想要应用在复杂场景还得设计个Manager来管理多个观察者。可是使用SharedFlow 则不会有这个问题,毕竟LiveData原本就不也是用来干这活的,人家设计来是和UI来绑定的。

由于SharedFlow是热流,事件被广播给未知数量的订阅者。在没有订阅者的状况下,任何发布的事件都会当即删除。它是一种用于必须当即处理或根本不处理的事件的设计模式。 使用示例:

val scrollToEvent: SharedFlow<ScheduleScrollEvent> =
    loadSessionsResult.combineTransform(currentEventIndex) { result, currentEventIndex ->
       emit(ScheduleScrollEvent(currentEventIndex))
    }.shareIn(viewModelScope, WhileViewSubscribed, replay = 0) 
复制代码

针对事件发送有些时候也可使用Channel,这个看业务场景:Channel会每一个事件都传递给单个订阅者。一旦Channel缓冲区满了,会尝试在没有订阅者的状况下暂停事件发布,等待订阅者出现。默认状况下永远不会删除已发布的事件。(不过经过设置也能够无缓存或者仅缓存一个)

使用示例:

// SIDE EFFECTS: Navigation actions
private val _navigationActions = Channel<NavigationAction>(capacity = Channel.CONFLATED)
val navigationActions = _navigationActions.receiveAsFlow()
复制代码

debounce

搜索监听输入框,输入时执行搜索,这里要进行debounce,避免发出过多的sug请求:

val query=MutableStateFlow<String?>(null)

fun onTextChanged(text:String){
    query.value=text
}

launch{
    query.debounce(100).collect{text->
        text?.let{
             doSearch(text)
        }
    }
}
复制代码

多路复用

同时请求缓存和网络,网络先到则更新缓存,并取消协程,缓存先到则数据发送到UI后继续执行,直到网络数据返回。

listOf(
        async { dataSource.getCacheData() },
        async { dataSource.getRemoteData() })
.map { deferred ->
    flow { emit(deferred.await()) }
}.merge().onEach { result ->
     //网络数据
    if (result.requestType == RequestType.NETWORK) {
        if (isActive) {
            _source.postValue(result)
        }
        if (result is Result.Success) {
            result.data?.let { newData ->
                 //更新缓存
                dataSource.flushCache(newData)
            }           
           cancel()
        }
    } else {
    //缓存数据
        if (result is Result.Success) {
            if (isActive) {
                _source.postValue(result)
            }
        }
    }
}.onCompletion {
    isPreLoading.set(false)
}.launchIn(this)
复制代码

组合多个流

  • Zip

每次各取一个,一旦其中一个流完成,结果流就完成,并在剩余流上调用cancel。

val flow = flowOf("4K显示器", "2K显示器", "1080P显示器")
val flow2 = flowOf("小明", "小陈", "小红", "小十一郎")
flow.zip(flow2) { i, s -> i + " 发给了 "+s }.collect {
    println(it)
}

4K显示器 发给了 小明
2K显示器 发给了 小陈
1080P显示器 发给了 小红
复制代码
  • Combine

经过组合每一个流的最近发射的值,使用转换函数生成其值:

val flow = flowOf("Tom", "Jack", "Lucifer")
val flow2 = flowOf("小明", "小陈", "小红", "小十一郎")
flow.combine(flow2) { i, s -> i + " 和 " + s + "握了手" }.collect {
    println(it)
}

Tom 和 小明握了手
Jack 和 小明握了手
Jack 和 小陈握了手
Lucifer 和 小陈握了手
Lucifer 和 小红握了手
Lucifer 和 小十一郎握了手
复制代码

若是咱们对 第一个flow的发射加一个延迟:

val flow = flowOf("Tom", "Jack", "Lucifer").onEach { delay(10) }
val flow2 = flowOf("小明", "小陈", "小红", "小十一郎")
flow.combine(flow2) { i, s -> i + " 和 " + s + "握了手" }.collect {
    println(it)
}

Tom 和 小十一郎握了手
Jack 和 小十一郎握了手
Lucifer 和 小十一郎握了手
复制代码

因为第一个流加了延迟,当数据发射时,第二个流已经发送完毕了,那么对于第二个流来讲,最新值就是“小十一郎”。因此结果就成了上面那样。

Flow操做符虽然比RxJava少些,但知足大部分场景,其余操做符剩余的你们自行研究吧~ 更多操做符请查阅: Kotlin Flow

我用SharedFlow写了个Eventbus

SharedFlow支持replay,支持多观察者,再加上协程特性,结合Lifecycle,一个FlowEventBus不就出来了?!

【Kotlin】就几行代码?! 用SharedFlow写个FlowEventBus

参考

Bennyhuo 的博客

协程 Flow 最佳实践 | 基于 Android 开发者峰会应用

kotlin中文语言站 -Flow

StateFlow和SharedFlow

哇 你都看到这了,点个赞再走呗。~

相关文章
相关标签/搜索