破解 Kotlin 协程(5) - 协程取消篇

关键词:Kotlin 协程 协程取消 任务中止java

协程的任务的取消须要靠协程内部调用的协做支持,这就相似于咱们线程中断以及对中断状态的响应同样。git

1. 线程的中断

咱们先从你们熟悉的话题讲起。线程有一个被废弃的 stop 方法,这个方法会让线程当即死掉,而且释放它持有的锁,这样会让它正在读写的存储处于一个不安全的状态,所以 stop 被废弃了。若是咱们启动了一个线程并让它执行一些任务,但很快咱们就后悔了,stop 还不让用,那该怎么办?程序员

val thread = thread {
    ...
}

thread.stop() // !!! Deprecated!!!

复制代码

咱们应该想办法让线程内部正在运行的任务跟咱们合做把任务停掉,这样线程内部的任务中止以前还有机会清理一些资源,好比关闭流等等。github

val thread = thread {
    try {
        Thread.sleep(10000)
    } catch (e: InterruptedException) {
        log("Interrupted, do cleaning stuff.")
    }
}

thread.interrupt()

复制代码

sleep 这样的方法调用,文档明确指出它支持 InterruptedException,所以当线程被标记为中断状态时,它就会抛出 InterruptedException ,那么咱们天然就能够捕获异常并作资源清理了。api

因此请注意所谓的协做式的任务终止,协程的取消也就是 cancel 机制的思路也是如此。安全

2. 协程相似的例子

咱们来看一个协程取消的例子:bash

fun main() = runBlocking {
    val job1 = launch { // ①
        log(1)
        delay(1000) // ②
        log(2)
    }
    delay(100)
    log(3)
    job1.cancel() // ③
    log(4)
}

复制代码

此次咱们用了一个不同的写法,咱们没有用 suspend main,而是直接用 runBlocking 启动协程,这个方法在 Native 上也存在,都是基于当前线程启动一个相似于 Android 的 Looper 的死循环,或者叫消息队列,能够不断的发送消息给它进行处理。runBlocking 会启动一个 Job,所以这里也存在默认的做用域,不过这对于咱们今天的讨论暂时没有太大影响。网络

这段代码 ① 处启动了一个子协程,它内部先输出 1,接着开始 delaydelay 与线程的 sleep 不一样,它不会阻塞线程,你能够认为它实际上就是触发了一个延时任务,告诉协程调度系统 1000ms 以后再来执行后面的这段代码(也就是 log(2));而在这期间,咱们在 ③ 处对刚才启动的协程触发了取消,所以在 ② 处的 delay 尚未回调的时候协程就被取消了,由于 delay 能够响应取消,所以 delay 后面的代码就不会再次调度了,不调度的缘由也很简单,② 处的 delay 会抛一个 CancellationException框架

...
log(1)
try {
    delay(1000)
} catch (e: Exception) {
    log("cancelled. $e")
}
log(2)
...

复制代码

那么输出的结果就不同了:异步

06:54:56:361 [main] 1
06:54:56:408 [main] 3
06:54:56:411 [main] 4
06:54:56:413 [main] cancelled. kotlinx.coroutines.JobCancellationException: Job was cancelled; job=StandaloneCoroutine{Cancelling}@e73f9ac
06:54:56:413 [main] 2
复制代码

你们看,这与线程的中断逻辑是否是很是的相似呢?

3. 完善咱们以前的例子

以前咱们有个例子,上一篇文章已经加入了异常处理逻辑,那么此次咱们给它加上取消逻辑。以前是这样:

suspend fun getUserCoroutine() = suspendCoroutine<User> { continuation ->
    getUser(object : Callback<User> {
        override fun onSuccess(value: User) {
            continuation.resume(value)
        }

        override fun onError(t: Throwable) {
            continuation.resumeWithException(t)
        }
    })
}

复制代码

加取消逻辑,那须要咱们的 getUser 回调版本支持取消,咱们看下咱们的 getUser 是怎么实现的:

fun getUser(callback: Callback<User>) {
    val call = OkHttpClient().newCall(
            Request.Builder()
                    .get().url("https://api.github.com/users/bennyhuo")
                    .build())

    call.enqueue(object : okhttp3.Callback {
        override fun onFailure(call: Call, e: IOException) {
            callback.onError(e)
        }

        override fun onResponse(call: Call, response: Response) {
            response.body()?.let {
                try {
                    callback.onSuccess(User.from(it.string()))
                } catch (e: Exception) {
                    callback.onError(e) // 这里多是解析异常
                }
            }?: callback.onError(NullPointerException("ResponseBody is null."))
        }
    })
}

复制代码

咱们发了个网络请求给 Github,让它把一个叫 bennyhuo 的用户信息返回来,咱们知道 OkHttp 的这个 Call 是支持 cancel 的, 取消后,网络请求过程当中若是读取到这个取消的状态,就会把请求给中止掉。既然这样,咱们干脆直接改造 getUser 好了,这样还能省掉咱们本身的 Callback 回调过程:

suspend fun getUserCoroutine() = suspendCancellableCoroutine<User> { continuation ->
    val call = OkHttpClient().newCall(...)

    continuation.invokeOnCancellation { // ①
        log("invokeOnCancellation: cancel the request.")
        call.cancel()
    }

    call.enqueue(object : okhttp3.Callback {
        override fun onFailure(call: Call, e: IOException) {
            log("onFailure: $e")
            continuation.resumeWithException(e)
        }

        override fun onResponse(call: Call, response: Response) {
            log("onResponse: ${response.code()}")
            response.body()?.let {
                try {
                    continuation.resume(User.from(it.string()))
                } catch (e: Exception) {
                    continuation.resumeWithException(e)
                }
            } ?: continuation.resumeWithException(NullPointerException("ResponseBody is null."))
        }
    })
}

复制代码

咱们这里用到了 suspendCancellableCoroutine,而不是以前的 suspendCoroutine,这就是为了让咱们的挂起函数支持协程的取消。该方法将获取到的 Continuation 包装成了一个 CancellableContinuation,经过调用它的 invokeOnCancellation 方法能够设置一个取消事件的回调,一旦这个回调被调用,那么意味着 getUserCoroutine 调用所在的协程被取消了,这时候咱们也要相应的作出取消的响应,也就是把 OkHttp 发出去的请求给取消掉。

那么咱们在调用它的时候,若是遇到了取消,会怎么样呢?

val job1 = launch { //①
    log(1)
    val user = getUserCoroutine()
    log(user)
    log(2)
}
delay(10)
log(3)
job1.cancel()
log(4)

复制代码

注意咱们启动 ① 以后仅仅延迟了 10ms 就取消了它,网络请求的速度通常来说还不会这么快,所以取消的时候大几率 getUserCoroutine 被挂起了,所以结果大几率是:

07:31:30:751 [main] 1
07:31:31:120 [main] 3
07:31:31:124 [main] invokeOnCancellation: cancel the request.
07:31:31:129 [main] 4
07:31:31:131 [OkHttp https://api.github.com/...] onFailure: java.io.IOException: Canceled
复制代码

咱们发现,取消的回调被调用了,OkHttp 在收到咱们的取消指令以后,也确实中止了网络请求,而且回调给咱们一个 IO 异常,这时候咱们的协程已经被取消,在处于取消状态的协程上调用 Continuation.resumeContinuation.resumeWithException 或者 Continuation.resumeWith 都会被忽略,所以 OkHttp 回调中咱们收到 IO 异常后调用的 continuation.resumeWithException(e) 不会有任何反作用。

4. 再谈 Retrofit 的协程扩展

4.1 Jake Wharton 的 Adapter 存在的问题

我在破解 Kotlin 协程 - 入门篇 提到了 Jake Wharton 大神为 Retrofit 写的 协程 Adapter,

implementation 'com.jakewharton.retrofit:retrofit2-kotlin-coroutines-adapter:0.9.2'
复制代码

它确实能够完成网络请求,不过有细心的小伙伴发现了它的问题:它怎么取消呢?咱们把使用它的代码贴出来:

interface GitHubServiceApi {
    @GET("users/{login}")
    fun getUserCoroutine(@Path("login") login: String): Deferred<User>
}

复制代码

定义好接口,建立 Retrofit 实例的时候传入对应的 Adapter:

val gitHubServiceApi by lazy {
    val retrofit = retrofit2.Retrofit.Builder()
            .baseUrl("https://api.github.com")
            .addConverterFactory(GsonConverterFactory.create())
            .addCallAdapterFactory(CoroutineCallAdapterFactory()) // 这里添加 Adapter
            .build()

    retrofit.create(GitHubServiceApi::class.java)
}

复制代码

用的时候就这样:

val deferred = gitHubServiceApi.getUserCoroutine("bennyhuo")
try {
    showUser(deferred.await())
} catch (e: Exception) {
    showError(e)
}

复制代码

若是要取消,咱们能够直接调用 deferred.cancel(),例如:

log("1")
val deferred = gitHubServiceApi.getUserCoroutine("bennyhuo")
log("2")
withContext(Dispatchers.IO){
    deferred.cancel()
}
try {
    showUser(deferred.await())
} catch (e: Exception) {
    showError(e)
}

复制代码

运行结果以下:

12:59:54:185 [DefaultDispatcher-worker-1] 1
12:59:54:587 [DefaultDispatcher-worker-1] 2
kotlinx.coroutines.JobCancellationException: Job was cancelled; job=CompletableDeferredImpl{Cancelled}@36699211
复制代码

这种状况下,其实网络请求确实是被取消的,这一点咱们能够看下源码的处理:

...
override fun adapt(call: Call<T>): Deferred<T> {
      val deferred = CompletableDeferred<T>()

      deferred.invokeOnCompletion { // ①
        if (deferred.isCancelled) {
          call.cancel()
        }
      }

      call.enqueue(object : Callback<T> {
        ...
      }     
}
...

复制代码

注意 ① 处,invokeOnCompletion 在协程进入完成状态时触发,包括异常和正常完成,那么在这时候若是发现它的状态是已经取消的,那么结果就直接调用 Call 的取消便可。

这看上去确实很正常啊~ 不过 @阿永 在公众号的评论里面提到了一个 Case,仔细一看还真是有问题。咱们给出示例来复现这个 Case:

val job = GlobalScope.launch {
    log("1")
    val deferred = gitHubServiceApi.getUserCoroutine("bennyhuo")
    log("2")
    deferred.invokeOnCompletion {
        log("invokeOnCompletion, $it, ${deferred.isCancelled}")
    }
    try {
        showUser(deferred.await())
    } catch (e: Exception) {
        showError(e)
    }
    log(3)
}
delay(10)
job.cancelAndJoin()

复制代码

咱们启动一个协程,在其中执行网络请求,那么正常来讲,这时候 getUserCoroutine 返回的 Deferred 能够当作一个子协程,它应当遵循默认的做用域规则,在父做用域取消时被取消掉,但现实却并非这样:

13:06:54:332 [DefaultDispatcher-worker-1] 1
13:06:54:829 [DefaultDispatcher-worker-1] 2
kotlinx.coroutines.JobCancellationException: Job was cancelled; job=StandaloneCoroutine{Cancelling}@19aea38c
13:06:54:846 [DefaultDispatcher-worker-1] 3
13:06:56:937 [OkHttp https://api.github.com/...] invokeOnCompletion, null, false
复制代码

咱们看到在调用 deferred.await() 的时候抛了个取消异常,这主要是由于 await() 所在的协程已经被咱们用 cancelAndJoin() 取消,但从随后 invokeOnCompletion 的回调结果来看, getUserCoroutine 返回的 Deferred 并无被取消,再仔细一看,时间上这个回调比前面的操做晚了 2s,那必然是网络请求返回以后才回调的。

因此问题究竟在哪里?在 CoroutineCallAdapterFactory 的实现中,为了实现异步转换,手动建立了一个 CompletableDeferred

override fun adapt(call: Call<T>): Deferred<T> {
  val deferred = CompletableDeferred<T>() // ①
  ...
}

复制代码

这个 CompletableDeferred 自己就是一个 Job 的实现,它的构造可接受一个 Job 实例做为它的父协程,那么问题来了,这里并无告诉它父协程到底是谁,所以也就谈不上做用域的事儿了,这好像咱们用 GlobalScope.launch 启动了一个协程同样。若是你们在 Android 当中使用 MainScope,那么一样由于前面说到的这个缘由,致使 CompletableDeferred 没有办法被取消。

@阿永 在公众号评论中提到这个问题,并提到了一个比较好的解决方案,下面咱们为你们详细介绍。感谢 @阿永。

说到这里咱们再简单回顾下,做用域主要有 GlobalScopecoroutineScopesupervisorScope,对于取消,除了 supervisorScope 比较特别是单向取消,即父协程取消后子协程都取消,Android 中 MainScope 就是一个调度到 UI 线程的 supervisorScopecoroutineScope 的逻辑则是父子相互取消的逻辑;而 GlobalScope 会启动一个全新的做用域,与它外部隔离,内部遵循默认的协程做用域规则。

那么有没有办法解决这个问题呢?

直接解决仍是比较困难的,由于 CompletableDeferred 构造所处的调用环境不是 suspend 函数,于是也没有办法拿到(极可能根本就没有!)父协程。

4.2 如何正确的将回调转换为协程

前面咱们提到既然 adapt 方法不是 suspend 方法,那么咱们是否是应该在其余位置建立协程呢?

其实咱们前面在讲 getUserCoroutine 的时候就不断为你们展现了如何将一个回调转换为协程调用的方法:

suspend fun getUserCoroutine() = suspendCancellableCoroutine<User> { continuation ->
    ...
}
复制代码

suspendCancellableCoroutine 跟最初咱们提到的 suspendCoroutine 同样,都是要获取当前协程的 Continuation 实例,这实际上就至关于要继承当前协程的上下文,所以咱们只须要在真正须要切换协程的时候再去作这个转换便可:

public suspend fun <T : Any> Call<T>.await(): T {
    return suspendCancellableCoroutine { continuation ->
        enqueue(object : Callback<T> {
            override fun onResponse(call: Call<T>?, response: Response<T?>) {
                continuation.resumeWith(runCatching { // ①
                    if (response.isSuccessful) {
                        response.body()
                            ?: throw NullPointerException("Response body is null: $response")
                    } else {
                        throw HttpException(response)
                    }
                })
            }

            override fun onFailure(call: Call<T>, t: Throwable) {
                if (continuation.isCancelled) return // ②
                continuation.resumeWithException(t)
            }
        })

        continuation.invokeOnCancellation {
            try {
                cancel()
            } catch (ex: Throwable) {  // ③
                //Ignore cancel exception 
            }
        }
    }
}

复制代码

你们看着这段代码会不会很眼熟?这与咱们 getUserCoroutine 的写法几乎一模一样,不过有几处细节值得关注,我用数字标注了他们的位置:

  • ① 处 runCatching 能够将一段代码的运行结果或者抛出的异常封装到一个 Result 类型当中,Kotlin 1.3 开始新增了 Continuation.resumeWith(Result) 这个方法, 这个点比起咱们前面的写法更具 Kotlin 风格。
  • ② 处在异常抛出时,判断了是否已经被取消。实际上若是网络请求被取消,这个回调确实会被调到,那么因为取消的操做是协程的由 Continuation 的取消发起的,所以这时候不必再调用 continuation.resumeWithException(t) 来将异常再抛回来了。尽管咱们前面其实也提到过,这时候继续调用 continuation.resumeWithException(t) 也没有任何逻辑上的反作用,但性能上多少仍是会有一些开销。
  • ③ 处,尽管 Call.cancel 的调用比较安全,但网络环境和状态不免状况复杂,所以对异常进行捕获会让这段代码更加健壮。若是 cancel 抛异常而没有捕获的话,那么等同于协程体内部抛出异常,具体如何传播看所在做用域的相关定义了。

须要指出的是,这段代码片断源自 gildor/kotlin-coroutines-retrofit ,你们也能够直接添加依赖进行使用:

compile 'ru.gildor.coroutines:kotlin-coroutines-retrofit:1.1.0'
复制代码

这个框架代码量不多,但通过各路 Kotlin 协程专家的锤炼,逻辑手法很细腻,值得你们学习。

5. 小结

这篇文章咱们从线程中断的概念切入,类比学习协程的取消,实际上你们就会发现这两者从逻辑上和场景上有多么的类似。接着咱们将以前咱们一直提到的回调转协程的例子进一步升级,支持取消,这样你们就能够轻易的将回调转变为协程的挂起调用了。最后咱们还分析了一下 Retrofit 的协程扩展的一些问题和解决方法,这个例子也进一步能够引起咱们对协程做用域以及如何将现有程序协程化的思考。

再稍微提一句,协程不是一个简单的东西,毕竟它的原理涉及到对操做系统调度、程序运行机制这样程序界毕竟原始的话题,但你说若是我对前面提到的这些都不是很熟悉或者根本没有接触过,是否是就要跟协程拜拜了呢,其实也不是,只不过若是你对这些都不熟悉,那么可能须要多加练习培养出感受,而没必要一开始就关注原理和细节,依样画葫芦同样能够用的很好,就像你们不知道 RxJava 原理同样能够用的很好同样,协程也能够作到这一点的。

固然,做为一个有追求的程序员,咱们不止要会用,还要用得好,不管如何咱们都须要知道前因后果,这其中涉及到的基础知识的欠缺也是须要尽快补充的,不能偷懒哈 :)


欢迎关注 Kotlin 中文社区!

中文官网:www.kotlincn.net/

中文官方博客:www.kotliner.cn/

公众号:Kotlin

知乎专栏:Kotlin

CSDN:Kotlin中文社区

掘金:Kotlin中文社区

简书:Kotlin中文社区

相关文章
相关标签/搜索