协程这个概念在1958年就开始出现, 目前某些语言开始原生支持(目前主流语言我感受只有Java彻底不支持协程). Java没有原生协程可是能够大型公司都本身或者使用第三方库来支持协程编程, 可是Kotlin原生支持协程.java
Android领域的网络请求库通常由Rxjava实现, 包括我本身写的网络请求库一样也是采用的RxJava. 可是这些RxJava实现的网络请求库一样很难方便的实现并发android
我认为协程的核心就是一个词: 做用域, 理解什么是做用域就理解协程了git
什么是协程:github
线程和协程的关系属于一对多关系, 一个线程上容许存在多个协程, 即主线程你也能异步执行代码. 可是让某个线程执行太多协程效率过低下, 因此针对不一样的场景建议使用调度器切换线程, 使用协程开始就不须要考虑线程的问题, 只须要在不一样场景使用不一样的调度器(调度器会对特定任务进行优化)就好, 协程英文名是Coroutine.数据库
使用场景编程
假设首页存在七个接口网络请求(后端人员处理差)的状况一个个使用串行网络请求的时间比并发网络请求慢了接近七倍.后端
目前计算机都是经过多核CPU提高计算能力, 因此熟练掌握并发编程是将来的趋势缓存
协程优点安全
实验特性bash
协程在Kotlin1.3时候放出正式版本, 可是目前仍然存在不稳定的函数变更, 不过这个我认为不影响项目中实际使用
@FlowPreview 表明可能之后存在Api函数变更
@ExperimentalCoroutinesApi 表明目前可能存在不稳定的因素的函数
@ObsoleteCoroutinesApi 可能存在被废弃的可能
复制代码
Kotlin的协程主要构成分为三部分
为方便网络请求和简化异步做用域开启可使用我实现的一个库: Net
1.0+版本为RxJava实现, 2.0+版本为Coroutine实现
本文章后续会根据Kotlin的版本中的协程迭代进行更新
经常使用事件分发框架为EventBus或者RxBus, 我以前使用RxJava的时候也写了RxBus来使用, 使用协程后我又用协程实现一个: Channel
展望
协程对于后端高并发优点很大, 相信Spring的Kt版本后续会跟进
至于Google的Jetpack基本上都有针对协程扩展
咱们公司项目属于 MVVM+Kotlin+Coroutine+JetPack, 最明显的是并发网络请求速度翻倍. 同时代码更加结构清晰
开启主协程的三种方式
生命周期和App一致, 没法取消(不存在Job), 不存在线程阻塞
fun main() {
GlobalScope.launch { // 在后台启动一个新的协程并继续
delay(1000L)
println("World!")
}
Thread.sleep(2000) // 防止JVM虚拟机退出
}
复制代码
不存在线程阻塞, 能够取消, 能够经过CoroutineContext控制协程生命周期
fun main() {
CoroutineScope(Dispatchers.IO).launch {
}
Thread.sleep(1000)
}
复制代码
线程阻塞, 适用于单元测试, 不须要延迟阻塞防止JVM虚拟机退出. runBlocking属于全局函数能够在任意地方调用
通常咱们在项目中是不会使用runBlocking, 由于阻塞主线程没有开启的任何意义
fun main() = runBlocking {
// 阻塞线程直到协程做用域内部全部协程执行完毕
}
复制代码
协程内部还可使用函数建立其余协程做用域, 分为两种建立函数:
CoroutineScope
的扩展函数, 只有在做用域内部才能建立其余的做用域suspend
修饰的函数内部在主协程内还能够建立子协程做用域, 建立函数分为两种
阻塞做用域(串行): 会阻塞当前做用域
挂起做用域(并发): 不会阻塞当前做用域
同步做用域函数
都属于suspend函数
withContext
: 能够切换调度器, 有返回结果coroutineScope
: 建立一个协程做用域, 该做用域会阻塞当前所在做用域而且等待其子协程执行完才会恢复, 有返回结果supervisorScope
: 使用SupervisorJob的coroutineScope, 异常不会取消父协程public suspend fun <T> withContext( context: CoroutineContext, block: suspend CoroutineScope.() -> T
): T
// 返回结果. 能够和当前协程的父协程存在交互关系, 主要做用为来回切换调度器
public suspend inline operator fun <T> CoroutineDispatcher.invoke( noinline block: suspend CoroutineScope.() -> T
): T = withContext(this, block)
// withContext工具函数而已
public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R
public suspend fun <R> supervisorScope(block: suspend CoroutineScope.() -> R): R
public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T
复制代码
异步做用域函数
这两个函数都不属于suspend, 只须要CoroutineScope就能够调用
launch
: 异步并发, 没有返回结果async
: 异步并发, 有返回结果public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit
): Job
public fun <T> CoroutineScope.async( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> T
): Deferred<T>
复制代码
同一个协程做用域中的异步任务遵照顺序原则开始执行. 适用于串行网络请求, 在一个异步任务须要上个异步任务的结果时.
协程挂起须要时间, 因此异步协程永远比同步代码执行慢
fun main() = runBlocking<Unit> {
launch {
System.err.println("(Main.kt:34) 后执行")
}
System.err.println("(Main.kt:37) 先执行")
}
复制代码
当在协程做用域中使用async
函数时能够建立并发任务
public fun <T> CoroutineScope.async( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> T
): Deferred<T>
复制代码
示例
fun main() = runBlocking<Unit> {
val name = async { getName() }
val title = async { getTitle() }
System.err.println("(Main.kt:35) result = ${name.await() + title.await()}")
delay(2000)
}
复制代码
Deferred
. 经过函数await
获取结果值.awaitAll()
等待所有完成.await
任务也会等待执行完协程关闭await
则async内部抛出的异常不会被logCat和tryCatch捕获, 可是依然会致使做用域取消和异常崩溃. 但当执行await
时异常信息会从新抛出.惰性并发
将async函数中的start
设置为CoroutineStart.LAZY
时则只有调用Deferred对象的await
时才会开始执行异步任务(或者执行start
函数).
启动模式
异常
协程中发生异常, 则父协程取消而且父协程其余的子协程一样所有取消
继承自Job
提供一个全局函数用于建立CompletableDeferred对象, 该对象能够实现自定义Deferred功能
示例
fun main() = runBlocking<Unit> {
val deferred = CompletableDeferred<Int>()
launch {
delay(1000 )
deferred.complete(23)
}
System.err.println("(Demo.kt:72) 结果 = ${deferred.await()}")
}
复制代码
建立CompletableDeferred的全局函数
public fun <T> CompletableDeferred(parent: Job? = null): CompletableDeferred<T>
public fun <T> CompletableDeferred(value: T): CompletableDeferred<T>
复制代码
CompletableDeferred函数
public fun complete(value: T): Boolean
// 结果
public fun completeExceptionally(exception: Throwable): Boolean
// 抛出异常, 异常发生在`await()`时
复制代码
建立此对象表示建立一个协程做用域
若是你看协程的教程可能会常常看到这个词, 这就是做用域内部开启新的协程. 父协程会限制子协程的生命周期, 子协程承接父协程的上下文, 这种层级关系就是结构化并发.
在一个协程做用域里面开启多个子协程进行并发行为
协程上下文, 我认为协程上下文能够看作包含协程基本信息的一个Context(上下文). 其能够决定协程的名称或者运行
建立一个新的调度器
fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher
fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher
复制代码
建立新的调度器比较消耗资源, 建议复用且当不须要的时候使用close
函数释放
Dispatchers
继承自CoroutineContext, 该枚举拥有三个实现. 表示不一样的线程调度. 当函数不使用调度器时承接当前做用域的调度器
Dispatchers.Unconfined 不指定线程,
若是子协程切换线程那么接下来的代码也运行在该线程上
复制代码
Dispatchers.IO
适用于IO读写
复制代码
Dispatchers.Main
根据平台不一样而有所差, Android上为主线程
复制代码
Dispatchers.Default 默认调度器
在线程池中执行协程体, 适用于计算操做
复制代码
当即执行
Dispatchers.Main.immediate
复制代码
immediate
属于全部调度器都有的属性, 该属性表明着若是当前正处于该调度器中不执行调度器切换直接执行, 能够理解为在同一调度器内属于同步协程做用域
例如launch
函数开启做用域会比后续代码执行顺序低, 可是使用该属性协程属于顺序执行
示例
CoroutineScope(Job() + Dispatchers.Main.immediate).launch {
// 执行顺序 1
}
// 执行顺序 2
CoroutineScope(Job() + Dispatchers.Main).launch {
// 执行顺序 4
}
// 执行顺序 3
复制代码
经过建立一个CoroutineName
对象, 在构造函数中指定参数为协程名称, CoroutineName
继承自CoroutineContext.
launch(CoroutineName("吴彦祖")){
}
复制代码
协程上下文名称用于方便调试使用
yield函数可让当前协程暂时挂起执行其余协程体, 若是没有其余正在并发的协程体则继续执行当前协程体(至关于无效调用).
public suspend fun yield(): Unit
复制代码
在协程中Job一般被称为做业, 表示一个协程工做任务, 他一样继承自CoroutineContext
val job = launch {
}
复制代码
Job属于接口
interface Job : CoroutineContext.Element
复制代码
函数
public suspend fun join()
// 等待协程执行完毕都阻塞当前线程
public fun cancel(cause: CancellationException? = null)
// 取消协程
public suspend fun Job.cancelAndJoin()
// 阻塞而且在协程结束之后取消协程
public fun start(): Boolean
public val children: Sequence<Job>
// 所有子做业
public fun invokeOnCompletion( onCancelling: Boolean = false, // true则取消做用域不会执行 invokeImmediately: Boolean = true, // handler: CompletionHandler): DisposableHandle
public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
// 当其做用域完成之后执行, 主协程指定才有效, 直接给CoroutineScope指定时无效的
// 手动抛出CancellationException一样会赋值给cause
public fun getCancellationException(): CancellationException
public val onJoin: SelectClause0
public val children: Sequence<Job>
// 后面说起的选择器中使用
复制代码
状态
经过字段能够获取JOB当前处于状态
public val isActive: Boolean
public val isCancelled: Boolean
public val isCompleted: Boolean
复制代码
扩展函数
public fun Job.cancelChildren(cause: CancellationException? = null)
public suspend fun Job.cancelAndJoin()
复制代码
每一个协程做用域都存在coroutineContext. 而协程上下文中都存在Job对象
coroutineContext[Job]
复制代码
若是协程做用域内存在计算任务(一直打日志也算)则没法被取消, 若是使用delay函数则能够被取消.
fun main() = runBlocking<Unit> {
val job = launch(Dispatchers.Default) {
while (true){
delay(100) // 这行代码存在则能够成功取消协程, 不存在则没法取消
System.err.println("(Main.kt:30) ")
}
}
delay(500)
job.cancel()
System.err.println("(Main.kt:42) 结束")
}
复制代码
经过使用协程内部isActive
属性来判断是否应该结束
fun main() = runBlocking<Unit> {
val job = launch(Dispatchers.Default) {
while (isActive) { // 一旦协程被取消则为false
System.err.println("(Main.kt:30) ")
}
}
delay(500)
job.cancel()
System.err.println("(Main.kt:42) 结束")
}
复制代码
协程存在被手动取消的状况, 可是有些资源须要在协程取消的时候释放资源, 这个操做能够在finally
中执行.
不管如何finally都会被执行
fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
try {
repeat(1000){
System.err.println("(Main.kt:31) it = $it")
delay(500)
}
} finally {
// 已被取消的协程没法继续挂起
}
}
delay(1500)
job.cancel()
System.err.println("(Main.kt:42) ")
}
复制代码
再次开启协程
经过withContext
和NonCancellable
能够在已被取消的协程中继续挂起协程. 这种用法其实能够看作建立一个没法取消的任务
withContext(NonCancellable) {
println("job: I'm running finally")
delay(1000L)
println("job: And I've just delayed for 1 sec because I'm non-cancellable")
}
复制代码
协程做用域能够接收多个CoroutineContext做为上下文参数. CoroutineContext自己属于接口, 不少上下文相关的类都实现与他.
配置多个CoroutineContext能够经过+
符号同时指定多个协程上下文, 每一个实现对象可能包含一部分信息能够存在覆盖行为故相加时的顺序存在覆盖行为.
val a = CoroutineScope(SupervisorJob() + coroutineContext).launch(handler) {
delay(1000)
System.err.println("(Main.kt:51) ${Thread.currentThread()}")
}
复制代码
launch(Dispatchers.IO + CoroutineName("吴彦祖")){}
复制代码
协程局部变量
使用ThreadLocal
能够获取线程的局部变量, 可是要求使用扩展函数asContextElement
转为协程上下文
做为参数传入在建立协程的时候.
该局部变量做用于持有该协程上下文的协程做用域内
public fun <T> ThreadLocal<T>.asContextElement(value: T = get()): ThreadContextElement<T> =
ThreadLocalElement(value, this)
复制代码
public suspend fun <T> withTimeout(timeMillis: Long, block: suspend CoroutineScope.() -> T): T
// 超过指定时间timeMillis自动结束协程.
// 当没有超时时返回值获取而且继续执行协程.
// 当超时会抛出异常TimeoutCancellationException, 可是不会致使程序结束
public suspend fun <T> withTimeoutOrNull(timeMillis: Long, block: suspend CoroutineScope.() -> T): T?
// 若是超时不会结束协程而是返回null
复制代码
没法手动抛出TimeoutCancellationException, 由于其构造函数私有
全局协程做用域
全局协程做用域属于单例对象, 整个JVM虚拟机只有一份实例对象. 他的寿命周期也跟随JVM. 使用全局协程做用域的时候注意避免内存泄漏
public object GlobalScope : CoroutineScope {
/** * Returns [EmptyCoroutineContext]. */
override val coroutineContext: CoroutineContext
get() = EmptyCoroutineContext
}
复制代码
全局协程做用域不继承父协程做用域的上下文, 因此也不会由于父协程被取消而自身被取消.
启动模式
DEFAULT
当即执行协程体ATOMIC
当即执行协程体,但在开始执行协程以前没法取消协程UNDISPATCHED
当即在当前线程执行协程体,第一个挂起函数执行在函数所在线程, 后面执行在函数指定线程LAZY
手动执行start
或join
才会执行协程协程体若是已经执行实际上属于不可取消的, 在协程体中经过isActive
来判断协程是否处于活跃中
经过取消函数的参数指定异常CancellationException能够自定义异常对象
不可取消的协程做用域
NonCancellable该单例对象用于withContext
函数建立一个没法被取消的协程做用域
withContext(NonCancellable) {
delay(2000)
}
复制代码
示例
fun main() = runBlocking {
launch {
delay(1000)
System.err.println("(Main.kt:19) ")
}
launch {
withContext(NonCancellable) {
delay(2000)
System.err.println("(Main.kt:26) ")
}
}
delay(500) // 防止launch还未开启withContext就被取消
cancel()
}
复制代码
GlobalScope取消
GlobalScope属于全局协程, 由他开启的协程都不拥有Job, 因此没法取消协程. 可是能够经过给GlobalScope开启的协程做用域指定Job而后就可使用Job取消协程.
经过CoroutineExceptionHandler
函数能够建立一个同名的对象, 该接口继承自CoroutineContext
. 一样经过制定上下文参数传递给全局协程做用域使用, 看成用域抛出异常时会被该对象的回调函数接收到, 而且不会抛出异常.
只要发生异常就会致使父协程和其全部子协程都被取消, 这种属于双向的异常取消机制, 后面提到的监督做业属于发生异常只会取消全部子协程, 属于单向.
CoroutineExceptionHandler异常处理器并不能阻止协程取消, 只是监听到协程的异常信息避免JVM抛出异常退出程序而已
不要尝试直接使用try/catch
捕捉协程做用域的异常
try {
launch {
throw NullPointerException()
}
} catch (e: Exception) {
e.printStackTrace()
}
复制代码
错误示例, 没法捕捉到异常
协程取消异常
取消协程的做业(Job)会引起异常, 可是会被默认的异常处理器给忽略, 可是咱们能够经过捕捉能够看到异常信息
fun main() = runBlocking<Unit> {
val job = GlobalScope.launch {
try {
delay(1000)
} catch (e: Exception) {
e.printStackTrace()
}
}
job.cancel(CancellationException("自定义一个用于取消协程的异常"))
delay(2000)
}
复制代码
Job取消函数
public fun cancel(cause: CancellationException? = null)
复制代码
全局协程做用域的异常处理
CoroutineExceptionHandler没法捕捉async|produce
抛出的异常
val exceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable ->
System.err.println("(Main.kt:41):main coroutineContext = $coroutineContext, throwable = $throwable")
}
GlobalScope.launch(exceptionHandler) {
}
复制代码
子协程没法设置异常处理器, 即便设置了也会被父协程覆盖而没有意义. 除非使用异常处理器+Job对象, 可是第一个子协程launch容许设置
异常聚合和解包
全局协程做用域也存在嵌套子父级关系, 故异常可能也会依次抛出多个异常
fun main() = runBlocking {
val handler = CoroutineExceptionHandler { _, exception ->
// 这里的异常是第一个被抛出的异常对象
println("捕捉的异常: $exception 和被嵌套的异常: ${exception.suppressed.contentToString()}")
}
val job = GlobalScope.launch(handler) {
launch {
try {
delay(Long.MAX_VALUE)
} finally { // 当父协程被取消时其全部子协程都被取消, finally被取消以前或者完成任务以后必定会执行
throw ArithmeticException() // 再次抛出异常, 异常被聚合
}
}
launch {
delay(100)
throw IOException() // 这里抛出异常将致使父协程被取消
}
delay(Long.MAX_VALUE)
}
job.join() // 避免GlobalScope做用域没有执行完毕JVM虚拟机就退出
}
复制代码
通常状况子协程发生异常会致使父协程被取消, 同时父协程发生异常会取消全部的子协程. 可是有时候子协程发生异常咱们并不但愿父协程也被取消, 而是仅仅全部子协程取消, 这个使用就是用SupervisorJob
做业.
建立监督做业对象
val job = SupervisorJob()
launch(job) { }
复制代码
监督做业在withContext
和async
中添加无效
直接建立 SupervisorJob()
对象传入做用域中会致使该做用域和父协程生命周期不统一的问题, 即父协程取消之后该子协程依然处于活跃状态.
该函数能够解决我上面提到的生命周期不统一问题, 直接建立向下传播异常的协程做用域
public suspend fun <R> supervisorScope(block: suspend CoroutineScope.() -> R): R
复制代码
supervisorScope
函数使用的依然是当前做用域的Job, 因此跟随当前做用域生命周期, 能够被取消.下面我写个即便抛出异常也不会致使协程取消的
supervisorScope {
try {
throw NullPointerException()
} catch (e: Exception) {
e.printStackTrace()
}
}
复制代码
解决线程不安全问题
至关于Java中的Lock替代品: Mutex
建立互斥对象
public fun Mutex(locked: Boolean = false): Mutex
// `locked`是否当即上锁
复制代码
使用扩展函数能够自动加锁和解锁
public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T
// owner 钥匙
复制代码
经过在不一样的协程内部使用Channel实例能够数据通信. 通道能够连续顺序传输N个元素
多个协程容许发送和接收同一个通道数据
Channel属于接口没法直接建立, 咱们须要经过函数Channel()
来建立其实现类
源码
public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
when (capacity) {
RENDEZVOUS -> RendezvousChannel() // 无缓存
UNLIMITED -> LinkedListChannel() // 无限制
CONFLATED -> ConflatedChannel() // 合并
BUFFERED -> ArrayChannel(CHANNEL_DEFAULT_CAPACITY) // 64
else -> ArrayChannel(capacity) // 指定缓存大小
}
复制代码
capacity
缓冲大小, 默认0
当Channel发送一条数据时就会挂起通道(不继续执行发送后续代码), 只有在接收这条数据时才会解除挂起继续执行. 可是咱们能够设置缓存大小
复制代码
通道容许被遍历获取当前发送数据
val channel = Channel<Int>()
for (c in channel){
}
复制代码
public suspend fun yield(): Unit
复制代码
Channel
Channel接口同时实现发送渠道(SendChannel)和接收渠道(ReceiveChannel)两个接口
public interface Channel<E> : SendChannel<E>, ReceiveChannel<E>
复制代码
public val isClosedForSend: Boolean
public suspend fun send(element: E)
// 发送消息
public fun offer(element: E): Boolean
public fun close(cause: Throwable? = null): Boolean
// 关闭发送通道
public fun invokeOnClose(handler: (cause: Throwable?) -> Unit)
public val onSend: SelectClause2<E, SendChannel<E>>
复制代码
ClosedReceiveChannelException
抛出public val isClosedForReceive: Boolean
// SendChannel是否已经关闭通道, 若是关闭通道之后还存在缓存则会接收完缓存以后返回false
public val isEmpty: Boolean
public suspend fun receive(): E
// 接受当前被发送的消息
public val onReceive: SelectClause1<E>
// 监听事件发送
public val onReceiveOrClosed: SelectClause1<ValueOrClosed<E>>
// 会抛出异常, 不推荐使用
public val onReceiveOrNull: SelectClause1<E?>
// Null表示通道已关闭
public suspend fun receiveOrClosed(): ValueOrClosed<E>
// `ValueOrClosed`对象能够判断通道是否已关闭
public fun poll(): E?
public fun cancel(cause: CancellationException? = null)
// 取消
复制代码
close
函数后不容许再发送或者接收数据, 不然抛出异常send|receive
函数所在做用域被取消cancel
不会致使通道结束(isClosedForReceive返回false)这个通道和通常的通道区别在于他的每一个数据能够被每一个做用域所有接收到. 默认的通道一个数据被接收后其余的协程是没法再接收到数据的
广播通道经过全局函数建立对象
public fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E>
复制代码
自己广播通道继承自SendChannel, 只能发送数据, 经过函数能够拿到接收通道
public fun openSubscription(): ReceiveChannel<E>
复制代码
取消通道
public fun cancel(cause: CancellationException? = null)
复制代码
将Channel转成BroadcastChannel
fun <E> ReceiveChannel<E>.broadcast(
capacity: Int = 1,
start: CoroutineStart = CoroutineStart.LAZY
): BroadcastChannel<E>
复制代码
经过扩展函数在协程做用域中快速建立一个广播发送通道
public fun <E> CoroutineScope.broadcast(
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 1,
start: CoroutineStart = CoroutineStart.LAZY,
onCompletion: CompletionHandler? = null,
@BuilderInference block: suspend ProducerScope<E>.() -> Unit
): BroadcastChannel<E>
复制代码
迭代通道
接收通道实现操做符重载可使用迭代
public operator fun iterator(): ChannelIterator<E>
复制代码
示例
for (i in produce){
// 收到每一个发型的消息
}
复制代码
当多个协程接收同一个渠道数据会依次轮流接收到数据, 渠道对于多个协程是公平的
上面介绍的属于建立Channel对象来发送和接收数据, 可是还能够经过扩展函数快速建立并返回一个具有发送数据的ReceiveChannel
对象
public fun <E> CoroutineScope.produce( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 0, @BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E>
复制代码
ProducerScope 该接口继承自SendChannel以及CoroutineScope, 具有发送通道数据以及协程做用域做用.
当produce做用域执行完成会关闭通道, 前面已经说起关闭通道没法继续接收数据
等待取消
该函数会在通道被取消时回调其函数参数. 前面说起协程取消时能够经过finally
来释放内存等操做, 可是通道取消没法使用finally只能使用该函数.
public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {})
// [SendChannel.close] or [ReceiveChannel.cancel] 表明取消通道
复制代码
能够经过actor
函数建立一个具有渠道做用的协程做用域
public fun <E> CoroutineScope.actor( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 0, // todo: Maybe Channel.DEFAULT here? start: CoroutineStart = CoroutineStart.DEFAULT, onCompletion: CompletionHandler? = null, block: suspend ActorScope<E>.() -> Unit
): SendChannel<E>
复制代码
该函数和produce
函数类似,
channel:Channel
, 既能够发送数据又能够接收数据, produce的属性channel属于SendChannelproduce
或者actor
他们的通道都属于Channel, 既能够发送又能够接收数据, 只须要类型强转便可.不管是RxJava仍是协程都支持轮循器的功能, 在个人网络请求库中还赋予了轮循器暂停|继续|多个观察者|重置等功能
这里的协程轮循器就比较简陋
public fun ticker( delayMillis: Long, initialDelayMillis: Long = delayMillis, context: CoroutineContext = EmptyCoroutineContext, mode: TickerMode = TickerMode.FIXED_PERIOD ): ReceiveChannel<Unit>
复制代码
该通道返回的数据是Unit
默认状况下能够理解为通道会在指定间隔时间后一直发送Unit
数据
fun main() = runBlocking<Unit> {
val tickerChannel = ticker(delayMillis = 1000, initialDelayMillis = 0)
// 每秒打印
for (unit in tickerChannel) {
System.err.println("unit = $unit")
}
}
复制代码
可是若是下游不是在发送数据之后当即接收数据, 而是延迟使用receive函数来接收通道数据
TickerMode
该枚举拥有两个字段
这个轮循器不支持多订阅|暂停|继续|重置|完成, 可是个人Net库中Interval
对象已实现该功能.
在select
闭包中能够建立多个协程做用域或者通道, 且只会执行最快接收数据的通道或者结果.
通道
@InternalCoroutinesApi
suspend fun selectAorB(a: ReceiveChannel<String>, b: ReceiveChannel<String>): String = select<String> {
a.onReceiveOrNull {
if (it == null) "Channel 'a' is closed"
else "a -> '$it.valueOrNull'"
}
b.onReceiveOrNull {
if (it == null) "Channel 'b' is closed"
else "b -> '$it.valueOrNull'"
}
}
@InternalCoroutinesApi
fun main() = runBlocking<Unit> {
val a = produce<String> {
repeat(4) { send("Hello $it") }
}
val b = produce<String> {
repeat(4) { send("World $it") }
}
repeat(8) {
// 打印最先的八个结果
println(selectAorB(a, b))
}
coroutineContext.cancelChildren()
}
复制代码
Flow不属于挂起函数能够在任意位置建立. 默认执行在当前协程上下文中.
Flow和RxJava很类似, 分为上游和下游及中间操做符, 可是Flow内部属于协程做用域, 其调度器依靠挂起函数切换异步.
JetBrains公司也认可Flow属于参考Reactive Stream等框架的产物, 这样我相信不少人就能理解Flow的存在乎义了. 这种上下游观察者模式在ROOM官方数据库中一样支持.
Flow的操做符不如RxJava丰富, 可是Flow的开发时间还很短还未正式完成. 后面能够跟进
建立Flow
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T>
复制代码
示例
fun shoot() = flow {
for (i in 1..3) {
delay(1000) // 伪装咱们在这里作了一些有用的事情
emit(i) // 发送下一个值
}
}
复制代码
集合或者Sequence均可以经过asFlow
函数转成Flow对象
也能够像建立集合同样经过fowOf
直接建立Flow对象
Channel通道转成Flow
public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T>
复制代码
甚至挂起函数也能够转成Flow
public fun <T> (suspend () -> T).asFlow(): Flow<T>
复制代码
Flow的完成和异常不须要经过发射器Emitter去发送, RxJava须要.
和RxJava同样理解基本用法和思想以后就仅须要熟悉不一样的操做符了. 基本的操做符每一个ReactiveStream框架都是雷同
下面介绍Flow的函数(操做符).
收集数据
Flow是冷数据, 要求调用函数collect
收集数据时才会进行数据的发射. 该系列函数也成为末端操做符.
shoot().collect {
System.err.println("(Demo.kt:9) it = $it")
}
复制代码
函数
public suspend fun Flow<*>.collect()
public suspend fun <T> Flow<T>.collectLatest(action: suspend (value: T) -> Unit)
public suspend inline fun <T> Flow<T>.collectIndexed( crossinline action: suspend (index: Int, value: T) -> Unit
): Unit
public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job
// 将Flow运行在指定的协程做用域内
public suspend inline fun <T> FlowCollector<T>.emitAll(flow: Flow<T>)
复制代码
调度器
Flow默认使用的是其所在的当前线程或者协程上下文, Flow不容许在内部使用withContext
来切换调度器, 而是应该使用flowOn
函数
public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T>
复制代码
该函数改变的是Flow函数内部发射时的线程, 而在collect
收集数据时会自动切回建立Flow时的线程.
public fun <T> channelFlow(@BuilderInference block: suspend ProducerScope<T>.() -> Unit): Flow<T>
public inline fun <T> callbackFlow(@BuilderInference noinline block: suspend ProducerScope<T>.() -> Unit): Flow<T>
复制代码
缓存
不须要等待收集执行就当即执行发射数据. 只是数据暂时被缓存而已. 提升性能.
默认切换调度器时会自动缓存
public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED): Flow<T>
复制代码
合并函数, 这个函数实际上就是buffer
当下游没法及时处理上游的数据时会丢弃掉该数据
public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)
复制代码
将多个事件合并后发送给下游
zip
将两个Flow在回调函数中进行处理返回一个新的值 R
当两个flow的长度不等时只发送最短长度的事件
public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R>
复制代码
示例
val nums = (1..3).asFlow().onEach { delay(300) } // 发射数字 1..3,间隔 300 毫秒
val strs = flowOf("one", "two", "three").onEach { delay(400) } // 每 400 毫秒发射一次字符串
val startTime = System.currentTimeMillis() // 记录开始的时间
nums.zip(strs) { a, b -> "$a -> $b" } // 使用“zip”组合单个字符串
.collect { value -> // 收集并打印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
复制代码
combine
public fun <T1, T2, R> Flow<T1>.combine(flow: Flow<T2>, transform: suspend (a: T1, b: T2) -> R): Flow<R>
public fun <T1, T2, R> Flow<T1>.combineTransform( flow: Flow<T2>, @BuilderInference transform: suspend FlowCollector<R>.(a: T1, b: T2) -> Unit
): Flow<R>
复制代码
Flow直接转成集合函数
public suspend fun <T> Flow<T>.toList(destination: MutableList<T> = ArrayList()): List<T>
public suspend fun <T> Flow<T>.toSet(destination: MutableSet<T> = LinkedHashSet()): Set<T>
public suspend fun <T, C : MutableCollection<in T>> Flow<T>.toCollection(destination: C): C
复制代码
public suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (accumulator: S, value: T) -> S): S
public suspend inline fun <T, R> Flow<T>.fold( initial: R, crossinline operation: suspend (acc: R, value: T) -> R
): R
// `acc`为上次回调函数返回值, 第一次为初始值, 等同于叠加效果. 该函数和reduce的区别就是支持初始值. reduce累计两次元素才会回调函数
public suspend fun <T> Flow<T>.single(): T
// 期待只有一个元素, 不然抛出`IllegalStateException`
public suspend fun <T: Any> Flow<T>.singleOrNull(): T?
// 不抛出异常, 但若是不是仅有元素则返回null
public suspend fun <T> Flow<T>.first(): T
// 若是不存在一个元素则会抛出`NoSuchElementException`
public suspend fun <T> Flow<T>.first(predicate: suspend (T) -> Boolean): T
// 返回回调函数判断为true的第一个条件符合的元素
复制代码
public fun <T, R> Flow<T>.flatMapMerge( concurrency: Int = DEFAULT_CONCURRENCY, transform: suspend (value: T) -> Flow<R>
): Flow<R> = map(transform).flattenMerge(concurrency)
// 上游先发送全部的元素, 而后上游每一个元素会致使回调函数中的Flow发送全部元素一次
public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R>
// 等同于RxJava的FlatMap
public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T>
public fun <T> Flow<Flow<T>>.flattenMerge( concurrency: Int = DEFAULT_CONCURRENCY ): Flow<T>
public fun <T, R> Flow<T>.transformLatest( @BuilderInference transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R>
public inline fun <T, R> Flow<T>.flatMapLatest( @BuilderInference crossinline transform: suspend (value: T) -> Flow<R>
): Flow<R>
public inline fun <T, R> Flow<T>.flatMapLatest( @BuilderInference crossinline transform: suspend (value: T) -> Flow<R>
): Flow<R>
复制代码
public inline fun <T, R> Flow<T>.transform( @BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R>
// 转换函数, 能够在回调函数中发送新的元素
public fun <T> Flow<T>.onStart( action: suspend FlowCollector<T>.() -> Unit
): Flow<T>
// 开始
public fun <T> Flow<T>.onCompletion( action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
): Flow<T>
// 回调函数中的参数`cause`若是为null表示正常完成没有抛出异常, 反之则抛出异常非正常结束
// 和catch函数同样只能监听到上游发生的异常, 可是没法避免异常抛出只能在异常抛出以前执行回调函数
复制代码
限制流发送
public fun <T> Flow<T>.take(count: Int): Flow<T>
// 只接受指定数量事件
public fun <T> Flow<T>.drop(count: Int): Flow<T>
// 丢弃指定数量事件
public fun <T> Flow<T>.takeWhile(predicate: suspend (T) -> Boolean): Flow<T>
public fun <T> Flow<T>.dropWhile(predicate: suspend (T) -> Boolean): Flow<T>
// 回调函数判断是否丢弃或者接收, 只要丢弃或者接收后面就不会继续发送事件(结束流)
复制代码
捕捉异常
public fun <T> Flow<T>.catch(action: suspend FlowCollector<T>.(cause: Throwable) -> Unit): Flow<T>
复制代码
该函数只能捕获上游异常, 若是异常处于函数调用之下则依然会被抛出
重试
public fun <T> Flow<T>.retry( retries: Long = Long.MAX_VALUE, // 重试次数 predicate: suspend (cause: Throwable) -> Boolean = { true }
): Flow<T>
public fun <T> Flow<T>.retryWhen( predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean
): Flow<T>
复制代码
public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T>
public inline fun <T> Flow<T>.filterNot(crossinline predicate: suspend (T) -> Boolean): Flow<T>
public inline fun <reified R> Flow<*>.filterIsInstance(): Flow<R> = filter { it is R } as Flow<R>
public fun <T: Any> Flow<T?>.filterNotNull(): Flow<T>
public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R>
public inline fun <T, R: Any> Flow<T>.mapNotNull(crossinline transform: suspend (value: T) -> R?): Flow<R>
public fun <T> Flow<T>.withIndex(): Flow<IndexedValue<T>>
public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T>
public fun <T, R> Flow<T>.scan(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R>
public fun <T, R> Flow<T>.scan(initial: R, @BuilderInference operation: suspend (accumulator: R, value: T) -> R): Flow<R>
复制代码
Google发行的Jetpack库中不少组件都附有KTX扩展依赖, 这种依赖主要是增长kotlin和协程支持
官方提供生命周期协程做用域的快速建立实现.
onDestory
取消协程引入ktx依赖库
implementation "androidx.lifecycle:lifecycle-runtime-ktx:2.2.0-rc03"
复制代码
当执行到某个生命周期时运行协程
fun launchWhenCreated(block: suspend CoroutineScope.() -> Unit): Job
fun launchWhenStarted(block: suspend CoroutineScope.() -> Unit): Job
fun launchWhenResumed(block: suspend CoroutineScope.() -> Unit): Job
suspend fun <T> Lifecycle.whenStateAtLeast(
minState: Lifecycle.State,
block: suspend CoroutineScope.() -> T
)
复制代码
这些函数都属于Lifecycle
和LifecycleOwner
的扩展函数
依赖
implementation "androidx.lifecycle:lifecycle-livedata-ktx:2.2.0-rc03"
复制代码
提供开发者使用的只有这两个函数, 两个函数功能同样, 只是每一个参数接收时间单位不一致
fun <T> liveData( context: CoroutineContext = EmptyCoroutineContext, timeoutInMs: Long = DEFAULT_TIMEOUT, @BuilderInference block: suspend LiveDataScope<T>.() -> Unit
): LiveData<T> = CoroutineLiveData(context, timeoutInMs, block)
fun <T> liveData( context: CoroutineContext = EmptyCoroutineContext, timeout: Duration, @BuilderInference block: suspend LiveDataScope<T>.() -> Unit
): LiveData<T> = CoroutineLiveData(context, timeout.toMillis(), block)
复制代码
Dispatchers.Main.immediate
调度器liveData做用域具有发射数据和LiveData的做用
interface LiveDataScope<T> {
/** * Set's the [LiveData]'s value to the given [value]. If you've called [emitSource] previously, * calling [emit] will remove that source. * * Note that this function suspends until the value is set on the [LiveData]. * * @param value The new value for the [LiveData] * * @see emitSource */
suspend fun emit(value: T)
/** * Add the given [LiveData] as a source, similar to [MediatorLiveData.addSource]. Calling this * method will remove any source that was yielded before via [emitSource]. * * @param source The [LiveData] instance whose values will be dispatched from the current * [LiveData]. * * @see emit * @see MediatorLiveData.addSource * @see MediatorLiveData.removeSource */
suspend fun emitSource(source: LiveData<T>): DisposableHandle
/** * References the current value of the [LiveData]. * * If the block never `emit`ed a value, [latestValue] will be `null`. You can use this * value to check what was then latest value `emit`ed by your `block` before it got cancelled. * * Note that if the block called [emitSource], then `latestValue` will be last value * dispatched by the `source` [LiveData]. */
val latestValue: T?
}
复制代码