关键词:Kotlin 异步编程 协程java
上一篇咱们知道了协程启动的几种模式,也经过示例认识了
launch
启动协程的使用方法,本文将延续这些内容从调度的角度来进一步为你们揭示协程的奥义。android
调度器本质上就是一个协程上下文的实现,咱们先来介绍下上下文。编程
前面咱们提到 launch
函数有三个参数,第一个参数叫 上下文,它的接口类型是 CoroutineContext
,一般咱们见到的上下文的类型是 CombinedContext
或者 EmptyCoroutineContext
,一个表示上下文的组合,另外一个表示什么都没有。咱们来看下 CoroutineContext
的接口方法:缓存
@SinceKotlin("1.3")
public interface CoroutineContext {
public operator fun <E : Element> get(key: Key<E>): E?
public fun <R> fold(initial: R, operation: (R, Element) -> R): R
public operator fun plus(context: CoroutineContext): CoroutineContext = ...
public fun minusKey(key: Key<*>): CoroutineContext
public interface Key<E : Element>
public interface Element : CoroutineContext {
public val key: Key<*>
...
}
}
复制代码
不知道你们有没有发现,它简直就是一个以 Key
为索引的 List
:安全
CoroutineContext | List |
---|---|
get(Key) | get(Int) |
plus(CoroutineContext) | plus(List) |
minusKey(Key) | removeAt(Int) |
表中的
List.plus(List)
实际上指的是扩展方法Collection<T>.plus(elements: Iterable<T>): List<T>
bash
CoroutineContext
做为一个集合,它的元素就是源码中看到的 Element
,每个 Element
都有一个 key
,所以它能够做为元素出现,同时它也是 CoroutineContext
的子接口,所以也能够做为集合出现。数据结构
讲到这里,你们就会明白,CoroutineContext
原来是个数据结构啊。若是你们对于 List
的递归定义比较熟悉的话,那么对于 CombinedContext
和 EmptyCoroutineContext
也就很容易理解了,例如 scala 的 List
是这么定义的:多线程
sealed abstract class List[+A] extends ... {
...
def head: A
def tail: List[A]
...
}
复制代码
在模式匹配的时候,List(1,2,3,4)
是能够匹配 x::y
的,x
就是 1,y
则是 List(2,3,4)
。并发
CombinedContext
的定义也很是相似:框架
internal class CombinedContext(
private val left: CoroutineContext,
private val element: Element
) : CoroutineContext, Serializable {
...
}
复制代码
只不过它是反过来的,前面是集合,后面是单独的一个元素。咱们在协程体里面访问到的 coroutineContext
大可能是这个 CombinedContext
类型,表示有不少具体的上下文实现的集合,咱们若是想要找到某一个特别的上下文实现,就须要用对应的 Key
来查找,例如:
suspend fun main(){
GlobalScope.launch {
println(coroutineContext[Job]) // "coroutine#1":StandaloneCoroutine{Active}@1ff62014
}
println(coroutineContext[Job]) // null,suspend main 虽然也是协程体,但它是更底层的逻辑,所以没有 Job 实例
}
复制代码
这里的 Job
其实是对它的 companion object
的引用
public interface Job : CoroutineContext.Element {
/** * Key for [Job] instance in the coroutine context. */
public companion object Key : CoroutineContext.Key<Job> { ... }
...
}
复制代码
因此咱们也能够仿照
Thread.currentThread()
来一个获取当前Job
的方法:suspend inline fun Job.Key.currentJob() = coroutineContext[Job] suspend fun coroutineJob(){ GlobalScope.launch { log(Job.currentJob()) } log(Job.currentJob()) } 复制代码
咱们能够经过指定上下文为协程添加一些特性,一个很好的例子就是为协程添加名称,方便调试:
GlobalScope.launch(CoroutineName("Hello")) {
...
}
复制代码
若是有多个上下文须要添加,直接用 +
就能够了:
GlobalScope.launch(Dispatchers.Main + CoroutineName("Hello")) {
...
}
复制代码
Dispatchers.Main
是调度器的一个实现,不用担忧,咱们很快就会认识它了。
费了好大劲儿说完上下文,这里就要说一个比较特殊的存在了——拦截器。
public interface ContinuationInterceptor : CoroutineContext.Element {
companion object Key : CoroutineContext.Key<ContinuationInterceptor>
public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
...
}
复制代码
拦截器也是一个上下文的实现方向,拦截器能够左右你的协程的执行,同时为了保证它的功能的正确性,协程上下文集合永远将它放在最后面,这真可谓是天选之子了。
它拦截协程的方法也很简单,由于协程的本质就是回调 + “黑魔法”,而这个回调就是被拦截的 Continuation
了。用过 OkHttp 的小伙伴一下就兴奋了,拦截器我经常使用的啊,OkHttp 用拦截器作缓存,打日志,还能够模拟请求,协程拦截器也是同样的道理。调度器就是基于拦截器实现的,换句话说调度器就是拦截器的一种。
咱们能够本身定义一个拦截器放到咱们的协程上下文中,看看会发生什么。
class MyContinuationInterceptor: ContinuationInterceptor{
override val key = ContinuationInterceptor
override fun <T> interceptContinuation(continuation: Continuation<T>) = MyContinuation(continuation)
}
class MyContinuation<T>(val continuation: Continuation<T>): Continuation<T> {
override val context = continuation.context
override fun resumeWith(result: Result<T>) {
log("<MyContinuation> $result" )
continuation.resumeWith(result)
}
}
复制代码
咱们只是在回调处打了一行日志。接下来咱们把用例拿出来:
suspend fun main() {
GlobalScope.launch(MyContinuationInterceptor()) {
log(1)
val job = async {
log(2)
delay(1000)
log(3)
"Hello"
}
log(4)
val result = job.await()
log("5. $result")
}.join()
log(6)
}
复制代码
这多是迄今而止咱们给出的最复杂的例子了,不过请你们不要被它吓到,它依然很简单。咱们经过 launch
启动了一个协程,为它指定了咱们本身的拦截器做为上下文,紧接着在其中用 async
启动了一个协程,async
与 launch
从功能上是同等类型的函数,它们都被称做协程的 Builder 函数,不一样之处在于 async
启动的 Job
也就是实际上的 Deferred
能够有返回结果,能够经过 await
方法获取。
可想而知,result
的值就是 Hello。那么这段程序运行的结果如何呢?
15:31:55:989 [main] <MyContinuation> Success(kotlin.Unit) // ①
15:31:55:992 [main] 1
15:31:56:000 [main] <MyContinuation> Success(kotlin.Unit) // ②
15:31:56:000 [main] 2
15:31:56:031 [main] 4
15:31:57:029 [kotlinx.coroutines.DefaultExecutor] <MyContinuation> Success(kotlin.Unit) // ③
15:31:57:029 [kotlinx.coroutines.DefaultExecutor] 3
15:31:57:031 [kotlinx.coroutines.DefaultExecutor] <MyContinuation> Success(Hello) // ④
15:31:57:031 [kotlinx.coroutines.DefaultExecutor] 5. Hello
15:31:57:031 [kotlinx.coroutines.DefaultExecutor] 6
复制代码
“// ①” 不是程序输出的内容,仅为后续讲解方便而作的标注。
你们可能就要奇怪了,你不是说 Continuation
是回调么,这里面回调调用也就一次啊(await
那里),怎么日志打印了四次呢?
别慌,咱们按顺序给你们介绍。
首先,全部协程启动的时候,都会有一次 Continuation.resumeWith
的操做,这一次操做对于调度器来讲就是一次调度的机会,咱们的协程有机会调度到其余线程的关键之处就在于此。 ①、② 两处都是这种状况。
其次,delay
是挂起点,1000ms 以后须要继续调度执行该协程,所以就有了 ③ 处的日志。
最后,④ 处的日志就很容易理解了,正是咱们的返回结果。
可能有朋友还会有疑问,我并无在拦截器当中切换线程,为何从 ③ 处开始有了线程切换的操做?这个切换线程的逻辑源自于 delay
,在 JVM 上 delay
其实是在一个 ScheduledExcecutor
里面添加了一个延时任务,所以会发生线程切换;而在 JavaScript 环境中则是基于 setTimeout,若是运行在 Nodejs 上,delay
就不会切线程了,毕竟人家是单线程的。
若是咱们在拦截器当中本身处理了线程切换,那么就实现了本身的一个简单的调度器,你们有兴趣能够本身去尝试。
思考:拦截器能够有多个吗?
有了前面的基础,咱们对于调度器的介绍就变得水到渠成了。
public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
...
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
...
}
复制代码
它自己是协程上下文的子类,同时实现了拦截器的接口, dispatch
方法会在拦截器的方法 interceptContinuation
中调用,进而实现协程的调度。因此若是咱们想要实现本身的调度器,继承这个类就能够了,不过一般咱们都用现成的,它们定义在 Dispatchers
当中:
val Default: CoroutineDispatcher
val Main: MainCoroutineDispatcher
val Unconfined: CoroutineDispatcher
复制代码
这个类的定义涉及到了 Kotlin MPP 的支持,所以你在 Jvm 版本当中还会看到 val IO: CoroutineDispatcher
,在 js 和 native 当中就只有前面提到的这三个了(对 Jvm 好偏爱呐)。
Jvm | Js | Native | |
---|---|---|---|
Default | 线程池 | 主线程循环 | 主线程循环 |
Main | UI 线程 | 与 Default 相同 | 与 Default 相同 |
Unconfined | 直接执行 | 直接执行 | 直接执行 |
IO | 线程池 | -- | -- |
Kotlin 的用户绝大多数都是 Android 开发者,你们对 UI 的开发需求仍是比较大的。咱们举一个很常见的场景,点击一个按钮作点儿异步的操做再回调刷新 UI:
getUserBtn.setOnClickListener {
getUser { user ->
handler.post {
userNameView.text = user.name
}
}
}
复制代码
咱们简单得给出 getUser
函数的声明:
typealias Callback = (User) -> Unit
fun getUser(callback: Callback){
...
}
复制代码
因为 getUser
函数须要切到其余线程执行,所以回调一般也会在这个非 UI 的线程中调用,因此为了确保 UI 正确被刷新,咱们须要用 handler.post
切换到 UI 线程。上面的写法就是咱们最古老的写法了。
后来又有了 RxJava,那么事情开始变得有趣了起来:
fun getUserObservable(): Observable<User> {
return Observable.create<User> { emitter ->
getUser {
emitter.onNext(it)
}
}
}
复制代码
因而点击按钮的事件能够这么写:
getUserBtn.setOnClickListener {
getUserObservable()
.observeOn(AndroidSchedulers.mainThread())
.subscribe { user ->
userNameView.text = user.name
}
}
复制代码
其实 RxJava 在线程切换上的表现是很是优秀的,也正是如此,不少人甚至用它只是为了切线程方便!
那么咱们如今把这段代码过渡到协程的写法:
suspend fun getUserCoroutine() = suspendCoroutine<User> {
continuation ->
getUser {
continuation.resume(it)
}
}
复制代码
按钮点击时,咱们能够:
getUserBtn.setOnClickListener {
GlobalScope.launch(Dispatchers.Main) {
userNameView.text = getUserCoroutine().name
}
}
复制代码
你们也能够用 anko-coroutines 当中的 View.onClick 扩展,这样咱们就无需本身在这里用
launch
启动协程了。有关 Anko 对协程的支持,咱们后面专门安排一篇文章介绍。
这里又有你们没见过的内容啦,suspendCoroutine
这个方法并非帮咱们启动协程的,它运行在协程当中而且帮咱们获取到当前协程的 Continuation
实例,也就是拿到回调,方便后面咱们调用它的 resume
或者 resumeWithException
来返回结果或者抛出异常。
若是你重复调用
resume
或者resumeWithException
会收获一枚IllegalStateException
,仔细想一想这是为何。
对比前面的 RxJava 的作法,你会发现这段代码其实很容易理解,你甚至会发现协程的使用场景与 RxJava 竟是如此的类似。这里咱们用到了 Dispatchers.Main
来确保 launch
启动的协程在调度时始终调度到 UI 线程,那么下面咱们来看看 Dispatchers.Main
的具体实现。
在 Jvm 上,Main
的实现也比较有意思:
internal object MainDispatcherLoader {
@JvmField
val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()
private fun loadMainDispatcher(): MainCoroutineDispatcher {
return try {
val factories = MainDispatcherFactory::class.java.let { clz ->
ServiceLoader.load(clz, clz.classLoader).toList()
}
factories.maxBy { it.loadPriority }?.tryCreateDispatcher(factories)
?: MissingMainCoroutineDispatcher(null)
} catch (e: Throwable) {
MissingMainCoroutineDispatcher(e)
}
}
}
复制代码
在 Android 当中,协程框架经过注册 AndroidDispatcherFactory
使得 Main
最终被赋值为 HandlerDispatcher
的实例,有兴趣的能够去看下 kotlinx-coroutines-android 的源码实现。
注意前面对于 RxJava 和协程的实现,咱们都没有考虑异常和取消的问题。有关异常和取消的话题,咱们会在后面的文章中详细介绍。
调度器的目的就是切线程,你不要想着我在 dispatch
的时候根据本身的心情来随机调用,那你是在害你本身(不怕各位笑话,这样的代码我还真写过,仅供娱乐)。那么问题就简单了,咱们只要提供线程,调度器就应该很方便的建立出来:
suspend fun main() {
val myDispatcher= Executors.newSingleThreadExecutor{ r -> Thread(r, "MyThread") }.asCoroutineDispatcher()
GlobalScope.launch(myDispatcher) {
log(1)
}.join()
log(2)
}
复制代码
输出的信息就代表协程运行在咱们本身的线程上。
16:10:57:130 [MyThread] 1
16:10:57:136 [MyThread] 2
复制代码
不过请你们注意,因为这个线程池是咱们本身建立的,所以咱们须要在合适的时候关闭它,否则的话:
咱们能够经过主动关闭线程池或者调用:
myDispatcher.close()
复制代码
来结束它的生命周期,再次运行程序就会正常退出了。
固然有人会说你建立的线程池的线程不是 daemon 的,因此主线程结束时 Jvm 不会中止运行。说的没错,但该释放的仍是要及时释放,若是你只是在程序的整个生命周期当中短暂的用了一下这个调度器,那么一直不关闭它对应的线程池岂不是会有线程泄露吗?这就很尴尬了。
Kotlin 协程设计者也特别惧怕你们注意不到这一点,还特意废弃了两个 API 而且开了一个 issue 说咱们要重作这套 API,这两个可怜的家伙是谁呢?
废弃的两个基于线程池建立调度器的 API
fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher
fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher
复制代码
这两者能够很方便的建立绑定到特定线程的调度器,但过于简洁的 API 彷佛会让人忘记它的风险。Kotlin 一贯不爱作这种不清不楚的事儿,因此您呢,仍是像咱们这一节例子当中那样本身去构造线程池吧,这样好歹本身忘了关闭也怨不着别人(哈哈哈)。
其实在多个线程上运行协程,线程老是这样切来切去其实并不会显得很轻量级,例以下面的例子就是比较可怕的了:
Executors.newFixedThreadPool(10)
.asCoroutineDispatcher().use { dispatcher ->
GlobalScope.launch(dispatcher) {
log(1)
val job = async {
log(2)
delay(1000)
log(3)
"Hello"
}
log(4)
val result = job.await()
log("5. $result")
}.join()
log(6)
}
复制代码
这里面除了 delay
那里有一次不可避免的线程切换外,其余几处协程挂起点的继续操做(Continuation.resume
)都会切线程:
16:28:04:771 [pool-1-thread-1] 1
16:28:04:779 [pool-1-thread-1] 4
16:28:04:779 [pool-1-thread-2] 2
16:28:05:790 [pool-1-thread-3] 3
16:28:05:793 [pool-1-thread-4] 5. Hello
16:28:05:794 [pool-1-thread-4] 6
复制代码
若是咱们的线程池只开 1 个线程,那么这里全部的输出都将在这惟一的线程中打印:
16:40:14:685 [pool-1-thread-1] 1
16:40:14:706 [pool-1-thread-1] 4
16:40:14:710 [pool-1-thread-1] 2
16:40:15:723 [pool-1-thread-1] 3
16:40:15:725 [pool-1-thread-1] 5. Hello
16:40:15:725 [pool-1-thread-1] 6
复制代码
对比这两者,10个线程的状况线程切换次数最少 3次,而 1 个线程的状况则只要 delay
1000ms 以后恢复执行的时候那一次。只是多两次线程切换,到底会有多大影响呢?我在我本身的 2015 款 mbp 上对于两种不一样的状况分别循环运行 100 次,获得的平均时间以下:
线程数 | 10 | 1 |
---|---|---|
耗时ms | 1006.00 | 1004.97 |
注意,为了测试的公平性,在运行 100 次循环以前已经作好了预热,确保全部类都已经加载。测试结果仅供参考。
也就是说多两次线程切换平均能多出 1ms 的耗时。生产环境当中的代码固然会更复杂,若是这样用线程池去调度,结果可想而知。
实际上一般咱们只须要在一个线程当中处理本身的业务逻辑,只有一些耗时的 IO 才须要切换到 IO 线程中处理,因此好的作法能够参考 UI 对应的调度器,本身经过线程池定义调度器的作法自己没什么问题,但最好只用一个线程,由于多线程除了前面说的线程切换的开销外,还有线程安全的问题。
Js 和 Native 的并发模型与 Jvm 不一样,Jvm 暴露了线程 API 给用户,这也使得协程的调度能够由用户更灵活的选择。越多的自由,意味着越多的代价,咱们在 Jvm 上面编写协程代码时须要明白一点的是,线程安全问题在调度器不一样的协程之间仍然存在。
好的作法,就像咱们前面一节提到的,尽可能把本身的逻辑控制在一个线程以内,这样一方面节省了线程切换的开销,另外一方面还能够避免线程安全问题,一箭双鵰。
若是你们在协程代码中使用锁之类的并发工具就反而增长了代码的复杂度,对此个人建议是你们在编写协程代码时尽可能避免对外部做用域的可变变量进行引用,尽可能使用参数传递而非对全局变量进行引用。
如下是一个错误的例子,你们很容易就能想明白:
suspend fun main(){
var i = 0
Executors.newFixedThreadPool(10)
.asCoroutineDispatcher().use { dispatcher ->
List(1000000) {
GlobalScope.launch(dispatcher) {
i++
}
}.forEach {
it.join()
}
}
log(i)
}
复制代码
输出的结果:
16:59:28:080 [main] 999593
复制代码
上一篇文章咱们提到了 suspend main 会启动一个协程,咱们示例中的协程都是它的子协程,但是这个最外层的协程究竟是怎么来的呢?
咱们先给出一个例子:
suspend fun main() {
log(1)
GlobalScope.launch {
log(2)
}.join()
log(3)
}
复制代码
它等价于下面的写法:
fun main() {
runSuspend {
log(1)
GlobalScope.launch {
log(2)
}.join()
log(3)
}
}
复制代码
那你说这个 runSuspend
又是何妨神圣?它是 Kotlin 标准库的一个方法,注意它不是 kotlinx.coroutines 当中的,它实际上属于更底层的 API 了。
internal fun runSuspend(block: suspend () -> Unit) {
val run = RunSuspend()
block.startCoroutine(run)
run.await()
}
复制代码
而这里面的 RunSuspend
则是 Continuation
的实现:
private class RunSuspend : Continuation<Unit> {
override val context: CoroutineContext
get() = EmptyCoroutineContext
var result: Result<Unit>? = null
override fun resumeWith(result: Result<Unit>) = synchronized(this) {
this.result = result
(this as Object).notifyAll()
}
fun await() = synchronized(this) {
while (true) {
when (val result = this.result) {
null -> (this as Object).wait()
else -> {
result.getOrThrow() // throw up failure
return
}
}
}
}
}
复制代码
它的上下文是空的,所以 suspend main 启动的协程并不会有任何调度行为。
经过这个例子咱们能够知道,实际上启动一个协程只须要有一个 lambda 表达式就能够了,想当年 Kotlin 1.1 刚发布的时候,我写了一系列的教程都是以标准库 API 为基础的,后来发现标准库的 API 也许真的不是给咱们用的,因此看看就好。
上述代码在标准库当中被修饰为
internal
,所以咱们没法直接使用它们。不过你能够把 RunSuspend.kt 当中的内容复制到你的工程当中,这样你就能够直接使用啦,其中的var result: Result<Unit>? = null
可能会报错,不要紧,改为private var result: Result<Unit>? = null
就能够了。
在这篇文章当中,咱们介绍了协程上下文,介绍了拦截器,进而最终引出了咱们的调度器,截止目前,咱们还有异常处理、协程取消、Anko 对协程的支持等话题没有讲到,若是你们有协程相关想了解的话题,能够留言哈~
欢迎关注 Kotlin 中文社区!
中文官网:www.kotlincn.net/
中文官方博客:www.kotliner.cn/
公众号:Kotlin
知乎专栏:Kotlin
CSDN:Kotlin中文社区
掘金:Kotlin中文社区
简书:Kotlin中文社区