最全面的Kotlin协程: Coroutine/Channel/Flow 以及实际应用

协程这个概念在1958年就开始出现, 比线程更早, 目前不少语言开始原生支, Java没有原生协程可是能够大型公司都本身或者使用第三方库来支持协程编程, 可是Kotlin原生支持协程.java

我认为协程的核心就是一个词: 做用域, 理解什么是做用域就理解协程了android

什么是协程git

协程是协做式任务, 线程是抢占式任务, 本质上二者都属于并发github

Kotlin协程就是线程库不是协程? 内部代码用的线程池?编程

  1. 最知名的协程语言Go内部也是维护了线程, 他也不是协程了?
  2. 协程只是方便开发者处理异步, 线程才能提高性能效率, 二者自己不是替换关系没有说用了谁就不用另外一个了
  3. 协程是一种概念, 无关乎具体实现方式
  4. kotlin标准库中的协程不包含线程池代码, 仅扩展库才内部处理了线程池

协程设计来源后端

  1. Kotlin的协程完美复刻了谷歌的Go语言的协程设计模式(做用域/channel/select), 将做用域用对象来具化出来; 且能够更好地控制做用域生命周期;
  2. await模式(JavaScript的异步任务解决方案)
  3. Kotlin参考RxJava响应式框架创造出Flow
  4. 使用协程开始就不须要考虑线程的问题, 只须要在不一样场景使用不一样的调度器(调度器会对特定任务进行优化)就好

特性

使用场景设计模式

假设首页存在七个接口网络请求(后端人员处理差)的状况一个个使用串行网络请求的时间比并发网络请求慢了接近七倍缓存

目前计算机都是经过多核CPU提高计算能力, 因此熟练掌握并发编程是将来的趋势安全

协程优点bash

  1. 并发实现方便
  2. 没有回调嵌套发生, 代码结构清晰
  3. 建立协程性能开销优于建立线程, 一个线程能够运行多个协程, 单线程便可异步

实验特性

协程在Kotlin1.3时候放出正式版本, 目前仍然存在不稳定函数(不影响项目开发), 经过注解标识

@FlowPreview 表明可能之后存在Api函数变更

@ExperimentalCoroutinesApi  表明目前可能存在不稳定的因素的函数

@ObsoleteCoroutinesApi 可能存在被废弃的可能
复制代码

构成

Kotlin的协程主要构成分为三部分

  1. CoroutineScope 协程做用域: 每一个协程体都存在一个做用域, 异步仍是同步由该做用域决定
  2. Channel 通道: 数据如同一个通道进行发送和接收, 能够在协程之间互相传递数据或者控制阻塞和继续
  3. Flow 响应流: 相似RxJava等结构写法

为自动化/并发网络请求我建立一个库, 我姑且称为Android最强的网络请求库: Net

1.0+版本为RxJava实现, 2.0+版本为Coroutine实现, 同时包含更强的轮循器用于替代RxJava的轮循功能

由于须要抛弃RxJava, 取代RxBus事件分发我使用协程建立出一个更增强大的: Channel

咱们公司项目属于 MVVM + Kotlin + Coroutine + JetPack, 在国外很经常使用, 主要带来的优点;

  1. 简洁, 减小70%左右代码
  2. 双向数据绑定
  3. 并发异步任务(网络)倍增速度
  4. 更健壮的数据保存和恢复

我平时项目开发必备框架

框架 描述
Net Android不是最强网络请求/异步任务库
BRV Android不是最强列表
Serialize 建立自动保存和恢复的字段
StateLayout Android不是最强缺省页
LogCat JSON和长文本日志打印工具
Tooltip 完善的吐司工具
DebugKit 开发调试窗口工具
StatusBar 一行代码建立透明状态栏
Channel 基于协程和JetPack特性的事件分发框架

展望

协程对于后端高并发优点很大, 至于Google的Jetpack基本上都有针对协程扩展, 最明显的是并发网络请求速度倍增; 同时代码更加结构清晰, 本文章后续会根据Kotlin的版本中的协程迭代进行更新

依赖

这里咱们使用协程扩展库, kotlin标准库的协程太过于简陋不适用于开发者使用

implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.9"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.9"
复制代码

建立协程

开启主协程的分为三种方式

生命周期和App一致, 没法取消(不存在Job), 不存在线程阻塞

fun main() {
    GlobalScope.launch { // 在后台启动一个新的协程并继续
        delay(1000L)
        println("World!")
    }
    Thread.sleep(2000) // 防止JVM虚拟机退出
}
复制代码

这里说的是GlobalScope没有Job, 可是启动的launch都是拥有Job的. GlobalScope自己就是一个做用域, launch属于其子做用域;

不存在线程阻塞, 能够取消, 能够经过CoroutineContext控制协程生命周期

fun main() {
    CoroutineScope(Dispatchers.IO).launch {
    }
    Thread.sleep(1000)
}
复制代码

线程阻塞, 适用于单元测试, 不须要延迟阻塞防止JVM虚拟机退出. runBlocking属于全局函数能够在任意地方调用

通常咱们在项目中是不会使用runBlocking, 由于阻塞主线程没有开启的任何意义

fun main() = runBlocking { 
    // 阻塞线程直到协程做用域内部全部协程执行完毕
}
复制代码

建立做用域

协程内部还可使用函数建立其余协程做用域, 分为两种建立函数:

  1. CoroutineScope的扩展函数, 只有在做用域内部才能建立其余的做用域
  2. suspend修饰的函数内部
  3. 协程永远会等待其内部做用域内全部协程都执行完毕后才会关闭协程

在主协程内还能够建立子协程做用域, 建立函数分为两种

  1. 阻塞做用域(串行): 会阻塞当前做用域

  2. 挂起做用域(并发): 不会阻塞当前做用域

同步做用域函数

都属于suspend函数

  • withContext 能够切换调度器, 有返回结果
  • coroutineScope 建立一个协程做用域, 该做用域会阻塞当前所在做用域而且等待其子协程执行完才会恢复, 有返回结果
  • supervisorScope 使用SupervisorJob的coroutineScope, 异常不会取消父协程
public suspend fun <T> withContext( context: CoroutineContext, block: suspend CoroutineScope.() -> T ): T
// 返回结果; 能够和当前协程的父协程存在交互关系, 主要做用为来回切换调度器

public suspend inline operator fun <T> CoroutineDispatcher.invoke( noinline block: suspend CoroutineScope.() -> T ): T = withContext(this, block)
// withContext工具函数而已

public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R

public suspend fun <R> supervisorScope(block: suspend CoroutineScope.() -> R): R

public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T
复制代码

异步做用域函数

这两个函数都不属于suspend, 只须要CoroutineScope就能够调用

  • launch: 异步并发, 没有返回结果
  • async: 异步并发, 有返回结果
public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job

public fun <T> CoroutineScope.async( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> T ): Deferred<T>
复制代码

并发

同一个协程做用域中的异步任务遵照顺序原则开始执行; 适用于串行网络请求, 在一个异步任务须要上个异步任务的结果时.

协程挂起须要时间, 因此异步协程永远比同步代码执行慢

fun main() = runBlocking<Unit> {
    launch {
        System.err.println("(Main.kt:34) 后执行")
    }

    System.err.println("(Main.kt:37) 先执行")
}
复制代码

当在协程做用域中使用async函数时能够建立并发任务

public fun <T> CoroutineScope.async( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> T ): Deferred<T>
复制代码

示例

fun main() = runBlocking<Unit> {
	val name = async { getName() }
    val title = async { getTitle() }

    System.err.println("(Main.kt:35) result = ${name.await() + title.await()}")
    delay(2000)
}
复制代码
  1. 返回对象Deferred; 经过函数await获取结果值
  2. Deferred集合还可使用awaitAll()等待所有完成
  3. 不执行await任务也会等待执行完协程关闭
  4. 若是Deferred不执行await函数则async内部抛出的异常不会被logCat或tryCatch捕获, 可是依然会致使做用域取消和异常崩溃; 但当执 行await时异常信息会从新抛出

惰性并发

将async函数中的start设置为CoroutineStart.LAZY时则只有调用Deferred对象的await时才会开始执行异步任务(或者执行start函数)

启动模式

  1. DEFAULT 当即执行
  2. LAZY 直到Job执行start或者join才开始执行
  3. ATOMIC 在做用域开始执行以前没法取消
  4. UNDISPATCHED 不执行任何调度器, 直接在当前线程中执行, 可是会根据第一个挂起函数的调度器切换

异常

协程中发生异常, 则父协程取消而且父协程其余的子协程一样所有取消

Deferred

继承自Job

提供一个全局函数用于建立CompletableDeferred对象, 该对象能够实现自定义Deferred功能

public suspend fun await(): T 
// 结果
public val onAwait: SelectClause1<T>
// 在select中使用

public fun getCompleted(): T
// 若是完成[isCompleted]则返回结果, 不然抛出异常
public fun getCompletionExceptionOrNull(): Throwable?
// 若是完成[isCompleted]则返回结果, 不然抛出异常
复制代码

示例

fun main() = runBlocking<Unit> {
    val deferred = CompletableDeferred<Int>()
    
    launch {
        delay(1000 )
        deferred.complete(23)
    }

    System.err.println("(Demo.kt:72) 结果 = ${deferred.await()}")
}
复制代码

建立CompletableDeferred的顶层函数

public fun <T> CompletableDeferred(parent: Job? = null): CompletableDeferred<T>
public fun <T> CompletableDeferred(value: T): CompletableDeferred<T>
复制代码

CompletableDeferred函数

public fun complete(value: T): Boolean
// 结果

public fun completeExceptionally(exception: Throwable): Boolean
// 抛出异常, 异常发生在`await()`时

public fun <T> CompletableDeferred<T>.completeWith(result: Result<T>): Boolean
// 能够经过标记来判断是否成功, 避免异常抛出
复制代码

CoroutineScope

建立此对象表示建立一个协程做用域

结构化并发

若是你看协程的教程可能会常常看到这个词, 这就是做用域内部开启新的协程; 父协程会限制子协程的生命周期, 子协程承接父协程的上下文, 这种层级关系就是结构化并发

在一个协程做用域里面开启多个子协程进行并发行为

CoroutineContext

协程上下文, 我认为协程上下文能够看作包含协程基本信息的一个Context(上下文), 其能够决定协程的名称或者运行

建立一个新的调度器

fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher
fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher
复制代码

建立新的调度器比较消耗资源, 建议复用且当不须要的时候使用close函数释放

调度器

Dispatchers继承自CoroutineContext, 该枚举拥有三个实现; 表示不一样的线程调度; 当函数不使用调度器时承接当前做用域的调度器

  1. Dispatchers.Unconfined 不指定线程, 若是子协程切换线程那么接下来的代码也运行在该线程上
  2. Dispatchers.IO 适用于IO读写
  3. Dispatchers.Main 根据平台不一样而有所差, Android上为主线程
  4. Dispatchers.Default 默认调度器, 在线程池中执行协程体, 适用于计算操做

当即执行

Dispatchers.Main.immediate
复制代码

immediate属于全部调度器都有的属性, 该属性表明着若是当前正处于该调度器中不执行调度器切换直接执行, 能够理解为在同一调度器内属于同步协程做用域

例如launch函数开启做用域会比后续代码执行顺序低, 可是使用该属性协程属于顺序执行

示例

CoroutineScope(Job() + Dispatchers.Main.immediate).launch {
	// 执行顺序 1
}

// 执行顺序 2

CoroutineScope(Job() + Dispatchers.Main).launch {
		// 执行顺序 4
}

// 执行顺序 3
复制代码

协程命名

经过建立一个CoroutineName对象, 在构造函数中指定参数为协程名称, CoroutineName继承自CoroutineContext.

launch(CoroutineName("吴彦祖")){

}
复制代码

协程上下文名称用于方便调试使用

协程挂起

yield函数可让当前协程暂时挂起执行其余协程体, 若是没有其余正在并发的协程体则继续执行当前协程体(至关于无效调用)

public suspend fun yield(): Unit
复制代码

看协程中可能常常说起挂起, 挂起能够理解为这段代码(做用域)暂停, 而后执行后续代码; 挂起函数通常表示suspend关键字修饰的函数, suspend要求只容许在suspend修饰的函数内部调用, 可是自己这个关键字是没作任何事的. 只是为了限制开发者随意调用

挂起函数调用会在左侧行号列显示箭头图标

image-20200106120117080

JOB

在协程中Job一般被称为做业, 表示一个协程工做任务, 他一样继承自CoroutineContext

val job = launch {

}
复制代码

Job属于接口

interface Job : CoroutineContext.Element
复制代码

函数

public suspend fun join()
// 等待协程执行完毕都阻塞当前线程
public val onJoin: SelectClause0
// 后面说起的选择器中使用

public fun cancel(cause: CancellationException? = null)
// 取消协程
public suspend fun Job.cancelAndJoin()
// 阻塞而且在协程结束之后取消协程

public fun start(): Boolean
public val children: Sequence<Job>
// 所有子做业

public fun getCancellationException(): CancellationException

public fun invokeOnCompletion( onCancelling: Boolean = false, invokeImmediately: Boolean = true, handler: CompletionHandler): DisposableHandle
// p1: 当为true表示cancel不会回调handler
// p2: 当为true则先执行[handler]而后再返回[DisposableHandle], 为false则先返回[DisposableHandle]

public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
// 当其做用域完成之后执行, 主协程指定才有效, 直接给CoroutineScope指定时无效的
// 手动抛出CancellationException一样会赋值给cause
复制代码

状态

经过字段能够获取JOB当前处于状态

public val isActive: Boolean
public val isCancelled: Boolean
public val isCompleted: Boolean
复制代码

扩展函数

public fun Job.cancelChildren(cause: CancellationException? = null)

public suspend fun Job.cancelAndJoin()
复制代码

每一个协程做用域都存在coroutineContext. 而协程上下文中都存在Job对象

coroutineContext[Job]
复制代码

结束协程

若是协程做用域内存在计算任务(一直打日志也算)则没法被取消, 若是使用delay函数则能够被取消;

fun main() = runBlocking<Unit> {

  val job = launch(Dispatchers.Default) {
    while (true){
      delay(100) // 这行代码存在则能够成功取消协程, 不存在则没法取消
      System.err.println("(Main.kt:30) ")
    }
  }
  
  delay(500)
  job.cancel() 
  System.err.println("(Main.kt:42) 结束")
}
复制代码

经过使用协程内部isActive属性来判断是否应该结束

fun main() = runBlocking<Unit> {

    val job = launch(Dispatchers.Default) {
        while (isActive) { // 一旦协程被取消则为false
            System.err.println("(Main.kt:30) ")
        }
    }

    delay(500)
    job.cancel()
    System.err.println("(Main.kt:42) 结束")
}
复制代码

释放资源

协程存在被手动取消的状况, 可是有些资源须要在协程取消的时候释放资源, 这个操做能够在finally中执行

不管如何finally都会被执行

fun main() = runBlocking<Unit> {

    val startTime = System.currentTimeMillis()
    val job = launch(Dispatchers.Default) {
        try {
            repeat(1000){
                System.err.println("(Main.kt:31) it = $it")
                delay(500)
            }
        } finally {
           // 已被取消的协程没法继续挂起
        }
    }
    delay(1500)
    job.cancel()
    System.err.println("(Main.kt:42) ")
}
复制代码

再次开启协程

经过withContextNonCancellable能够在已被取消的协程中继续挂起协程; 这种用法其实能够看作建立一个没法取消的任务

withContext(NonCancellable) {
    println("job: I'm running finally")
    delay(1000L)
    println("job: And I've just delayed for 1 sec because I'm non-cancellable")
}
复制代码

上下文组合

协程做用域能够接收多个CoroutineContext做为上下文参数; CoroutineContext自己属于接口, 不少上下文相关的类都实现与他

配置多个CoroutineContext能够经过+符号同时指定多个协程上下文, 每一个实现对象可能包含一部分信息能够存在覆盖行为故相加时的顺序存在覆盖行为

val a = CoroutineScope(SupervisorJob() + coroutineContext).launch(handler) {
  delay(1000)
  System.err.println("(Main.kt:51) ${Thread.currentThread()}")
}
复制代码
launch(Dispatchers.IO + CoroutineName("吴彦祖")){	}
复制代码

协程局部变量

使用ThreadLocal能够获取线程的局部变量, 可是要求使用扩展函数asContextElement转为协程上下文做为参数传入在建立协程的时候

该局部变量做用于持有该协程上下文的协程做用域内

public fun <T> ThreadLocal<T>.asContextElement(value: T = get()): ThreadContextElement<T> =
    ThreadLocalElement(value, this)
复制代码

超时

public suspend fun <T> withTimeout(timeMillis: Long, block: suspend CoroutineScope.() -> T): T
// 超过指定时间timeMillis自动结束协程; 
// 当没有超时时返回值获取而且继续执行协程; 
// 当超时会抛出异常TimeoutCancellationException, 可是不会致使程序结束

public suspend fun <T> withTimeoutOrNull(timeMillis: Long, block: suspend CoroutineScope.() -> T): T?
// 若是超时不会结束协程而是返回null
复制代码

没法手动抛出TimeoutCancellationException, 由于其构造函数私有

全局协程做用域

全局协程做用域属于单例对象, 整个JVM虚拟机只有一份实例对象; 他的寿命周期也跟随JVM. 使用全局协程做用域的时候注意避免内存泄漏

public object GlobalScope : CoroutineScope {
    /** * Returns [EmptyCoroutineContext]; */
    override val coroutineContext: CoroutineContext
        get() = EmptyCoroutineContext
}
复制代码

全局协程做用域不继承父协程做用域的上下文, 因此也不会由于父协程被取消而自身被取消

启动模式

  • DEFAULT 当即执行协程体
  • ATOMIC 当即执行协程体,但在开始执行协程以前没法取消协程
  • UNDISPATCHED 当即在当前线程执行协程体,第一个挂起函数执行在函数所在线程, 后面执行在函数指定线程
  • LAZY 手动执行startjoin才会执行协程

协程取消

协程体若是已经执行实际上属于不可取消的, 在协程体中经过isActive来判断协程是否处于活跃中

经过取消函数的参数指定异常CancellationException能够自定义异常对象

不可取消的协程做用域

NonCancellable该单例对象用于withContext函数建立一个没法被取消的协程做用域

withContext(NonCancellable) {
  delay(2000)
}
复制代码

示例

fun main() = runBlocking {
  launch {
    delay(1000)
    System.err.println("(Main.kt:19) ")
  }

  launch {
    withContext(NonCancellable) {
      delay(2000)
      System.err.println("(Main.kt:26) ")
    }
  }

  delay(500) // 防止launch还未开启withContext就被取消
  cancel()
}
复制代码
  1. 当子做用域内包含没有终止的任务, 将等待任务完成后才会取消(delay不存在, Thread.sleep能够模拟未结束的任务)
  2. 抛出CancellationException视做结束异常, invokeOnCompletion也会执行(其中包含异常对象), 可是其余异常将不会执行invokeOnCompletion

取消GlobalScope

GlobalScope属于全局协程, 由他开启的协程都不拥有Job, 因此没法取消协程. 可是能够经过给GlobalScope开启的协程做用域指定Job而后就可使用Job取消协程

协程异常

经过CoroutineExceptionHandler函数能够建立一个同名的对象, 该接口继承自CoroutineContext, 一样经过制定上下文参数传递给全局协程做用域使用, 看成用域抛出异常时会被该对象的回调函数接收到, 而且不会抛出异常

  1. CoroutineExceptionHandler 只有做为最外层的父协程上下文才有效, 由于异常会层层上抛, 除非配合SupervisorJob监督做业禁止异常上抛, 子做用域的异常处理器才能捕获到异常

  2. CoroutineExceptionHandler异常处理器并不能阻止协程做用域取消, 只是监听到协程的异常信息避免JVM抛出异常退出程序而已

  3. 只要发生异常就会致使父协程和其全部子协程都被取消, 这种属于双向的异常取消机制, 后面提到的监督做业(SupervisorJob)属于单向向下传递(即不会向上抛出)

  4. CoroutineExceptionHandler会被做用域一直做为协程上下文向下传递给子做用域(除非子做用域单独指定)

(以下示例)不要尝试使用try/catch捕捉launch做用域的异常, 没法被捕捉.

try {
  launch {
    throw NullPointerException()
  }
} catch (e: Exception) {
  e.printStackTrace()
}
复制代码

后面专门介绍如何捕获协程异常避免抛出.

协程取消异常

取消协程的做业(Job)会引起异常, 可是会被默认的异常处理器给忽略, 可是咱们能够经过捕捉能够看到异常信息

fun main() = runBlocking<Unit> {
  val job = GlobalScope.launch {

    try {
      delay(1000)
    } catch (e: Exception) {
      e.printStackTrace()
    }
  }

  job.cancel(CancellationException("自定义一个用于取消协程的异常"))
  delay(2000)
}
复制代码

Job取消函数

public fun cancel(cause: CancellationException? = null)
复制代码
  • cause: 参数不传默认为JobCancellationException

全局协程做用域的异常处理

val exceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable ->
       System.err.println("(Main.kt:41):main coroutineContext = $coroutineContext, throwable = $throwable")
}

GlobalScope.launch(exceptionHandler) { 

}
复制代码

子协程设置异常处理器是无效的, 即便设置了错误依然会抛到父协程从而而没有意义. 除非同时使用异常处理器+监督做业(SupervisorJob), 这样就是让子协程的错误不向上抛(后面详解监督做业), 从而被其内部的异常处理器来处理.

异常聚合和解包

全局协程做用域也存在嵌套子父级关系, 故异常可能也会依次抛出多个异常

fun main() = runBlocking {
    val handler = CoroutineExceptionHandler { _, exception ->
				// 第三, 这里的异常是第一个被抛出的异常对象
        println("捕捉的异常: $exception 和被嵌套的异常: ${exception.suppressed.contentToString()}")
    }
    val job = GlobalScope.launch(handler) {
        launch {
            try {
                delay(Long.MAX_VALUE)
            } finally { // 当父协程被取消时其全部子协程都被取消, finally被取消以前或者完成任务以后必定会执行
                throw ArithmeticException() // 第二, 再次抛出异常, 异常被聚合
            }
        }
        launch {
            delay(100)
            throw IOException() // 第一, 这里抛出异常将致使父协程被取消
        }
        delay(Long.MAX_VALUE)
    }
    job.join() // 避免GlobalScope做用域没有执行完毕JVM虚拟机就退出
}
复制代码

监督做业

通常状况子协程发生异常会致使父协程被取消, 同时父协程发生异常会取消全部的子协程; 可是有时候子协程发生异常咱们并不但愿父协程也被取消, 而是仅仅全部子协程取消(仅向下传递异常), 这个使用就是用SupervisorJob做业

建立监督做业对象

fun main() = runBlocking<Unit> {
    CoroutineScope(coroutineContext).launch {
        
        launch(SupervisorJob(coroutineContext[Job]) + CoroutineExceptionHandler { _, _ ->  }) {
            throw NullPointerException()
        }

        delay(500)
        println("( Process.kt:13 ) ")
    }

    println("( Process.kt:16 ) finish")
}
复制代码
  1. 必须添加CoroutineExceptionHandler处理异常, 不然异常依然会向上传递取消父协程

  2. 直接建立 SupervisorJob() 对象传入做用域中会致使该做用域和父协程生命周期不统一的问题, 即父协程取消之后该子协程依然处于活跃状态, 故须要指定参数为coroutineContext[Job]即传入父协程的做业对象

  3. SupervisorJob仅能捕捉内部协程做用域的异常, 没法直接捕捉内部协程

    supervisorScope {
        // throw NoSuchFieldException() 抛出崩溃
        
        launch {
             throw NoSuchFieldException() // 不会抛出
        }
    }
    复制代码

监督做业在withContextasync中添加无效

直接建立一个异常向下传递监督做业的做用域

public suspend fun <R> supervisorScope(block: suspend CoroutineScope.() -> R): R
复制代码
  • 该函数属于阻塞
  • 具有返回值
  • supervisorScope函数使用的依然是当前做用域的Job, 因此跟随当前做用域生命周期, 能够被取消
fun main() = runBlocking<Unit> {
    CoroutineScope(coroutineContext).launch {
        
      // 在该做用域内只要设置CoroutineExceptionHandler都仅会向下传递
        supervisorScope {
            launch(CoroutineExceptionHandler { _, _ ->  }) {
                throw NullPointerException()
            }

            launch {
                delay(1000) // 即便上面的launch抛出异常也会继续执行这里
                println("( Process.kt:18 ) ")
            }
        }
    }

    println("( Process.kt:16 ) finish")
}
复制代码

捕获异常

在做用域中的异常捕获和通常的异常捕获有所区别

  • CoroutineExceptionHandler能够捕获全部子做用域内异常
  • async可使用监督做业能够捕获内部发生的异常, 可是其await要求trycatch
  • launch要求监督做业配合异常处理器同时使用, 缺一不可
  • withContext/supervisorScope/coroutineScope/select能够trycatch捕获异常

原始协程

函数 回调字段 描述
suspendCoroutine Continuation Result
suspendCancellableCoroutine CancellableContinuation 可取消
suspendAtomicCancellableCoroutine CancellableContinuation 可取消

[Continuation]

public val context: CoroutineContext public fun resumeWith(result: Result<T>) 复制代码

[CancellableContinuation] -| Continuation

public val isActive: Boolean
public val isCompleted: Boolean
public val isCancelled: Boolean

public fun resume(value: T, onCancellation: (cause: Throwable) -> Unit)
public fun tryResume(value: T, idempotent: Any? = null): Any?
public fun tryResumeWithException(exception: Throwable): Any?
public fun completeResume(token: Any)

public fun cancel(cause: Throwable? = null): Boolean

public fun invokeOnCancellation(handler: CompletionHandler)
public fun CoroutineDispatcher.resumeUndispatched(value: T)
public fun CoroutineDispatcher.resumeUndispatchedWithException(exception: Throwable)
复制代码

线程不安全

解决线程不安全问题

  1. 互斥锁
  2. 切换线程实现单线程
  3. Channel

互斥

至关于Java中的Lock替代品: Mutex

建立互斥对象

public fun Mutex(locked: Boolean = false): Mutex
// p: 设置初始状态, 是否当即上锁
复制代码

使用扩展函数能够自动加锁和解锁

public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T
// owner: 钥匙
复制代码

函数

public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T
public fun holdsLock(owner: Any): Boolean
// owner是否被用于锁
public fun tryLock(owner: Any? = null): Boolean
// 使用owner来上锁, 若是owner已上锁则返回false
复制代码

Channel

  1. 多个做用域能够经过一个Channel对象来进行数据的发送和接收
  2. Channel设计参考Go语言的chan设计, 可用于控制做用域的阻塞和继续(经过配合select)

Channel属于接口没法直接建立, 咱们须要经过函数Channel()来建立其实现类

源码

public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
  when (capacity) {
    RENDEZVOUS -> RendezvousChannel() // 无缓存
    UNLIMITED -> LinkedListChannel() // 无限制
    CONFLATED -> ConflatedChannel()  // 合并
    BUFFERED -> ArrayChannel(CHANNEL_DEFAULT_CAPACITY) // 64
    else -> ArrayChannel(capacity) // 指定缓存大小
  }
复制代码
  • capacity

    缓冲大小, 默认0
    当Channel发送一条数据时就会挂起通道(不继续执行发送后续代码), 只有在接收这条数据时才会解除挂起继续执行; 可是咱们能够设置缓存大小
    复制代码

通道容许被遍历获取当前发送数据

val channel = Channel<Int>()

for (c in channel){

}
复制代码
public suspend fun yield(): Unit
复制代码

Channel

Channel接口同时实现发送渠道(SendChannel)和接收渠道(ReceiveChannel)两个接口, 因此既能发送又能接收数据

public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> 
复制代码

SendChannel

public val isClosedForSend: Boolean
// 是否关闭
public fun close(cause: Throwable? = null): Boolean
// 关闭发送通道

public fun offer(element: E): Boolean
// 推荐使用send函数
public suspend fun send(element: E)
// 发送消息

public fun invokeOnClose(handler: (cause: Throwable?) -> Unit)
// 当通道关闭时执行回调

public val onSend: SelectClause2<E, SendChannel<E>>
// 当即发送数据(若是容许), 在select中使用
复制代码
  • 发送通道关闭后不能继续使用ReceiveChannel接收数据, 会致使ClosedReceiveChannelException抛出

ReceiveChannel

public val isClosedForReceive: Boolean
// SendChannel是否已经关闭通道, 若是关闭通道之后还存在缓存则会接收完缓存以后返回false

public val isEmpty: Boolean // 通道是否为空

public fun poll(): E? // 推荐使用receive函数
public suspend fun receive(): E
// 接受通道事件

public val onReceive: SelectClause1<E> // 若是通道关闭, 抛出异常
public val onReceiveOrNull: SelectClause1<E?> // 废弃函数, 若是通道关闭返回null
public val onReceiveOrClosed: SelectClause1<ValueOrClosed<E>>
// 在select中使用的监听器, 推荐使用第三个函数

public suspend fun receiveOrClosed(): ValueOrClosed<E>
// `ValueOrClosed`对象能够判断通道是否已关闭

public fun cancel(cause: CancellationException? = null)
// 关闭通道
复制代码
  1. 通道的发送和接收都会致使做用域被阻塞, 可是发送消息能够经过设置缓存让他不阻塞, 或者取消通道可让阻塞继续
  2. 通道只容许在挂起函数中发送和接收, 可是建立通道不限制
  3. 关闭通道会致使receive抛出异常
  4. SendChannel执行close函数后不容许再发送或者接收数据, 不然抛出异常
  5. Channel的send | receive函数所在做用域被取消cancel不会致使通道结束(isClosedForReceive返回false)
  6. receive接收而不是遍历则会致使卡住做用域

consume

ReceiveChannel不只能够经过迭代器来接收事件, 还可使用consume系列函数来接收事件; 本质上consume和迭代没有任何区别只是consume会在发生异常时自动取消通道(经过cancel函数);

源码

public inline fun <E, R> ReceiveChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R {
    var cause: Throwable? = null
    try {
        return block() // 直接返回
    } catch (e: Throwable) {
        cause = e
        throw e
    } finally {
        cancelConsumed(cause) // 若是发生异常取消通道
    }
}
复制代码

consumeEach函数仅是迭代接收事件且异常自动取消; 通常建议使用consume函数来接收事件

BroadcastChannel

这个通道和通常的通道区别在于他的每一个数据能够被每一个做用域所有接收到; 默认的通道一个数据被接收后其余的协程是没法再接收到数据的

广播通道经过全局函数建立对象

public fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E>
复制代码

自己广播通道继承自SendChannel, 只能发送数据, 经过函数能够拿到接收通道

public fun openSubscription(): ReceiveChannel<E>
复制代码

取消通道

public fun cancel(cause: CancellationException? = null)
复制代码

将Channel转成BroadcastChannel

fun <E> ReceiveChannel<E>.broadcast( capacity: Int = 1, start: CoroutineStart = CoroutineStart.LAZY ): BroadcastChannel<E>
复制代码

经过扩展函数在协程做用域中快速建立一个广播发送通道

public fun <E> CoroutineScope.broadcast( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 1, start: CoroutineStart = CoroutineStart.LAZY, onCompletion: CompletionHandler? = null, @BuilderInference block: suspend ProducerScope<E>.() -> Unit ): BroadcastChannel<E> 
复制代码

迭代通道

接收通道实现操做符重载可使用迭代

public operator fun iterator(): ChannelIterator<E>
复制代码

示例

for (i in produce){
	// 收到每一个发型的消息
}
复制代码

当多个协程接收同一个渠道数据会依次轮流接收到数据, 渠道对于多个协程是公平的

Produce

上面介绍的属于建立Channel对象来发送和接收数据, 可是还能够经过扩展函数快速建立并返回一个具有发送数据的ReceiveChannel对象

public fun <E> CoroutineScope.produce( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 0, @BuilderInference block: suspend ProducerScope<E>.() -> Unit ): ReceiveChannel<E>
复制代码
  • context: 能够经过协程上下文决定调度器等信息
  • capacity: 初始化通道空间

ProducerScope 该接口继承自SendChannel以及CoroutineScope, 具有发送通道数据以及协程做用域做用;

当produce做用域执行完成会关闭通道, 前面已经说起关闭通道没法继续接收数据

等待取消

该函数会在通道被取消时回调其函数参数, 前面说起协程取消时能够经过finally来释放内存等操做, 可是通道取消没法使用finally只能使用该函数

public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) 
// [SendChannel.close] or [ReceiveChannel.cancel] 表明取消通道
复制代码

Actor

能够经过actor函数建立一个具有通道做用的协程做用域

public fun <E> CoroutineScope.actor( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 0, // todo: Maybe Channel.DEFAULT here? start: CoroutineStart = CoroutineStart.DEFAULT, onCompletion: CompletionHandler? = null, block: suspend ActorScope<E>.() -> Unit ): SendChannel<E>
复制代码
  • context: 协程上下文
  • capacity: 通道缓存空间
  • start: 协程启动模式
  • onCompletion: 完成回调
  • block: 回调函数中能够进行发送数据

该函数和produce函数类似,

  1. produce返回ReceiveChannel, 外部进行数据接收; actor返回的SendChannel, 外部进行数据发送
  2. actor的回调函数拥有属性channel:Channel, 既能够发送数据又能够接收数据, produce的属性channel属于SendChannel
  3. 不管是produce或者actor他们的通道都属于Channel, 既能够发送又能够接收数据, 只须要类型强转便可;
  4. 自己Channel能够进行双向数据通讯, 可是设计produce和actor属于设计思想中的生产者和消费者模式
  5. 他们都属于协程做用域和数据通道的结合

轮循器

不管是RxJava仍是协程都支持轮循器的功能, 在个人网络请求库中还赋予了轮循器暂停|继续|多个观察者|重置等功能

这里的协程轮循器就比较简陋

public fun ticker( delayMillis: Long, initialDelayMillis: Long = delayMillis, context: CoroutineContext = EmptyCoroutineContext, mode: TickerMode = TickerMode.FIXED_PERIOD ): ReceiveChannel<Unit>
复制代码

该通道返回的数据是Unit

默认状况下能够理解为通道会在指定间隔时间后一直发送Unit数据

fun main() = runBlocking<Unit> {
  
    val tickerChannel = ticker(delayMillis = 1000, initialDelayMillis = 0)

  // 每秒打印
    for (unit in tickerChannel) {
        System.err.println("unit = $unit")
    }
}
复制代码

可是若是下游不是在发送数据之后当即接收数据, 而是延迟使用receive函数来接收通道数据

TickerMode该枚举拥有两个字段

  • FIXED_PERIOD 默认值, 动态调节通道发送数据的时间间隔, 时间间隔能够看作是上游发送数据的
  • FIXED_DELAY 只有当接收数据后才会开始计算间隔时间, 时间间隔能够看作是下游接收数据的

这个轮循器不支持多订阅|暂停|继续|重置|完成, 可是个人Net库中Interval对象已实现全部功能

Select

select函数回调中监听多个Deferred/Channel的结果, 且只会执行最快接收数据的通道或者结果回调.

动做

在前面的函数介绍中能够看到一系列on{动做}变量, 他们的值所有是SelectClause{数字}接口对象;

[SelectBuilder]

public interface SelectBuilder<in R> {
    public operator fun SelectClause0.invoke(block: suspend () -> R)
    public operator fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R)
    public operator fun <P, Q> SelectClause2<P, Q>.invoke(param: P, block: suspend (Q) -> R)
    public operator fun <P, Q> SelectClause2<P?, Q>.invoke(block: suspend (Q) -> R) = invoke(null, block)
    @ExperimentalCoroutinesApi
    public fun onTimeout(timeMillis: Long, block: suspend () -> R)
}
复制代码

根据这定义的扩展函数就能够直接使用动做

对象 使用的函数
SelectClause0 onJoin
SelectClause1 OnReceive
SelectClause2 onSend

示例

@ObsoleteCoroutinesApi
@UseExperimental(InternalCoroutinesApi::class)
suspend fun selectMulti(a: Channel<Int>, b: Channel<Int>): String = select<String> {

    b.onReceive {
        "b $it" // 优先执行第一个, 不是函数缘由, 而是顺序
    }

    b.onReceiveOrClosed {
        "b $it"
    }
    
    a.onSend(23) {
        "发送 23"
    }
}

fun main() = runBlocking<Unit> {
    val a = Channel<Int>(1) // 缓冲数量, 避免发送数据时阻塞
    val b = Channel<Int>(1)
    
    launch {
        b.send(24)
        val s = selectMulti(a, b)
        println("结果 = $s")
    }
}
复制代码
  • onReceive 在关闭通道时会致使抛出异常, 若是不想抛出异常应当使用onReceiveOrClosed来替换
  • onSend 该函数等效于Channel.send, 就是发送一个值, 假设注册多个onSend确定是第一个先回调返回结果
  • 即便已经有成员被选中(select)也不会致使其余的成员协程做用域结束

[ValueOrClosed]

public val isClosed: Boolean // 通道是否已关闭

public val value: T
public val valueOrNull: T?
// 二者都是获取通道内的值, 可是第2个若是通道关闭不会抛出异常而是返回NULL
复制代码
  1. 当在select中一个通道同时存在发送和接收监听时, 若是二者都执行到(即select没有被打断都执行到)会致使异常抛出
  2. 若是通道重复监听(多个动做), 优先执行第一个
  3. 关闭通道一样会收到数据, onReceive抛出异常, onReceiveOrClose数据为null

Flow

Flow类似于RxJava一样分为三个部分:

  1. 上游
  2. 操做符
  3. 下游

下游接收事件要求在协程做用域内执行(suspend函数)

建立Flow

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T>
复制代码

示例

fun shoot() = flow {
    for (i in 1.;3) {
        delay(1000) // 伪装咱们在这里作了一些有用的事情
        emit(i) // 发送下一个值
    }
}
复制代码
  • 集合或者Sequence均可以经过asFlow函数转成Flow对象

  • 也能够像建立集合同样经过fowOf直接建立Flow对象

  • Channel通道转成Flow

    public fun <T> ReceiveChannel<T>.consumeAsFlow(): Flow<T>                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   
    复制代码
  • 甚至挂起函数也能够转成Flow

    public fun <T> (suspend () -> T).asFlow(): Flow<T>
    复制代码

collect和flow的回调函数自己属于suspend函数能够开启协程做用域

建立Flow的函数

函数 描述
flow 普通Flow
channelFlow 建立通道, 其支持缓冲通道, 容许不一样的CorotineContext发送事件
callbackFlow 与channelFlow函数除了不使用awaitClose会报错之外没有区别
emptyFlow 空的Flow
flowOf 直接发送数据

flow的发射函数emit不是线程安全的不容许其余线程调用, 若是须要线程安全请使用channelFlow而不是flow

channelFlow使用send函数发送数据

发射数据示例

flow<Int> {
  emit(23)
}

channelFlow<Int> {
  send(23) // offer(23)
}
复制代码
  1. offer能够在非suspend函数中使用, send必须在suspend函数中使用
  2. offer存在一个返回值, 假设没有元素空间则会直接返回false, send则会挂起阻塞等待新的元素空间.

Flow在取消做用域时释放资源可使用callbackFlow. 这里演示注册和取消一个广播AppWidgetProvider

callbackFlow<Int> {
  val appWidgetProvider = AppWidgetProvider()
  registerReceiver(appWidgetProvider, IntentFilter()) // 注册
  awaitClose {  // 该回调会在协程做用域被取消时回调
    unregisterReceiver(appWidgetProvider) // 注销
  }
}.collect { 

}
复制代码

收集

收集数据

Flow是冷数据, 要求调用函数collect收集数据时才会进行数据的发射; 该系列函数也成为末端操做符;

flow {
  emit(23)
}.collect {
	System.err.println("(Demo.kt:9) it = $it")
}
复制代码

查看源码会发现这个emit实际上就是执行collect的参数函数

collect函数表示接收上游发送的数据

public suspend fun Flow<*>.collect() 
// 不作任何处理的收集器, 仅仅为了触发发射数据

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit
// 收集

public suspend fun <T> Flow<T>.collectLatest(action: suspend (value: T) -> Unit)
// 和上个函数的区别是: 若是在下游没有处理完状况下上游继续下个发射会致使上次的下游被取消

public suspend inline fun <T> Flow<T>.collectIndexed( crossinline action: suspend (index: Int, value: T) -> Unit ): Unit
// 具有索引和值的收集器

public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job
// 将Flow运行在指定的协程做用域内
复制代码

[FlowCollector] 发射器

public suspend fun emit(value: T)
// 发送一个数据

public suspend inline fun <T> FlowCollector<T>.emitAll(flow: Flow<T>)
// 发射另外一个flow对象
复制代码

调度器

调度器

Flow默认使用的是其所在的当前线程或者协程上下文, Flow不容许在内部使用withContext来切换调度器, 而是应该使用flowOn函数

public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T>
复制代码

该函数改变的是Flow函数内部发射时的线程, 而在collect收集数据时会自动切回建立Flow时的线程

缓存

不须要等待收集执行就当即执行发射数据, 只是数据暂时被缓存而已, 提升性能

默认切换调度器时会自动缓存

public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED): Flow<T>
复制代码

合并函数, 这个函数实际上就是buffer, 当下游没法及时处理上游的数据时会丢弃掉该数据

public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)
复制代码

合并

将多个事件合并后发送给下游

zip

将两个Flow在回调函数中进行处理返回一个新的值 R

当两个flow的长度不等时只发送最短长度的事件

public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R>
复制代码

示例

val nums = (1.;3).asFlow().onEach { delay(300) } // 发射数字 1.;3,间隔 300 毫秒
val strs = flowOf("one", "two", "three").onEach { delay(400) } // 每 400 毫秒发射一次字符串
val startTime = System.currentTimeMillis() // 记录开始的时间
nums.zip(strs) { a, b -> "$a -> $b" } // 使用“zip”组合单个字符串
    .collect { value -> // 收集并打印
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
    } 
复制代码

combine

public fun <T1, T2, R> Flow<T1>.combine( flow: Flow<T2>, transform: suspend (a: T1, b: T2) -> R ): Flow<R>
// 组合两个流,在通过第一次发射之后,任意方有新数据来的时候就能够发射,另外一方有多是已经发射过的数据

public fun <T1, T2, R> Flow<T1>.combineTransform( flow: Flow<T2>, @BuilderInference transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit ): Flow<R>
复制代码

集合

Flow直接转成集合函数

public suspend fun <T> Flow<T>.toList(destination: MutableList<T> = ArrayList()): List<T>
public suspend fun <T> Flow<T>.toSet(destination: MutableSet<T> = LinkedHashSet()): Set<T>
public suspend fun <T, C : MutableCollection<in T>> Flow<T>.toCollection(destination: C): C
复制代码

叠加

public suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (accumulator: S, value: T) -> S): S

public suspend inline fun <T, R> Flow<T>.fold( initial: R, crossinline operation: suspend (acc: R, value: T) -> R ): R
// `acc`为上次回调函数返回值, 第一次为初始值, 等同于叠加效果; 该函数和reduce的区别就是支持初始值; reduce累计两次元素才会回调函数

public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T>
public fun <T, R> Flow<T>.scan(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R>
public fun <T, R> Flow<T>.scan(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R>
复制代码

转换

public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R>
public inline fun <T, R: Any> Flow<T>.mapNotNull(crossinline transform: suspend (value: T) -> R?): Flow<R>

public fun <T, R> Flow<T>.flatMapMerge( concurrency: Int = DEFAULT_CONCURRENCY, transform: suspend (value: T) -> Flow<R> ): Flow<R> = map(transform).flattenMerge(concurrency)
// 上游先发送全部的元素, 而后上游每一个元素会致使回调函数中的Flow发送全部元素一次

public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R>
// 等同于RxJava的FlatMap

public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T>
// 串行收集数据

public fun <T> Flow<Flow<T>>.flattenMerge( concurrency: Int = DEFAULT_CONCURRENCY ): Flow<T>
// 并发收集数据

public inline fun <T, R> Flow<T>.flatMapLatest( @BuilderInference crossinline transform: suspend (value: T) -> Flow<R> ): Flow<R> 
// 在每次 emit 新的数据之后,会取消先前的 collect

public fun <T> Flow<T>.withIndex(): Flow<IndexedValue<T>>
// 包含元素索引
复制代码

生命周期

public fun <T> Flow<T>.onStart( action: suspend FlowCollector<T>.() -> Unit ): Flow<T>
// 开始

public fun <T> Flow<T>.onCompletion( action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit ): Flow<T>
// 回调函数中的参数`cause`若是为null表示正常完成没有抛出异常, 反之则抛出异常非正常结束, 
// 和catch函数同样只能监听到上游发生的异常, 可是没法避免异常抛出只能在异常抛出以前执行回调函数

public fun <T> Flow<T>.catch(action: suspend FlowCollector<T>.(cause: Throwable) -> Unit): Flow<T>
// 该函数只能捕获上游异常, 若是异常处于函数调用之下则依然会被抛出
复制代码

过滤

限制流发送

public fun <T> Flow<T>.take(count: Int): Flow<T>
// 只接受指定数量事件
public fun <T> Flow<T>.takeWhile(predicate: suspend (T) -> Boolean): Flow<T>

public fun <T> Flow<T>.drop(count: Int): Flow<T>
// 丢弃指定数量事件
public fun <T> Flow<T>.dropWhile(predicate: suspend (T) -> Boolean): Flow<T>
// 回调函数判断是否丢弃或者接收, 只要丢弃或者接收后面就不会继续发送事件(结束流)

public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T>
public inline fun <T> Flow<T>.filterNot(crossinline predicate: suspend (T) -> Boolean): Flow<T>
public inline fun <reified R> Flow<*>.filterIsInstance(): Flow<R> = filter { it is R } as Flow<R>
public fun <T: Any> Flow<T?>.filterNotNull(): Flow<T>

public suspend fun <T> Flow<T>.single(): T
// 期待只有一个元素, 不然抛出`IllegalStateException`
public suspend fun <T: Any> Flow<T>.singleOrNull(): T?
// 不抛出异常, 但若是不是仅有元素则返回null

public suspend fun <T> Flow<T>.first(): T
// 若是不存在一个元素则会抛出`NoSuchElementException`
public suspend fun <T> Flow<T>.first(predicate: suspend (T) -> Boolean): T
// 返回回调函数判断为true的第一个条件符合的元素
复制代码

重试

public fun <T> Flow<T>.retry( retries: Long = Long.MAX_VALUE, // 重试次数 predicate: suspend (cause: Throwable) -> Boolean = { true } ): Flow<T>

public fun <T> Flow<T>.retryWhen( predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean ): Flow<T>
复制代码

过滤

public inline fun <T, R> Flow<T>.transform( @BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit ): Flow<R>
// 转换函数, 能够在回调函数中发送新的元素

public fun <T, R> Flow<T>.transformLatest( @BuilderInference transform: suspend FlowCollector<R>.(value: T) -> Unit ): Flow<R>
复制代码

scan和reduce的区别在于

  • reduce是所有叠加计算完成后被收集
  • scan是每次叠加一次后收集一次数据

StateFlow/SharedFlow

类关系

SharedFlow

|- MutableSharedFlow

|- StateFlow

​ |- MutableStateFlow

SharedFlow属于热流数据, 既没有收集(collect)状况下也会发送, 而后在收集时进行重放(replay). 可使用shareIn将冷流转成热流. 也能够直接使用如下函数建立

public fun <T> MutableSharedFlow( replay: Int = 0, // 重放数量 extraBufferCapacity: Int = 0, // 缓存数量(不包含重放数量) onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND ): MutableSharedFlow<T>
复制代码

使用BufferOverflow

  1. DROP_LATEST 丢弃最新值
  2. DROP_OLDEST 丢失最旧值
  3. SUSPEND 挂起阻塞

StateFlow能够看作在Flow的基础上加上了LiveData的特性. 可是没法不存在生命周期跟随, 一直均可以收集数据

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
) : ViewModel() {

    // Backing property to avoid state updates from other classes
    private val _uiState = MutableStateFlow(LatestNewsUiState.Success(emptyList()))
    // The UI collects from this StateFlow to get its state updates
    val uiState: StateFlow<LatestNewsUiState> = _uiState

    init {
        viewModelScope.launch {
            newsRepository.favoriteLatestNews
                // Update View with the latest favorite news
                // Writes to the value property of MutableStateFlow,
                // adding a new element to the flow and updating all
                // of its collectors
                .collect { favoriteNews ->
                    _uiState.value = LatestNewsUiState.Success(favoriteNews)
                }
        }
    }
}
复制代码

示例

将flow从冷流转换成热流使用函数shareIn

public fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
)
复制代码

SharingStarted:

  1. WhileSubscribed 在第一个订阅者出现后开始共享数据,并使数据流永远保持活跃状态
  2. Lazily 存在订阅者时,将使上游提供方保持活跃状态
  3. Eagerly 当即启动提供方

Android

Google发行的Jetpack库中不少组件都附有KTX扩展依赖, 这种依赖主要是增长kotlin和协程支持

Lifecycle

官方提供生命周期协程做用域的快速建立实现;

  • 指定生命周期运行协程
  • 自动在onDestory取消协程

引入ktx依赖库

implementation "androidx.lifecycle:lifecycle-runtime-ktx:2.2.0-rc03"
复制代码

当执行到某个生命周期时运行协程

fun launchWhenCreated(block: suspend CoroutineScope.() -> Unit): Job

fun launchWhenStarted(block: suspend CoroutineScope.() -> Unit): Job

fun launchWhenResumed(block: suspend CoroutineScope.() -> Unit): Job

suspend fun <T> Lifecycle.whenStateAtLeast( minState: Lifecycle.State, block: suspend CoroutineScope.() -> T )
复制代码

这些函数都属于LifecycleLifecycleOwner的扩展函数

LiveData

依赖

implementation "androidx.lifecycle:lifecycle-livedata-ktx:2.2.0-rc03"
复制代码

提供开发者使用的只有这两个函数, 两个函数功能同样, 只是每一个参数接收时间单位不一致

fun <T> liveData( context: CoroutineContext = EmptyCoroutineContext, timeoutInMs: Long = DEFAULT_TIMEOUT, @BuilderInference block: suspend LiveDataScope<T>.() -> Unit ): LiveData<T> = CoroutineLiveData(context, timeoutInMs, block)

fun <T> liveData( context: CoroutineContext = EmptyCoroutineContext, timeout: Duration, @BuilderInference block: suspend LiveDataScope<T>.() -> Unit ): LiveData<T> = CoroutineLiveData(context, timeout.toMillis(), block)
复制代码
  • timeout: 若是liveData的没有处于活跃的观察者则在指定的时间内(单位毫秒)会取消其做用域[block]
  • block: 该做用域只在活跃状态才会触发, 默认在Dispatchers.Main.immediate调度器

liveData做用域具有发射数据和LiveData的做用

interface LiveDataScope<T> {
    /** * Set's the [LiveData]'s value to the given [value]; If you've called [emitSource] previously, * calling [emit] will remove that source. * * Note that this function suspends until the value is set on the [LiveData]; * * @param value The new value for the [LiveData] * * @see emitSource */
    suspend fun emit(value: T)

    /** * Add the given [LiveData] as a source, similar to [MediatorLiveData.addSource]; Calling this * method will remove any source that was yielded before via [emitSource]; * * @param source The [LiveData] instance whose values will be dispatched from the current * [LiveData]; * * @see emit * @see MediatorLiveData.addSource * @see MediatorLiveData.removeSource */
    suspend fun emitSource(source: LiveData<T>): DisposableHandle

    /** * References the current value of the [LiveData]; * * If the block never `emit`ed a value, [latestValue] will be `null`; You can use this * value to check what was then latest value `emit`ed by your `block` before it got cancelled. * * Note that if the block called [emitSource], then `latestValue` will be last value * dispatched by the `source` [LiveData]; */
    val latestValue: T?
}
复制代码
  1. 若是emitSource在emit以前执行则无效
  2. 该做用域会在每次处于活跃状态时都执行一遍, 若是将应用从后台切换到前台则会返回执行该做用域, 可是观察者只会在活跃时才收到数据
相关文章
相关标签/搜索