Kotlin Coroutines(协程) 彻底解析java
最全面的Kotlin协程: Coroutine/Channel/Flow 以及实际应用android
Kotlin协程-Scheduler的优秀设计markdown
Kotlin 1.3
添加了协程 Coroutine
的概念,文档中介绍协程是一种并发设计模式,能够在 Android
平台上使用它来简化异步执行的代码。数据结构
协程具备以下特色:并发
异步代码同步化:使用编写同步代码的方式编写异步代码。框架
轻量:您能够在单个线程上运行多个协程,由于协程支持挂起,不会使正在运行协程的线程阻塞。挂起比阻塞节省内存,且支持多个并行操做。异步
内存泄漏更少:使用结构化并发机制在一个做用域内执行多项操做。async
内置取消支持:取消操做会自动在运行中的整个协程层次结构内传播。ide
Jetpack
集成:许多 Jetpack 库都包含提供全面协程支持的扩展。某些库还提供本身的协程做用域,可供您用于结构化并发。
Kotlin
协程的挂起和恢复本质上是挂起函数的挂起和恢复。
suspend fun suspendFun() {}
复制代码
挂起函数
:suspend
关键字修饰的普通函数。若是在协程体内调用了挂起函数,那么调用处就被称为 挂起点
。挂起点若是出现 异步调用
,那么当前协程就会被挂起,直到对应的 Continuation.resume()
函数被调用才会恢复执行。
挂起函数和普通函数的区别在于:
挂起函数只能在协程体内或其余挂起函数内调用;
挂起函数能够调用任何函数,普通函数只能调用普通函数。
suspend
除用于修饰函数外还可用于修饰 lambda
表达式,在源码分析的章节会详细分析它们的区别。
dependencies {
// Kotlin Coroutines
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.2'
// 使用 `Dispatchers.Main` 须要添加以下依赖
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.4.2'
}
复制代码
kotlin
协程框架为咱们提供了两种便捷的方式启动协程:
GlobalScop.launch
GlobalScope.async
分别来使用两种方式输出 Hello World!
:
fun main() {
GlobalScope.launch { // 使用 GlobalScope.launch 启动协程
delay(1000L) // 非阻塞的等待 1 秒钟(默认时间单位是毫秒)
println("World!") // 在延迟后打印输出
}
print("Hello ") // 协程已在等待时主线程还在继续
Thread.sleep(2000L) // 阻塞主线程 2 秒钟来保证 JVM 存活
}
fun main() {
GlobalScope.async { // 使用 GlobalScope.async 启动协程
delay(1000L)
println("World!")
}
print("Hello ")
Thread.sleep(2000L)
}
复制代码
从上面的例子里看这两种方式好像并无什么区别,其实区别在他们的返回值上
GlobalScop.launch
:返回值 Job
GlobalScope.async
:返回值 Deferred<T>
Deferred<T>
是 Job
的子类,而且能够经过调用 await
函数获取协程的返回值。上面 GlobalScope.async
的例子改造一下:
GlobalScope.launch {
val result = GlobalScope.async { // 使用 GlobalScope.async 启动协程
delay(1000L)
"World!"
}
println("Hello ${result.await()}")
}
Thread.sleep(2000L)
//输出:Hello World!
复制代码
上面的示例把 async
嵌套在了 launch
函数体内部,这是由于 await
是一个挂起函数,而挂起函数不一样于普通函数的就是它必须在协程体或其余挂起函数内部调用。
在协程体内 ({}
内) 能够隐藏 GlobalScope
直接使用 async、launch
启动协程,因此上面的示例能够修改以下:
GlobalScope.launch {
val result = async { // 使用 GlobalScope.async 启动协程
...
}
...
// launch {}
}
...
复制代码
经过了解协程的两种启动方式,咱们知道 GlobalScop.launch、GlobalScop.async
的返回值都是 Job
对象或其子类对象。那 Job
是什么呢? 又有哪些功能。
Job
是一个可取消的后台任务,用于操做协程的执行并记录执行过程当中协程的状态,因此通常来讲 Job
实例也表明了协程。
Job
具备以下几种状态:
| State | [isActive] | [isCompleted] | [isCancelled] |
| --------------------- | -------- | ---------- | ------------ |
| New (可选初始状态) | false
| false
| false
|
| Active (默认初始状态) | true
| false
| false
|
| Completing (瞬态) | true
| false
| false
|
| Cancelling (瞬态) | false
| false
| true
|
| Cancelled (最终状态) | false
| true
| true
|
| Completed (最终状态) | false
| true
| false
|
一般状况下,建立 Job
时会自动启动,状态默认为 _Active_
,可是若是建立时添加参数 CoroutineStart.Lazy
则状态为 _NEW_
,能够经过 start()
或 join()
等函数激活。
Job
状态流程图:
wait children
+-----+ start +--------+ complete +-------------+ finish +-----------+
| New | -----> | Active | ---------> | Completing | -------> | Completed |
+-----+ +--------+ +-------------+ +-----------+
| cancel / fail |
| +----------------+
| |
V V
+------------+ finish +-----------+
| Cancelling | --------------------------------> | Cancelled |
+------------+ +-----------+
复制代码
Job
的可用方法:
cancel(CancellationException)
:取消 Job
对应的协程并发送协程取消错误 (CancellationException
)。
invokeOnCompletion()
:注册当此 Job
状态更新为 Completed
时同步调用的处理程序。
join()
:挂起 Job
对应的协程,当协程完成时,外层协程恢复。
start()
:若是建立 Job
对象时使用的启动模式为 CoroutineStart.Lazy
,经过它能够启动协程。
cancelAndJoin()
:取消 Job
并挂起当前协程,直到 Job
被取消。
当要取消正在运行的协程:
val job = launch {
repeat(1000) { i ->
println("job: I'm sleeping $i ...")
delay(500L)
}
}
delay(1300L) // 延迟一段时间
println("main: I'm tired of waiting!")
job.cancel() // 取消该做业
job.join() // 等待做业执行结束
println("main: Now I can quit.")
// 输出
job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.
复制代码
上面示例中可使用 cancelAndJoin
函数它合并了对 cancel
以及 join
函数的调用。
注意:若是在协程执行过程当中没有挂起点,那么协程是不可被取消的。
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
var nextPrintTime = startTime
var i = 0
while (i < 5) { // 一个执行计算的循环,只是为了占用 CPU
// 每秒打印消息两次
if (System.currentTimeMillis() >= nextPrintTime) {
println("job: I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L) // 等待一段时间,并保证协程开始执行
println("main: I'm tired of waiting!")
job.cancelAndJoin() // 取消一个做业而且等待它结束
println("main: Now I can quit.")
// 输出
job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
job: I'm sleeping 3 ...
job: I'm sleeping 4 ...
main: Now I can quit.
复制代码
简单来讲,若是协程体内没有挂起点的话,已开始执行的协程是没法取消的。
下面来介绍,协程启动时传参的含义及做用:
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
...
}
复制代码
CoroutineStart
:协程启动模式。协程内提供了四种启动模式:
DEFAULT
:协程建立后,当即开始调度,在调度前若是协程被取消,其将直接进入取消相应的状态。
ATOMIC
:协程建立后,当即开始调度,协程执行到第一个挂起点以前不响应取消。
LAZY
:只有协程被须要时,包括主动调用协程的 start()、join()、await()
等函数时才会开始调度,若是调度前就被取消,那么该协程将直接进入异常结束状态。
UNDISPATCHED
:协程建立后当即执行,直到遇到第一个真正挂起的点。
当即调度和当即执行的区别:当即调度表示协程的调度器会当即接收到调度指令,但具体执行的时机以及在那个线程上执行,还须要根据调度器的具体状况而定,也就是说当即调度到当即执行之间一般会有一段时间。所以,咱们得出如下结论:
DEFAULT
虽然是当即调度,但也有可能在执行前被取消。
UNDISPATCHED
是当即执行,所以协程必定会执行。
ATOMIC
虽然是当即调度,但其将调度和执行两个步骤合二为一了,就像它的名字同样,其保证调度和执行是原子操做,所以协程也必定会执行。
UNDISPATCHED
和 ATOMIC
虽然都会保证协程必定执行,但在第一个挂起点以前,前者运行在协程建立时所在的线程,后者则会调度到指定的调度器所在的线程上执行。
CoroutineContext
:协程上下文。用于控制协程的行为,上文提到的 Job
和准备介绍的调度器都属于 CoroutineContext
。
协程默认提供了四种调度器:
Dispatchers.Default
:默认调度器,若是没有指定协程调度器和其余任何拦截器,那默认都使用它来构建协程。适合处理后台计算,其是一个 CPU
密集型任务调度器。
Dispatchers.IO
:IO
调度器,适合执行 IO
相关操做,其是一个 IO
密集型任务调度器。
Dispatchers.Main
:UI
调度器,会将协程调度到主线程中执行。
Dispatchers.Unconfined
:非受限制调度器,不要求协程执行在特定线程上。协程的调度器若是是 Unconfined
,那么它在挂起点恢复执行时会在恢复所在的线程上直接执行,固然,若是嵌套建立以它为调度器的协程,那么这些协程会在启动时被调度到协程框架内部的时间循环上,以免出现 StackOverflow
。
Dispatchers.Unconfined
:非受限调度器,会在调用它的线程启动协程,但它仅仅只是运行到第一个挂起点。挂起后,它恢复线程中的协程,而这彻底由被调用的挂起函数来决定。
runBlocking {
launch { // 运行在父协程的上下文中,即 runBlocking 主协程
println("main runBlocking : I'm working in thread ${Thread.currentThread().name}")
}
launch(Dispatchers.Unconfined) { // 不受限的——将工做在主线程中
println("Unconfined : I'm working in thread ${Thread.currentThread().name}")
}
launch(Dispatchers.Default) { // 将会获取默认调度器
println("Default : I'm working in thread ${Thread.currentThread().name}")
}
}
//输出结果
Unconfined : I'm working in thread main @coroutine#3
Default : I'm working in thread DefaultDispatcher-worker-1 @coroutine#4
main runBlocking : I'm working in thread main @coroutine#2
复制代码
除了能够在 GlobalScope.launch {}、GlobalScope.async {}
建立协程时设置协程调度器,
与
async {...}.await()
相比withContext
的内存开销更低,所以对于使用async
以后当即调用await
的状况,应当优先使用withContext
。
Kotlin
协程提供了 withTimeout
函数设置超时取消。若是运行超时,取消后会抛出 TimeoutCancellationException
异常。抛出异常的状况下回影响到其余协程,这时候可使用 withTimeoutOrNull
函数,它会在超时的状况下返回 null
而不抛出异常。
runBlocking {
val result = withContext(coroutineContext) {
withTimeoutOrNull(500) {
delay(1000)
"hello"
}
}
println(result)
}
// 输出结果
hello
复制代码
若是想要解决上面示例中的问题可使用 yield
函数。它的做用在于检查所在协程的状态,若是已经取消,则抛出取消异常予以响应。此外它还会尝试出让线程的执行权,给其余协程提供执行机会。
在上面示例中添加 yield
函数:
if (System.currentTimeMillis() >= nextPrintTime) {
yield()
println("job: I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
// 输出结果
job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.
复制代码
协程做用域:协程做用域主要用于明确协程之间的父子关系,以及对于取消或者异常处理等方面的传播行为。
协程做用域包括如下三种:
顶级做用域
:没有父协程的协程所在的做用域为顶级做用域。
协同做用域
:协程中启动新的协程,新协程为所在协程的子协程,这种状况下子协程所在的做用域默认为协同做用域。此时子协程抛出的未捕获异常将传递给父协程处理,父协程同时也会被取消。
主从做用域
:与协程做用域在协程的父子关系上一致,区别在于处于该做用域下的协程出现未捕获的异常时不会将异常向上传递给父协程。
父子协程间的关系:
父协程被取消,则全部子协程均被取消。
父协程须要等待子协程执行完毕以后才会最终进入完成状态,无论父协程自身的协程体是否已经执行完毕。
子协程会继承父协程的协程上下文元素,若是自身有相同 key
的成员,则覆盖对应的 key
,覆盖的效果仅限自身范围内有效。
声明顶级做用域:GlobalScope.launch {}
、runBlocking {}
声明协同做用域:coroutineScope {}
声明主从做用域:supervisorScope {}
coroutineScope {}
和 supervisorScope {}
是挂起函数因此它们只能在协程做用域中或挂起函数中调用。
coroutineScope {}
和 supervisorScope {}
的区别在于 SupervisorCoroutine
重写了 childCancelled()
函数使异常不会向父协程传递。
经过上文的介绍能够了解到协程其实就是执行在线程上的代码片断,因此线程的并发处理均可以用在协程上,好比 synchorinzed
、CAS
等。而协程自己也提供了两种方式处理并发:
Mutex
:互斥锁;
Semaphore
:信号量。
Mutex
相似于 synchorinzed
,协程竞争时将协程包装为 LockWaiter
使用双向链表存储。Mutex
还提供了 withLock
扩展函数,以简化使用:
runBlocking<Unit> {
val mutex = Mutex()
var counter = 0
repeat(10000) {
GlobalScope.launch {
mutex.withLock {
counter ++
}
}
}
Thread.sleep(500) //暂停一下子等待全部协程执行结束
println("The final count is $counter")
}
复制代码
Semaphore
用以限制访问特定资源的协程数量。
runBlocking<Unit> {
val semaphore = Semaphore(1)
var counter = 0
repeat(10000) {
GlobalScope.launch {
semaphore.withPermit {
counter ++
}
}
}
Thread.sleep(500) //暂停一下子等待全部协程执行结束
println("The final count is $counter")
}
复制代码
注意:只有在
permits = 1
时才和Mutex
功能相同。
咱们来看 suspend
修饰函数和修饰 lambda
的区别。
挂起函数:
suspend fun suspendFun() {
}
复制代码
编译成 java
代码以下:
@Nullable
public final Object suspendFun(@NotNull Continuation $completion) {
return Unit.INSTANCE;
}
复制代码
能够看到挂起函数其实隐藏着一个 Continuation
协程实例参数,而这个参数其实就来源于协程体或者其余挂起函数,所以挂起函数只能在协程体内或其余函数内调用了。
suspend
修饰 lambda
表达式:
suspend {}
// 反编译结果以下
Function1 var2 = (Function1)(new Function1((Continuation)null) {
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
switch(this.label) {
case 0:
return Unit.INSTANCE;
default:
}
}
@NotNull
public final Continuation create(@NotNull Continuation completion) {
Function1 var2 = new <anonymous constructor>(completion);
return var2;
}
public final Object invoke(Object var1) {
return ((<undefinedtype>)this.create((Continuation)var1)).invokeSuspend(Unit.INSTANCE);
}
});
复制代码
suspend lambda
实际会被编译成 SuspendLambda
的子类。suspendLambda
的继承关系以下图:
经过反编译的代码能够发现咱们在协程体内编写的代码最终是在 invokeSuspend
函数内执行的。而在 BaseContinuationImpl
内实现了 Continuation
协程接口的 resumeWidth
函数,并在其内调用了 invokeSuspend
函数。
suspend
关键字的介绍先到这里,接下来咱们看协程是如何建立并运行的。
文件地址
kotlin.coroutines.Continuation.kt
。
Continuation.kt
文件基本属于协程的基础核心了,搞懂了它也就至关于搞懂了协程的基础原理。
协程接口的定义;
唤醒或启动协程的函数;
四种建立协程的函数;
帮助获取协程内的协程实例对象的函数。
首先是协程的接口声明,很是简单:
/**
* 协程接口,T 表示在最后一个挂起点恢复时的返回值类型
*/
public interface Continuation<in T> {
/**
* 协程上下文
*/
public val context: CoroutineContext
/**
* 这个函数的功能有不少,它能够启动协程,也能够恢复挂点,还能够做为最后一次挂起点恢复时输出协程的结果
*/
public fun resumeWith(result: Result<T>)
}
复制代码
协程接口声明以后 Continuation.kt
文件提供了两个调用 resumeWith
函数的函数:
public inline fun <T> Continuation<T>.resume(value: T): Unit =
resumeWith(Result.success(value))
public inline fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit =
resumeWith(Result.failure(exception))
复制代码
这两个函数除了传参一成功一失败,它们的功能是如出一辙的,都是直接调用了 resumeWith
函数。至关因而 resumeWith
函数的封装。
再而后就是四种建立协程的方式了:
public fun <T> (suspend () -> T).createCoroutine(
completion: Continuation<T>
): Continuation<Unit> =
SafeContinuation(createCoroutineUnintercepted(completion).intercepted(), COROUTINE_SUSPENDED)
public fun <R, T> (suspend R.() -> T).createCoroutine(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> =
SafeContinuation(createCoroutineUnintercepted(receiver, completion).intercepted(), COROUTINE_SUSPENDED)
public fun <T> (suspend () -> T).startCoroutine(
completion: Continuation<T>
) {
createCoroutineUnintercepted(completion).intercepted().resume(Unit)
}
public fun <R, T> (suspend R.() -> T).startCoroutine(
receiver: R,
completion: Continuation<T>
) {
createCoroutineUnintercepted(receiver, completion).intercepted().resume(Unit)
}
复制代码
这四种方式能够说是类似度超高,createCoroutine
和 startCoroutine
最大的区别在于,经过 createCoroutine
建立的协程须要掉用 resume
函数启动,而 startCoroutine
函数内部已经默认调用了 resume
函数。那咱们先用第一种方式建立一个协程:
// 建立协程
val continuation = suspend {
println("In Coroutine")
}.createCoroutine(object : Continuation<Unit> {
override fun resumeWith(result: Result<Unit>) {
println(result)
}
override val context = EmptyCoroutineContext
})
// 启动协程
continuation.resume(Unit)
复制代码
调用 createCoroutine
函数建立协程时传入了 Continuation
协程的匿名类对象,诶?好像有点不对,为何建立协程的时候要传一个协程实例进去,直接用不就成了。想知道为何的话,那就须要看看 createCoroutine
到底作了什么操做了。
SafeContinuation(createCoroutineUnintercepted(completion).intercepted(), COROUTINE_SUSPENDED)
复制代码
首先调用的是 createCoroutineUnintercepted
函数,它的源码能够在 kotlin.coroutines.intrinsics.IntrinsicsJvm.kt
内找到:
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
completion: Continuation<T>
): Continuation<Unit> {
val probeCompletion = probeCoroutineCreated(completion)
return if (this is BaseContinuationImpl)
create(probeCompletion)
else
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function1<Continuation<T>, Any?>).invoke(it)
}
}
复制代码
probeCoroutineCreated
函数内直接将参数返回了,而且经过断点的方式,它的返回值和completion
传参是同样的,因此这里先忽略它。
经过断点会发现 (this is BaseContinuationImpl)
判断的返回值是 true
这也就间接证实了上文中 suspend lambda
和 BaseContinuationImpl
的继承关系。最后返回的是 create(Continuation)
函数的返回值,这里能够发现做为参数传入的 Continuation
变量被 suspend lambda
包裹了一层,而后返回,至关于 suspend lambda
成为了 Continuation
的代理。
到这里 createCoroutineUnintercepted(completion)
的含义就搞明白了:
将
object : Continuation<Unit> {}
建立的协程实例传入suspend lambda
,由其代理协程执行操做。
紧接着又调用了 intercepted
函数,intercepted
函数声明也在 IntrinsicsJvm.kt
文件内:
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> = (this as? ContinuationImpl)?.intercepted() ?: this
复制代码
接着看 ContinuationImpl
的 intercepted
函数:
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
复制代码
其中 context[ContinuationInterceptor]?.interceptContinuation(this)
这句代码涉及到协程拦截器的概念,下文会详细分析。这里能够先简单介绍一下,协程拦截器和协程其实也是代理的关系。因此 intercepted()
能够理解为若是协程上下文中添加了协程拦截器,那么就返回协程拦截器,否则就返回 suspend lambda
实例自己,而它们都实现了 Continuation
接口。
先作一个小结,经过上文的介绍基本就清楚了,createCoroutine、startCoroutine
函数其实不是用来建立协程的,协程实例就是它们的传参,它们是为协程添加代理的。
createCoroutineUnintercepted(completion).intercepted()
复制代码
经过上面的代码,为协程添加了代理,分别是 suspend lambda
和协程拦截器。这时候经过协程实例调用 resumeWith
函数时会先执行两层代理内实现的 resumeWith
函数逻辑,最终才会执行到协程的 resumeWith
函数输出最终结果。
在 createCoroutine
函数内,在添加两层代理以后又添加了一层代理,SafeContinuation
。SafeContinuation
内部使用协程的三种状态,并配合 CAS
操做,保证当前返回的 SafeContinuation
实例对象仅能调用一次 resumeWith
函数,屡次调用会报错。
UNDECIDED
:初始状态
COROUTINE_SUSPENDED
:挂起状态
RESUMED
:恢复状态
那为何协程要这么作,很麻烦不是?要弄清楚这个问题先来看 BaseContinuationImpl
的 resumeWith
函数实现吧。
public final override fun resumeWith(result: Result<Any?>) {
var current = this
var param = result
while (true) {
probeCoroutineResumed(current)
with(current) {
val completion = completion!!
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
current = completion
param = outcome
} else {
// top-level completion reached -- invoke and return
completion.resumeWith(outcome)
return
}
}
}
}
复制代码
当调用 resume(Unit)
启动协程时,因为代理的存在会调用到 BaseContinuationImpl
的 resumeWith()
函数,函数内会执行 invokeSuspend()
函数,也就说咱们所说的协程体。
查看以下代码的 invokeSuspend
函数:
suspend {5}
// 反编译后的 invokeSuspend 函数
public final Object invokeSuspend(@NotNull Object $result) {
Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(this.label) {
case 0:
ResultKt.throwOnFailure($result);
return Boxing.boxInt(5);
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
}
复制代码
能够看到这里直接返回了最终的结果 5
,接着在 ContinuationImpl.resumeWith
函数内最终调用
completion.resumeWith(outcome)
复制代码
输出协程的最终结果。
这是协程执行同步代码的过程,能够看到在整个过程当中,ContinuationImpl
好像并无起到什么做用,那接着来看在协程体内执行异步代码:
suspend {
suspendFunc()
}
suspend fun suspendFunc() = suspendCoroutine<Int> { continuation ->
thread {
Thread.sleep(1000)
continuation.resume(5)
}
}
// 反编译后
public final Object invokeSuspend(@NotNull Object $result) {
Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
Object var10000;
switch(this.label) {
case 0:
ResultKt.throwOnFailure($result);
this.label = 1;
var10000 = DeepKotlin3Kt.suspendFunc(this);
if (var10000 == var2) {
return var2;
}
break;
case 1:
ResultKt.throwOnFailure($result);
var10000 = $result;
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
return var10000;
}
public static final Object suspendFunc(@NotNull Continuation $completion) {
boolean var1 = false;
boolean var2 = false;
boolean var3 = false;
SafeContinuation var4 = new SafeContinuation(IntrinsicsKt.intercepted($completion));
Continuation continuation = (Continuation)var4;
int var6 = false;
ThreadsKt.thread$default(false, false, (ClassLoader)null, (String)null, 0, (Function0)(new DeepKotlin3Kt$suspendFunc02$2$1(continuation)), 31, (Object)null);
Object var10000 = var4.getOrThrow();
if (var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
DebugProbesKt.probeCoroutineSuspended($completion);
}
return var10000;
}
复制代码
resume
函数启动协程,invokeSuspend
函数第一次执行时 this.label == 0
执行 case 0
代码,this.label
变量赋值为 1
, 而后判断若是 if (var10000 == var2)
为 true
那么 invokeSuspend
函数返回 var2
,也就是 COROUTINE_SUSPENDED
标识,在 resumeWith
函数内,判断若是 invokeSuspend
函数的返回值为 COROUTINE_SUSPENDED
则 reture
。这也就是协程的挂起过程。
当线程执行结束,调用 resume
函数恢复协程时再次执行到 invokeSuspend
函数,这时 this.label == 1
,执行 case 1
代码,直接返回结果 5
。那在 resumeWith
函数内,这时就不会执行 return
了,最终会调用协程的 resumeWith
函数输出最终的结果,这也就是协程的恢复过程。
经过了解协程运行流程能够发现 ContinuationImpl
实际上是协程挂起和恢复逻辑的真正执行者。也正是由于协程挂起和恢复逻辑的存在,因此咱们能够像编写同步代码同样调用异步代码:
suspend {
println("Coroutine start")
println("Coroutine: ${System.currentTimeMillis()}")
val resultFun = suspendThreadFun()
println("Coroutine: suspendThreadFun-$resultFun-${System.currentTimeMillis()}")
val result = suspendNoThreadFun()
println("Coroutine: suspendNoThreadFun-$result-${System.currentTimeMillis()}")
}.startCoroutine(object : Continuation<Unit> {
override val context = EmptyCoroutineContext
override fun resumeWith(result: Result<Unit>) {
println("Coroutine End: $result")
}
})
suspend fun suspendThreadFun() = suspendCoroutine<Int> { continuation ->
thread {
Thread.sleep(1000)
continuation.resumeWith(Result.success(5))
}
}
suspend fun suspendNoThreadFun() = suspendCoroutine<Int> { continuation ->
continuation.resume(5)
}
//输出:
Coroutine start
Coroutine: 1627014868152
Coroutine: suspendThreadFun-5-1627014869182
Coroutine: suspendNoThreadFun-5-1627014869186
Coroutine End: Success(kotlin.Unit)
复制代码
在经过 createCoroutine
建立协程时,你会发现还可为它传递 receiver
参数,这个参数的做用是用于扩展协程体,通常称其为 协程做用域
。
public fun <R, T> (suspend R.() -> T).createCoroutine(
receiver: R,
completion: Continuation<T>
): Continuation<Unit> =
SafeContinuation(createCoroutineUnintercepted(receiver, completion).intercepted(), COROUTINE_SUSPENDED)
复制代码
能够看到 suspend lambda
表达式也出现了变化。咱们知道 () -> T
是 Function0
的 lambda
表达式,R.() -> T
至关于 R
类的 () -> T
扩展。若是了解扩展函数的话就知道扩展函数会将所扩展的类做为其参数,那么 R.() -> T
也就是 Function1
的 lambda
表达式了。
固然因为
suspend
关键字的做用,又增长了Continuation
参数,因此最终看到的就是Function1
和Function2
。
由于扩展函数的做用,因此能够在协程体内经过 this
(可隐藏)调用 receiver
的函数或者属性。示例以下:
launchCoroutine(ProducerScope<Int>()) {
produce(1000)
}
fun <R, T> launchCoroutine(receiver: R, block: suspend R.() -> T) {
block.startCoroutine(receiver, object : Continuation<T> {
override val context = EmptyCoroutineContext
override fun resumeWith(result: Result<T>) {
println("Coroutine End: $result")
}
})
}
class ProducerScope<T> {
fun produce(value: T) {
println(value)
}
}
复制代码
了解上文建立协程的逻辑以后再来分析 GlobalScope.launch
就很是简单了。GlobalScope.launch
最终会执行到 CoroutineStart.invoke
函数:
AbstractCoroutine.kt
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
initParentJob()
start(block, receiver, this)
}
复制代码
CoroutineStart.kt
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
when (this) {
DEFAULT -> block.startCoroutineCancellable(receiver, completion)
ATOMIC -> block.startCoroutine(receiver, completion)
UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
LAZY -> Unit // will start lazily
}
复制代码
代码基本跟上文分析的一致。
协程上下文在协程中的做用很是大,有它在至关于协程有了装备卡槽同样。你能够将你想添加的上下文对象合并到 CoroutineContext
参数上,而后在其余地方使用。
CoroutineContext
的数据结构有以下特色:
能够经过 []
以相似 List
的方式访问任何一个协程上下文对象,[]
内是目标协程上下文。
协程上下文能够经过 +
的方式依次累加,固然 +=
也是可用的。
咱们来自定义一个协程上下文给协程添加一个名字:
public data class CoroutineName(val name: String) : AbstractCoroutineContextElement(CoroutineName) {
public companion object Key : CoroutineContext.Key<CoroutineName>
override fun toString(): String = "CoroutineName($name)"
}
复制代码
应用到示例中:
var coroutineContext: CoroutineContext = EmptyCoroutineContext
coroutineContext += CoroutineName("c0-01")
suspend {
println("Run Coroutine")
}.startCoroutine(object : Continuation<Unit> {
override fun resumeWith(result: Result<Unit>) {
println("${context[CoroutineName]?.name}")
}
override val context = coroutineContext
})
//输出:
Run Coroutine
c0-01
复制代码
其实协程已经为咱们提供了
CoroutineName
实现。
经过实现拦截器接口 ContinuationInterceptor
来定义拦截器,由于拦截器也是协程上下文的一类实现,因此使用拦截器时将其添加到对应的协程上下文中便可。
声明一个日志拦截器:
class LogInterceptor : ContinuationInterceptor {
override val key = ContinuationInterceptor
override fun <T> interceptContinuation(continuation: Continuation<T>) = LogContinuation(continuation)
}
class LogContinuation<T>(private val continuation: Continuation<T>) : Continuation<T> by continuation {
override fun resumeWith(result: Result<T>) {
println("before resumeWith: $result")
continuation.resumeWith(result)
println("after resumeWith")
}
}
复制代码
拦截器的关键拦截函数是 interceptContinuation
,能够根据须要返回一个新的 Continuation
实例。
在协程生命周期内每次恢复调用都会触发拦截器。恢复调用有以下两种状况:
协程启动时调用一次,经过恢复调用来开始执行协程体从开始到下一次挂起之间的逻辑。
挂起点处若是异步挂起,则在恢复时再调用一次。
由此可知,恢复调用的次数为 n+1
次,其中 n
是协程体内真正挂起执行异步逻辑的挂起点的个数。
改写上面的例子:
// 异步挂起函数
suspend fun suspendFunc02() = suspendCoroutine<Int> { continuation ->
thread {
continuation.resumeWith(Result.success(5))
}
}
// 开启协程 - 未添加日志拦截器
suspend {
suspendFunc02()
suspendFunc02()
}.startCoroutine(object : Continuation<Int> {
override val context: CoroutineContext = EmptyCoroutineContext
override fun resumeWith(result: Result<Int>) {
...
result.onSuccess {
println("Coroutine End: ${context[CoroutineName]?.name}, $result")
}
}
})
// 输出以下
Coroutine End: Success(5)
// 开启协程 - 添加日志拦截器
suspend {
suspendFunc02()
suspendFunc02()
}.startCoroutine(object : Continuation<Int> {
override val context: CoroutineContext = LogInterceptor()
override fun resumeWith(result: Result<Int>) {
...
result.onSuccess {
println("Coroutine End: ${context[CoroutineName]?.name}, $result")
}
}
})
// 输出以下:
before resumeWith: Success(kotlin.Unit)
after resumeWith
before resumeWith: Success(5)
after resumeWith
before resumeWith: Success(5)
after resumeWith
复制代码