Kotlin协程: Coroutine/Channel/Flow 以及实际应用

协程这个概念在1958年就开始出现, 目前某些语言开始原生支持(目前主流语言我感受只有Java彻底不支持协程). Java没有原生协程可是能够大型公司都本身或者使用第三方库来支持协程编程, 可是Kotlin原生支持协程.java

Android领域的网络请求库通常由Rxjava实现, 包括我本身写的网络请求库一样也是采用的RxJava. 可是这些RxJava实现的网络请求库一样很难方便的实现并发android

我认为协程的核心就是一个词: 做用域, 理解什么是做用域就理解协程了git

什么是协程:github

线程和协程的关系属于一对多关系, 一个线程上容许存在多个协程, 即主线程你也能异步执行代码. 可是让某个线程执行太多协程效率过低下, 因此针对不一样的场景建议使用调度器切换线程, 使用协程开始就不须要考虑线程的问题, 只须要在不一样场景使用不一样的调度器(调度器会对特定任务进行优化)就好, 协程英文名是Coroutine.数据库

特性

使用场景编程

假设首页存在七个接口网络请求(后端人员处理差)的状况一个个使用串行网络请求的时间比并发网络请求慢了接近七倍.后端

目前计算机都是经过多核CPU提高计算能力, 因此熟练掌握并发编程是将来的趋势缓存

协程优点安全

  1. 并发实现方便
  2. 没有回调嵌套发生, 代码结构清晰
  3. 建立协程性能开销优于建立线程, 一个线程能够运行多个协程, 单线程便可异步

实验特性bash

协程在Kotlin1.3时候放出正式版本, 可是目前仍然存在不稳定的函数变更, 不过这个我认为不影响项目中实际使用

@FlowPreview 表明可能之后存在Api函数变更

@ExperimentalCoroutinesApi  表明目前可能存在不稳定的因素的函数

@ObsoleteCoroutinesApi 可能存在被废弃的可能
复制代码

Kotlin的协程主要构成分为三部分

  1. CoroutineScope 协程做用域: 每一个协程体都存在一个做用域, 异步仍是同步由该做用域决定
  2. Channel 通道: 数据如同一个通道进行发送和接收, 能够在协程之间互相传递数据
  3. Flow 响应流: 相似RxJava等结构写法

为方便网络请求和简化异步做用域开启可使用我实现的一个库: Net

1.0+版本为RxJava实现, 2.0+版本为Coroutine实现

本文章后续会根据Kotlin的版本中的协程迭代进行更新

经常使用事件分发框架为EventBus或者RxBus, 我以前使用RxJava的时候也写了RxBus来使用, 使用协程后我又用协程实现一个: Channel

展望

协程对于后端高并发优点很大, 相信Spring的Kt版本后续会跟进

至于Google的Jetpack基本上都有针对协程扩展

咱们公司项目属于 MVVM+Kotlin+Coroutine+JetPack, 最明显的是并发网络请求速度翻倍. 同时代码更加结构清晰

建立协程

开启主协程的三种方式

生命周期和App一致, 没法取消(不存在Job), 不存在线程阻塞

fun main() {
    GlobalScope.launch { // 在后台启动一个新的协程并继续
        delay(1000L)
        println("World!")
    }
    Thread.sleep(2000) // 防止JVM虚拟机退出
}
复制代码

不存在线程阻塞, 能够取消, 能够经过CoroutineContext控制协程生命周期

fun main() {
    CoroutineScope(Dispatchers.IO).launch {
    }
    Thread.sleep(1000)
}
复制代码

线程阻塞, 适用于单元测试, 不须要延迟阻塞防止JVM虚拟机退出. runBlocking属于全局函数能够在任意地方调用

通常咱们在项目中是不会使用runBlocking, 由于阻塞主线程没有开启的任何意义

fun main() = runBlocking { 
    // 阻塞线程直到协程做用域内部全部协程执行完毕
}
复制代码

建立做用域

协程内部还可使用函数建立其余协程做用域, 分为两种建立函数:

  1. CoroutineScope的扩展函数, 只有在做用域内部才能建立其余的做用域
  2. suspend修饰的函数内部
  3. 协程永远会等待其内部做用域内全部协程都执行完毕后才会关闭协程

在主协程内还能够建立子协程做用域, 建立函数分为两种

  1. 阻塞做用域(串行): 会阻塞当前做用域

  2. 挂起做用域(并发): 不会阻塞当前做用域

同步做用域函数

都属于suspend函数

  • withContext: 能够切换调度器, 有返回结果
  • coroutineScope: 建立一个协程做用域, 该做用域会阻塞当前所在做用域而且等待其子协程执行完才会恢复, 有返回结果
  • supervisorScope: 使用SupervisorJob的coroutineScope, 异常不会取消父协程
public suspend fun <T> withContext( context: CoroutineContext, block: suspend CoroutineScope.() -> T
): T
// 返回结果. 能够和当前协程的父协程存在交互关系, 主要做用为来回切换调度器

public suspend inline operator fun <T> CoroutineDispatcher.invoke( noinline block: suspend CoroutineScope.() -> T
): T = withContext(this, block)
// withContext工具函数而已

public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R

public suspend fun <R> supervisorScope(block: suspend CoroutineScope.() -> R): R

public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T
复制代码

异步做用域函数

这两个函数都不属于suspend, 只须要CoroutineScope就能够调用

  • launch: 异步并发, 没有返回结果
  • async: 异步并发, 有返回结果
public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit
): Job


public fun <T> CoroutineScope.async( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> T
): Deferred<T>

复制代码

并发

同一个协程做用域中的异步任务遵照顺序原则开始执行. 适用于串行网络请求, 在一个异步任务须要上个异步任务的结果时.

协程挂起须要时间, 因此异步协程永远比同步代码执行慢

fun main() = runBlocking<Unit> {
    launch {
        System.err.println("(Main.kt:34) 后执行")
    }

    System.err.println("(Main.kt:37) 先执行")
}
复制代码

当在协程做用域中使用async函数时能够建立并发任务

public fun <T> CoroutineScope.async( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> T
): Deferred<T>
复制代码

示例

fun main() = runBlocking<Unit> {
    val name = async { getName() }
    val title = async { getTitle() }

    System.err.println("(Main.kt:35) result = ${name.await() + title.await()}")
    delay(2000)
}
复制代码
  1. 返回对象Deferred. 经过函数await获取结果值.
  2. Deferred集合还可使用awaitAll()等待所有完成.
  3. 不执行await任务也会等待执行完协程关闭
  4. 若是Deferred不执行await则async内部抛出的异常不会被logCat和tryCatch捕获, 可是依然会致使做用域取消和异常崩溃. 但当执行await时异常信息会从新抛出.

惰性并发

将async函数中的start设置为CoroutineStart.LAZY时则只有调用Deferred对象的await时才会开始执行异步任务(或者执行start函数).

启动模式

  1. DEFAULT 当即执行
  2. LAZY 直到Job执行start或者join才开始执行
  3. ATOMIC 在做用域开始执行以前没法取消
  4. UNDISPATCHED 不执行任何调度器, 直接在当前线程中执行, 可是会根据第一个挂起函数的调度器切换

异常

协程中发生异常, 则父协程取消而且父协程其余的子协程一样所有取消

Deferred

继承自Job

提供一个全局函数用于建立CompletableDeferred对象, 该对象能够实现自定义Deferred功能

示例

fun main() = runBlocking<Unit> {
    val deferred = CompletableDeferred<Int>()
    
    launch {
        delay(1000 )
        deferred.complete(23)
    }

    System.err.println("(Demo.kt:72) 结果 = ${deferred.await()}")
}
复制代码

建立CompletableDeferred的全局函数

public fun <T> CompletableDeferred(parent: Job? = null): CompletableDeferred<T>

public fun <T> CompletableDeferred(value: T): CompletableDeferred<T>
复制代码

CompletableDeferred函数

public fun complete(value: T): Boolean
// 结果

public fun completeExceptionally(exception: Throwable): Boolean
// 抛出异常, 异常发生在`await()`时
复制代码

CoroutineScope

建立此对象表示建立一个协程做用域

结构化并发

若是你看协程的教程可能会常常看到这个词, 这就是做用域内部开启新的协程. 父协程会限制子协程的生命周期, 子协程承接父协程的上下文, 这种层级关系就是结构化并发.

在一个协程做用域里面开启多个子协程进行并发行为

CoroutineContext

协程上下文, 我认为协程上下文能够看作包含协程基本信息的一个Context(上下文). 其能够决定协程的名称或者运行

建立一个新的调度器

fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher

fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher
复制代码

建立新的调度器比较消耗资源, 建议复用且当不须要的时候使用close函数释放

调度器

Dispatchers继承自CoroutineContext, 该枚举拥有三个实现. 表示不一样的线程调度. 当函数不使用调度器时承接当前做用域的调度器

  1. Dispatchers.Unconfined 不指定线程,

    若是子协程切换线程那么接下来的代码也运行在该线程上
    复制代码
  2. Dispatchers.IO

    适用于IO读写
    复制代码
  3. Dispatchers.Main

    根据平台不一样而有所差, Android上为主线程
    复制代码
  4. Dispatchers.Default 默认调度器

    在线程池中执行协程体, 适用于计算操做
    复制代码

当即执行

Dispatchers.Main.immediate
复制代码

immediate属于全部调度器都有的属性, 该属性表明着若是当前正处于该调度器中不执行调度器切换直接执行, 能够理解为在同一调度器内属于同步协程做用域

例如launch函数开启做用域会比后续代码执行顺序低, 可是使用该属性协程属于顺序执行

示例

CoroutineScope(Job() + Dispatchers.Main.immediate).launch {
	// 执行顺序 1
}

// 执行顺序 2

CoroutineScope(Job() + Dispatchers.Main).launch {
		// 执行顺序 4
}

// 执行顺序 3
复制代码

协程命名

经过建立一个CoroutineName对象, 在构造函数中指定参数为协程名称, CoroutineName继承自CoroutineContext.

launch(CoroutineName("吴彦祖")){

}
复制代码

协程上下文名称用于方便调试使用

协程挂起

yield函数可让当前协程暂时挂起执行其余协程体, 若是没有其余正在并发的协程体则继续执行当前协程体(至关于无效调用).

public suspend fun yield(): Unit
复制代码

JOB

在协程中Job一般被称为做业, 表示一个协程工做任务, 他一样继承自CoroutineContext

val job = launch {

}
复制代码

Job属于接口

interface Job : CoroutineContext.Element
复制代码

函数

public suspend fun join()
// 等待协程执行完毕都阻塞当前线程

public fun cancel(cause: CancellationException? = null)
// 取消协程

public suspend fun Job.cancelAndJoin()
// 阻塞而且在协程结束之后取消协程

public fun start(): Boolean

public val children: Sequence<Job>
// 所有子做业

public fun invokeOnCompletion( onCancelling: Boolean = false, // true则取消做用域不会执行 invokeImmediately: Boolean = true, //  handler: CompletionHandler): DisposableHandle

public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
// 当其做用域完成之后执行, 主协程指定才有效, 直接给CoroutineScope指定时无效的
// 手动抛出CancellationException一样会赋值给cause

public fun getCancellationException(): CancellationException

public val onJoin: SelectClause0
public val children: Sequence<Job>
// 后面说起的选择器中使用
复制代码

状态

经过字段能够获取JOB当前处于状态

public val isActive: Boolean

public val isCancelled: Boolean

public val isCompleted: Boolean
复制代码

扩展函数

public fun Job.cancelChildren(cause: CancellationException? = null)

public suspend fun Job.cancelAndJoin()
复制代码

每一个协程做用域都存在coroutineContext. 而协程上下文中都存在Job对象

coroutineContext[Job]
复制代码

结束协程

若是协程做用域内存在计算任务(一直打日志也算)则没法被取消, 若是使用delay函数则能够被取消.

fun main() = runBlocking<Unit> {

  val job = launch(Dispatchers.Default) {
    while (true){
      delay(100) // 这行代码存在则能够成功取消协程, 不存在则没法取消
      System.err.println("(Main.kt:30) ")
    }
  }
  
  delay(500)
  job.cancel() 
  System.err.println("(Main.kt:42) 结束")
}
复制代码

经过使用协程内部isActive属性来判断是否应该结束

fun main() = runBlocking<Unit> {

    val job = launch(Dispatchers.Default) {
        while (isActive) { // 一旦协程被取消则为false
            System.err.println("(Main.kt:30) ")
        }
    }

    delay(500)
    job.cancel()
    System.err.println("(Main.kt:42) 结束")
}
复制代码

释放资源

协程存在被手动取消的状况, 可是有些资源须要在协程取消的时候释放资源, 这个操做能够在finally中执行.

不管如何finally都会被执行

fun main() = runBlocking<Unit> {

    val startTime = System.currentTimeMillis()
    val job = launch(Dispatchers.Default) {
        try {
            repeat(1000){
                System.err.println("(Main.kt:31) it = $it")
                delay(500)
            }
        } finally {
           // 已被取消的协程没法继续挂起
        }
    }
    delay(1500)
    job.cancel()
    System.err.println("(Main.kt:42) ")
}
复制代码

再次开启协程

经过withContextNonCancellable能够在已被取消的协程中继续挂起协程. 这种用法其实能够看作建立一个没法取消的任务

withContext(NonCancellable) {
                println("job: I'm running finally")
                delay(1000L)
                println("job: And I've just delayed for 1 sec because I'm non-cancellable")
            }
复制代码

上下文组合

协程做用域能够接收多个CoroutineContext做为上下文参数. CoroutineContext自己属于接口, 不少上下文相关的类都实现与他.

配置多个CoroutineContext能够经过+符号同时指定多个协程上下文, 每一个实现对象可能包含一部分信息能够存在覆盖行为故相加时的顺序存在覆盖行为.

val a = CoroutineScope(SupervisorJob() + coroutineContext).launch(handler) {
  delay(1000)
  System.err.println("(Main.kt:51) ${Thread.currentThread()}")
}
复制代码
launch(Dispatchers.IO + CoroutineName("吴彦祖")){}
复制代码

协程局部变量

使用ThreadLocal能够获取线程的局部变量, 可是要求使用扩展函数asContextElement转为协程上下文做为参数传入在建立协程的时候.

该局部变量做用于持有该协程上下文的协程做用域内

public fun <T> ThreadLocal<T>.asContextElement(value: T = get()): ThreadContextElement<T> =
    ThreadLocalElement(value, this)
复制代码

超时

public suspend fun <T> withTimeout(timeMillis: Long, block: suspend CoroutineScope.() -> T): T
// 超过指定时间timeMillis自动结束协程. 
// 当没有超时时返回值获取而且继续执行协程. 
// 当超时会抛出异常TimeoutCancellationException, 可是不会致使程序结束

public suspend fun <T> withTimeoutOrNull(timeMillis: Long, block: suspend CoroutineScope.() -> T): T?
// 若是超时不会结束协程而是返回null
复制代码

没法手动抛出TimeoutCancellationException, 由于其构造函数私有

全局协程做用域

全局协程做用域属于单例对象, 整个JVM虚拟机只有一份实例对象. 他的寿命周期也跟随JVM. 使用全局协程做用域的时候注意避免内存泄漏

public object GlobalScope : CoroutineScope {
    /** * Returns [EmptyCoroutineContext]. */
    override val coroutineContext: CoroutineContext
        get() = EmptyCoroutineContext
}
复制代码

全局协程做用域不继承父协程做用域的上下文, 因此也不会由于父协程被取消而自身被取消.

启动模式

  • DEFAULT 当即执行协程体
  • ATOMIC 当即执行协程体,但在开始执行协程以前没法取消协程
  • UNDISPATCHED 当即在当前线程执行协程体,第一个挂起函数执行在函数所在线程, 后面执行在函数指定线程
  • LAZY 手动执行startjoin才会执行协程

协程取消

协程体若是已经执行实际上属于不可取消的, 在协程体中经过isActive来判断协程是否处于活跃中

经过取消函数的参数指定异常CancellationException能够自定义异常对象

不可取消的协程做用域

NonCancellable该单例对象用于withContext函数建立一个没法被取消的协程做用域

withContext(NonCancellable) {
  delay(2000)
}
复制代码

示例

fun main() = runBlocking {
    launch {
        delay(1000)
        System.err.println("(Main.kt:19) ")
    }

    launch {
        withContext(NonCancellable) {
            delay(2000)
            System.err.println("(Main.kt:26) ")
        }
    }

    delay(500) // 防止launch还未开启withContext就被取消
    cancel()
}
复制代码

GlobalScope取消

GlobalScope属于全局协程, 由他开启的协程都不拥有Job, 因此没法取消协程. 可是能够经过给GlobalScope开启的协程做用域指定Job而后就可使用Job取消协程.

协程异常

经过CoroutineExceptionHandler函数能够建立一个同名的对象, 该接口继承自CoroutineContext. 一样经过制定上下文参数传递给全局协程做用域使用, 看成用域抛出异常时会被该对象的回调函数接收到, 而且不会抛出异常.

只要发生异常就会致使父协程和其全部子协程都被取消, 这种属于双向的异常取消机制, 后面提到的监督做业属于发生异常只会取消全部子协程, 属于单向.

CoroutineExceptionHandler异常处理器并不能阻止协程取消, 只是监听到协程的异常信息避免JVM抛出异常退出程序而已

不要尝试直接使用try/catch捕捉协程做用域的异常

try {
  launch {
    throw NullPointerException()
  }
} catch (e: Exception) {
  e.printStackTrace()
}
复制代码

错误示例, 没法捕捉到异常

协程取消异常

取消协程的做业(Job)会引起异常, 可是会被默认的异常处理器给忽略, 可是咱们能够经过捕捉能够看到异常信息

fun main() = runBlocking<Unit> {
    val job = GlobalScope.launch {

        try {
            delay(1000)
        } catch (e: Exception) {
            e.printStackTrace()
        }
    }

    job.cancel(CancellationException("自定义一个用于取消协程的异常"))

    delay(2000)
}
复制代码

Job取消函数

public fun cancel(cause: CancellationException? = null)
复制代码
  • cause: 参数不传默认为JobCancellationException

全局协程做用域的异常处理

CoroutineExceptionHandler没法捕捉async|produce抛出的异常

val exceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable ->
                                                  System.err.println("(Main.kt:41):main coroutineContext = $coroutineContext, throwable = $throwable")
                                                 }

GlobalScope.launch(exceptionHandler) { 

}
复制代码

子协程没法设置异常处理器, 即便设置了也会被父协程覆盖而没有意义. 除非使用异常处理器+Job对象, 可是第一个子协程launch容许设置

异常聚合和解包

全局协程做用域也存在嵌套子父级关系, 故异常可能也会依次抛出多个异常

fun main() = runBlocking {
    val handler = CoroutineExceptionHandler { _, exception ->
				// 这里的异常是第一个被抛出的异常对象
        println("捕捉的异常: $exception 和被嵌套的异常: ${exception.suppressed.contentToString()}")
    }
    val job = GlobalScope.launch(handler) {
        launch {
            try {
                delay(Long.MAX_VALUE)
            } finally { // 当父协程被取消时其全部子协程都被取消, finally被取消以前或者完成任务以后必定会执行
                throw ArithmeticException() // 再次抛出异常, 异常被聚合
            }
        }
        launch {
            delay(100)
            throw IOException() // 这里抛出异常将致使父协程被取消
        }
        delay(Long.MAX_VALUE)
    }
    job.join() // 避免GlobalScope做用域没有执行完毕JVM虚拟机就退出
}
复制代码

监督做业

通常状况子协程发生异常会致使父协程被取消, 同时父协程发生异常会取消全部的子协程. 可是有时候子协程发生异常咱们并不但愿父协程也被取消, 而是仅仅全部子协程取消, 这个使用就是用SupervisorJob做业.

建立监督做业对象

val job = SupervisorJob()

launch(job) {  }
复制代码

监督做业在withContextasync中添加无效

直接建立 SupervisorJob() 对象传入做用域中会致使该做用域和父协程生命周期不统一的问题, 即父协程取消之后该子协程依然处于活跃状态.

该函数能够解决我上面提到的生命周期不统一问题, 直接建立向下传播异常的协程做用域

public suspend fun <R> supervisorScope(block: suspend CoroutineScope.() -> R): R
复制代码
  • 该函数属于阻塞
  • supervisorScope函数使用的依然是当前做用域的Job, 因此跟随当前做用域生命周期, 能够被取消.
  • 返回值
  • 该做用域内若是不trycatch异常否仍是会致使做用域被取消(在设置CoroutineExceptionHandler状况下)

下面我写个即便抛出异常也不会致使协程取消的

supervisorScope {
  try {
    throw NullPointerException()
  } catch (e: Exception) {
    e.printStackTrace()
  }
}
复制代码

线程不安全

解决线程不安全问题

  1. 单协程上下文操做
  2. 互斥锁
  3. 切换线程实现单线程
  4. Channel

互斥

至关于Java中的Lock替代品: Mutex

建立互斥对象

public fun Mutex(locked: Boolean = false): Mutex
// `locked`是否当即上锁
复制代码

使用扩展函数能够自动加锁和解锁

public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T
// owner 钥匙
复制代码

Channel

经过在不一样的协程内部使用Channel实例能够数据通信. 通道能够连续顺序传输N个元素

多个协程容许发送和接收同一个通道数据

Channel属于接口没法直接建立, 咱们须要经过函数Channel()来建立其实现类

源码

public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
    when (capacity) {
        RENDEZVOUS -> RendezvousChannel() // 无缓存
        UNLIMITED -> LinkedListChannel() // 无限制
        CONFLATED -> ConflatedChannel()  // 合并
        BUFFERED -> ArrayChannel(CHANNEL_DEFAULT_CAPACITY) // 64
        else -> ArrayChannel(capacity) // 指定缓存大小
    }
复制代码
  • capacity

    缓冲大小, 默认0
    当Channel发送一条数据时就会挂起通道(不继续执行发送后续代码), 只有在接收这条数据时才会解除挂起继续执行. 可是咱们能够设置缓存大小
    复制代码

通道容许被遍历获取当前发送数据

val channel = Channel<Int>()

for (c in channel){

}
复制代码
public suspend fun yield(): Unit
复制代码

Channel

Channel接口同时实现发送渠道(SendChannel)和接收渠道(ReceiveChannel)两个接口

public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> 
复制代码

SendChannel

public val isClosedForSend: Boolean

public suspend fun send(element: E)
// 发送消息

public fun offer(element: E): Boolean

public fun close(cause: Throwable? = null): Boolean
// 关闭发送通道

public fun invokeOnClose(handler: (cause: Throwable?) -> Unit)

public val onSend: SelectClause2<E, SendChannel<E>>
复制代码
  • 发送通道关闭后不能继续使用ReceiveChannel接收数据, 会致使ClosedReceiveChannelException抛出

ReceiveChannel

public val isClosedForReceive: Boolean
// SendChannel是否已经关闭通道, 若是关闭通道之后还存在缓存则会接收完缓存以后返回false

public val isEmpty: Boolean

public suspend fun receive(): E
// 接受当前被发送的消息

public val onReceive: SelectClause1<E>
// 监听事件发送
public val onReceiveOrClosed: SelectClause1<ValueOrClosed<E>>
// 会抛出异常, 不推荐使用

public val onReceiveOrNull: SelectClause1<E?>
// Null表示通道已关闭

public suspend fun receiveOrClosed(): ValueOrClosed<E>
// `ValueOrClosed`对象能够判断通道是否已关闭

public fun poll(): E?

public fun cancel(cause: CancellationException? = null)
// 取消
复制代码
  1. 通道的发送和接收都会致使做用域被阻塞, 可是发送消息能够经过设置缓存让他不阻塞, 或者取消通道可让阻塞继续
  2. 通道只容许在挂起函数中发送和接收, 可是建立通道不限制
  3. 若是不关闭通道, 其所在做用域不会被结束
  4. 通道执行close函数后不容许再发送或者接收数据, 不然抛出异常
  5. 通道的send|receive函数所在做用域被取消cancel不会致使通道结束(isClosedForReceive返回false)

BroadcastChannel

这个通道和通常的通道区别在于他的每一个数据能够被每一个做用域所有接收到. 默认的通道一个数据被接收后其余的协程是没法再接收到数据的

广播通道经过全局函数建立对象

public fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E>
复制代码

自己广播通道继承自SendChannel, 只能发送数据, 经过函数能够拿到接收通道

public fun openSubscription(): ReceiveChannel<E>
复制代码

取消通道

public fun cancel(cause: CancellationException? = null)
复制代码

将Channel转成BroadcastChannel

fun <E> ReceiveChannel<E>.broadcast(
    capacity: Int = 1,
    start: CoroutineStart = CoroutineStart.LAZY
): BroadcastChannel<E>
复制代码

经过扩展函数在协程做用域中快速建立一个广播发送通道

public fun <E> CoroutineScope.broadcast(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 1,
    start: CoroutineStart = CoroutineStart.LAZY,
    onCompletion: CompletionHandler? = null,
    @BuilderInference block: suspend ProducerScope<E>.() -> Unit
): BroadcastChannel<E> 
复制代码

迭代通道

接收通道实现操做符重载可使用迭代

public operator fun iterator(): ChannelIterator<E>
复制代码

示例

for (i in produce){
	// 收到每一个发型的消息
}
复制代码

当多个协程接收同一个渠道数据会依次轮流接收到数据, 渠道对于多个协程是公平的

Produce

上面介绍的属于建立Channel对象来发送和接收数据, 可是还能够经过扩展函数快速建立并返回一个具有发送数据的ReceiveChannel对象

public fun <E> CoroutineScope.produce( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 0, @BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E>
复制代码
  • context: 能够经过协程上下文决定调度器等信息
  • capacity: 初始化通道空间

ProducerScope 该接口继承自SendChannel以及CoroutineScope, 具有发送通道数据以及协程做用域做用.

当produce做用域执行完成会关闭通道, 前面已经说起关闭通道没法继续接收数据

等待取消

该函数会在通道被取消时回调其函数参数. 前面说起协程取消时能够经过finally来释放内存等操做, 可是通道取消没法使用finally只能使用该函数.

public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) 

// [SendChannel.close] or [ReceiveChannel.cancel] 表明取消通道
复制代码

Actor

能够经过actor函数建立一个具有渠道做用的协程做用域

public fun <E> CoroutineScope.actor( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 0, // todo: Maybe Channel.DEFAULT here? start: CoroutineStart = CoroutineStart.DEFAULT, onCompletion: CompletionHandler? = null, block: suspend ActorScope<E>.() -> Unit
): SendChannel<E>
复制代码
  • context: 协程上下文
  • capacity: 通道缓存空间
  • start: 协程启动模式
  • onCompletion: 完成回调
  • block: 回调函数中能够进行发送数据

该函数和produce函数类似,

  1. produce返回ReceiveChannel, 外部协程只能进行数据接收
  2. actor返回的SendChannel, 外部协程只能进行数据发送
  3. actor的回调函数拥有属性channel:Channel, 既能够发送数据又能够接收数据, produce的属性channel属于SendChannel
  4. 不管是produce或者actor他们的通道都属于Channel, 既能够发送又能够接收数据, 只须要类型强转便可.
  5. 自己Channel能够进行双向数据通讯, 可是设计produce和actor属于设计思想中的生产者和消费者模式
  6. 他们都属于协程做用域和数据通道的结合

轮循器

不管是RxJava仍是协程都支持轮循器的功能, 在个人网络请求库中还赋予了轮循器暂停|继续|多个观察者|重置等功能

这里的协程轮循器就比较简陋

public fun ticker( delayMillis: Long, initialDelayMillis: Long = delayMillis, context: CoroutineContext = EmptyCoroutineContext, mode: TickerMode = TickerMode.FIXED_PERIOD ): ReceiveChannel<Unit>
复制代码

该通道返回的数据是Unit

默认状况下能够理解为通道会在指定间隔时间后一直发送Unit数据

fun main() = runBlocking<Unit> {
  
    val tickerChannel = ticker(delayMillis = 1000, initialDelayMillis = 0)

  // 每秒打印
    for (unit in tickerChannel) {
        System.err.println("unit = $unit")
    }
}
复制代码

可是若是下游不是在发送数据之后当即接收数据, 而是延迟使用receive函数来接收通道数据

TickerMode该枚举拥有两个字段

  • FIXED_PERIOD 默认值, 动态调节通道发送数据的时间间隔, 时间间隔能够看作是上游发送数据的
  • FIXED_DELAY 只有当接收数据后才会开始计算间隔时间, 时间间隔能够看作是下游接收数据的

这个轮循器不支持多订阅|暂停|继续|重置|完成, 可是个人Net库中Interval对象已实现该功能.

Select

select闭包中能够建立多个协程做用域或者通道, 且只会执行最快接收数据的通道或者结果.

通道

@InternalCoroutinesApi
suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String = select<String> {
    a.onReceiveOrNull {
        if (it == null) "Channel 'a' is closed"
        else "a -> '$it.valueOrNull'"
    }

    b.onReceiveOrNull {
        if (it == null) "Channel 'b' is closed"
        else "b -> '$it.valueOrNull'"
    }
}

@InternalCoroutinesApi
fun main() = runBlocking<Unit> {

    val a = produce<String> {
        repeat(4) { send("Hello $it") }
    }

    val b = produce<String> {
        repeat(4) { send("World $it") }
    }

    repeat(8) {
        // 打印最先的八个结果
        println(selectAorB(a, b))
    }

    coroutineContext.cancelChildren()
}
复制代码

Flow

Flow不属于挂起函数能够在任意位置建立. 默认执行在当前协程上下文中.

Flow和RxJava很类似, 分为上游和下游及中间操做符, 可是Flow内部属于协程做用域, 其调度器依靠挂起函数切换异步.

JetBrains公司也认可Flow属于参考Reactive Stream等框架的产物, 这样我相信不少人就能理解Flow的存在乎义了. 这种上下游观察者模式在ROOM官方数据库中一样支持.

Flow的操做符不如RxJava丰富, 可是Flow的开发时间还很短还未正式完成. 后面能够跟进

建立Flow

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T>
复制代码

示例

fun shoot() = flow {
    for (i in 1..3) {
        delay(1000) // 伪装咱们在这里作了一些有用的事情
        emit(i) // 发送下一个值
    }
}
复制代码
  • 集合或者Sequence均可以经过asFlow函数转成Flow对象

  • 也能够像建立集合同样经过fowOf直接建立Flow对象

  • Channel通道转成Flow

    public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow
    复制代码
  • 甚至挂起函数也能够转成Flow

    public fun <T> (suspend () -> T).asFlow(): Flow<T>
    复制代码

Flow的完成和异常不须要经过发射器Emitter去发送, RxJava须要.

和RxJava同样理解基本用法和思想以后就仅须要熟悉不一样的操做符了. 基本的操做符每一个ReactiveStream框架都是雷同

下面介绍Flow的函数(操做符).

Collect

收集数据

Flow是冷数据, 要求调用函数collect收集数据时才会进行数据的发射. 该系列函数也成为末端操做符.

shoot().collect {
	System.err.println("(Demo.kt:9) it = $it")
}
复制代码

函数

public suspend fun Flow<*>.collect() 

public suspend fun <T> Flow<T>.collectLatest(action: suspend (value: T) -> Unit)

public suspend inline fun <T> Flow<T>.collectIndexed( crossinline action: suspend (index: Int, value: T) -> Unit
): Unit

public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job
// 将Flow运行在指定的协程做用域内

public suspend inline fun <T> FlowCollector<T>.emitAll(flow: Flow<T>)
复制代码

Context

调度器

Flow默认使用的是其所在的当前线程或者协程上下文, Flow不容许在内部使用withContext来切换调度器, 而是应该使用flowOn函数

public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T>
复制代码

该函数改变的是Flow函数内部发射时的线程, 而在collect收集数据时会自动切回建立Flow时的线程.

public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T>

public inline fun <T> callbackFlow(@BuilderInference noinline block: suspend ProducerScope<T>.() -> Unit): Flow<T>
复制代码

缓存

不须要等待收集执行就当即执行发射数据. 只是数据暂时被缓存而已. 提升性能.

默认切换调度器时会自动缓存

public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED): Flow<T>
复制代码

合并函数, 这个函数实际上就是buffer

当下游没法及时处理上游的数据时会丢弃掉该数据

public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)
复制代码

Zip

将多个事件合并后发送给下游

zip

将两个Flow在回调函数中进行处理返回一个新的值 R

当两个flow的长度不等时只发送最短长度的事件

public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R>
复制代码

示例

val nums = (1..3).asFlow().onEach { delay(300) } // 发射数字 1..3,间隔 300 毫秒
val strs = flowOf("one", "two", "three").onEach { delay(400) } // 每 400 毫秒发射一次字符串
val startTime = System.currentTimeMillis() // 记录开始的时间
nums.zip(strs) { a, b -> "$a -> $b" } // 使用“zip”组合单个字符串
    .collect { value -> // 收集并打印
            println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
    } 
复制代码

combine

public fun <T1, T2, R> Flow<T1>.combine(flow: Flow<T2>, transform: suspend (a: T1, b: T2) -> R): Flow<R>

public fun <T1, T2, R> Flow<T1>.combineTransform( flow: Flow<T2>, @BuilderInference transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
): Flow<R>
复制代码

Collection

Flow直接转成集合函数

public suspend fun <T> Flow<T>.toList(destination: MutableList<T> = ArrayList()): List<T>

public suspend fun <T> Flow<T>.toSet(destination: MutableSet<T> = LinkedHashSet()): Set<T>

public suspend fun <T, C : MutableCollection<in T>> Flow<T>.toCollection(destination: C): C
复制代码

Reduce

public suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (accumulator: S, value: T) -> S): S

public suspend inline fun <T, R> Flow<T>.fold( initial: R, crossinline operation: suspend (acc: R, value: T) -> R
): R
// `acc`为上次回调函数返回值, 第一次为初始值, 等同于叠加效果. 该函数和reduce的区别就是支持初始值. reduce累计两次元素才会回调函数

public suspend fun <T> Flow<T>.single(): T
// 期待只有一个元素, 不然抛出`IllegalStateException`

public suspend fun <T: Any> Flow<T>.singleOrNull(): T?
// 不抛出异常, 但若是不是仅有元素则返回null

public suspend fun <T> Flow<T>.first(): T
// 若是不存在一个元素则会抛出`NoSuchElementException`

public suspend fun <T> Flow<T>.first(predicate: suspend (T) -> Boolean): T
// 返回回调函数判断为true的第一个条件符合的元素
复制代码

Merge

public fun <T, R> Flow<T>.flatMapMerge( concurrency: Int = DEFAULT_CONCURRENCY, transform: suspend (value: T) -> Flow<R>
): Flow<R> = map(transform).flattenMerge(concurrency)
// 上游先发送全部的元素, 而后上游每一个元素会致使回调函数中的Flow发送全部元素一次

public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R>
// 等同于RxJava的FlatMap

public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T>

public fun <T> Flow<Flow<T>>.flattenMerge( concurrency: Int = DEFAULT_CONCURRENCY ): Flow<T>

public fun <T, R> Flow<T>.transformLatest( @BuilderInference transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R>

public inline fun <T, R> Flow<T>.flatMapLatest( @BuilderInference crossinline transform: suspend (value: T) -> Flow<R>
): Flow<R> 

public inline fun <T, R> Flow<T>.flatMapLatest( @BuilderInference crossinline transform: suspend (value: T) -> Flow<R>
): Flow<R>
复制代码

Emitter

public inline fun <T, R> Flow<T>.transform( @BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R>
// 转换函数, 能够在回调函数中发送新的元素

public fun <T> Flow<T>.onStart( action: suspend FlowCollector<T>.() -> Unit
): Flow<T>
// 开始

public fun <T> Flow<T>.onCompletion( action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
): Flow<T>
// 回调函数中的参数`cause`若是为null表示正常完成没有抛出异常, 反之则抛出异常非正常结束
// 和catch函数同样只能监听到上游发生的异常, 可是没法避免异常抛出只能在异常抛出以前执行回调函数
复制代码

Limit

限制流发送

public fun <T> Flow<T>.take(count: Int): Flow<T>
// 只接受指定数量事件

public fun <T> Flow<T>.drop(count: Int): Flow<T>
// 丢弃指定数量事件

public fun <T> Flow<T>.takeWhile(predicate: suspend (T) -> Boolean): Flow<T>
public fun <T> Flow<T>.dropWhile(predicate: suspend (T) -> Boolean): Flow<T>
// 回调函数判断是否丢弃或者接收, 只要丢弃或者接收后面就不会继续发送事件(结束流)
复制代码

Error

捕捉异常

public fun <T> Flow<T>.catch(action: suspend FlowCollector<T>.(cause: Throwable) -> Unit): Flow<T>
复制代码

该函数只能捕获上游异常, 若是异常处于函数调用之下则依然会被抛出

重试

public fun <T> Flow<T>.retry( retries: Long = Long.MAX_VALUE, // 重试次数 predicate: suspend (cause: Throwable) -> Boolean = { true }
): Flow<T>

public fun <T> Flow<T>.retryWhen( predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean
): Flow<T>
复制代码

Transform

public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T>

public inline fun <T> Flow<T>.filterNot(crossinline predicate: suspend (T) -> Boolean): Flow<T>

public inline fun <reified R> Flow<*>.filterIsInstance(): Flow<R> = filter { it is R } as Flow<R>

public fun <T: Any> Flow<T?>.filterNotNull(): Flow<T>

public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R>

public inline fun <T, R: Any> Flow<T>.mapNotNull(crossinline transform: suspend (value: T) -> R?): Flow<R>


public fun <T> Flow<T>.withIndex(): Flow<IndexedValue<T>>

public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T>

public fun <T, R> Flow<T>.scan(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R>

public fun <T, R> Flow<T>.scan(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R>
复制代码

Android

Google发行的Jetpack库中不少组件都附有KTX扩展依赖, 这种依赖主要是增长kotlin和协程支持

Lifecycle

官方提供生命周期协程做用域的快速建立实现.

  • 指定生命周期运行协程
  • 自动在onDestory取消协程

引入ktx依赖库

implementation "androidx.lifecycle:lifecycle-runtime-ktx:2.2.0-rc03"
复制代码

当执行到某个生命周期时运行协程

fun launchWhenCreated(block: suspend CoroutineScope.() -> Unit): Job

fun launchWhenStarted(block: suspend CoroutineScope.() -> Unit): Job

fun launchWhenResumed(block: suspend CoroutineScope.() -> Unit): Job

suspend fun <T> Lifecycle.whenStateAtLeast(
    minState: Lifecycle.State,
    block: suspend CoroutineScope.() -> T
)
复制代码

这些函数都属于LifecycleLifecycleOwner的扩展函数

LiveData

依赖

implementation "androidx.lifecycle:lifecycle-livedata-ktx:2.2.0-rc03"
复制代码

提供开发者使用的只有这两个函数, 两个函数功能同样, 只是每一个参数接收时间单位不一致

fun <T> liveData( context: CoroutineContext = EmptyCoroutineContext, timeoutInMs: Long = DEFAULT_TIMEOUT, @BuilderInference block: suspend LiveDataScope<T>.() -> Unit
): LiveData<T> = CoroutineLiveData(context, timeoutInMs, block)

fun <T> liveData( context: CoroutineContext = EmptyCoroutineContext, timeout: Duration, @BuilderInference block: suspend LiveDataScope<T>.() -> Unit
): LiveData<T> = CoroutineLiveData(context, timeout.toMillis(), block)
复制代码
  • timeout: 若是liveData的没有处于活跃的观察者则在指定的时间内(单位毫秒)会取消其做用域[block]
  • block: 该做用域只在活跃状态才会触发, 默认在Dispatchers.Main.immediate调度器

liveData做用域具有发射数据和LiveData的做用

interface LiveDataScope<T> {
    /** * Set's the [LiveData]'s value to the given [value]. If you've called [emitSource] previously, * calling [emit] will remove that source. * * Note that this function suspends until the value is set on the [LiveData]. * * @param value The new value for the [LiveData] * * @see emitSource */
    suspend fun emit(value: T)

    /** * Add the given [LiveData] as a source, similar to [MediatorLiveData.addSource]. Calling this * method will remove any source that was yielded before via [emitSource]. * * @param source The [LiveData] instance whose values will be dispatched from the current * [LiveData]. * * @see emit * @see MediatorLiveData.addSource * @see MediatorLiveData.removeSource */
    suspend fun emitSource(source: LiveData<T>): DisposableHandle

    /** * References the current value of the [LiveData]. * * If the block never `emit`ed a value, [latestValue] will be `null`. You can use this * value to check what was then latest value `emit`ed by your `block` before it got cancelled. * * Note that if the block called [emitSource], then `latestValue` will be last value * dispatched by the `source` [LiveData]. */
    val latestValue: T?
}
复制代码
  1. 若是emitSource在emit以前执行则无效
  2. 该做用域会在每次处于活跃状态时都执行一遍, 若是将应用从后台切换到前台则会返回执行该做用域, 可是观察者只会在活跃时才收到数据
相关文章
相关标签/搜索