协程,英文名是 Coroutine
, 本质上,协程是轻量级的线程, 它的调度切换是协做式的,能够主动挂起和恢复java
先来看看咱们最经常使用的retrofit2
,在使用协程和不实用协程的代码区别在哪里android
注意retrofit2
在2.6.0
才开始支持协程,因此必定要将retrofit2
升级到2.6.0
及以上数据库
先分别定义两个api,一个是结合rxjava2
的用法,一个结合协程的用法api
interface TestApi {
@GET("api/4/news/latest")
fun getLatestNews(): Flowable<LatestNews>
@GET("api/4/news/latest")
suspend fun getLatestNews2(): LatestNews
}
复制代码
可见retrofit2
支持用suspend
定义 getLatestNews2
api为一个挂起函数,便可在协程中使用这个apibash
再来看看怎么使用两个不一样的api网络
class CoroutineActivity : AppCompatActivity() {
...
// 这是一个咱们使用retrofit2 请求数据+切换线程最经常使用的方法
fun requestData1() {
testApi.getLatestNews()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(object : DisposableSubscriber<LatestNews>() {
override fun onComplete() {}
override fun onNext(t: LatestNews) {
tv_text.text = Gson().toJson(t)
}
override fun onError(t: Throwable?) {
tv_text.text = "error"
}
})
}
// 使用协程 请求+渲染数据
fun requestData2() {
GlobalScope.launch(Dispatchers.Main) {
try {
tv_text.text = Gson().toJson(testApi.getLatestNews2())
} catch (e: Exception) {
tv_text.text = "error"
}
}
}
}
复制代码
rxjava2是使用回调的方式渲染数据,这个你们都知道并发
而协程须要先使用GlobalScope.launch
启动一个协程(启动协程的方法不少,请自行查看官方文档),并使用Dispatchers.Main
指定协程调度器为主线程(即ui线程), 而后经过 try catch
分别处理正常和异常的状况(暂时使用GlobalScope
上下文启动协程,下面会介绍一种专门再android中启动协程的方法)异步
这样看来是否是使用协程能够简化不少代码,使代码看起来更加优雅jvm
咱们再来看看多个请求并发和串行的状况async
先多添加几个api,方便操做
interface TestApi {
@GET("api/3/news/latest")
fun getLatestNews(): Flowable<LatestNews>
@GET("api/3/news/{id}")
fun getNewsDetail(@Path("id") id: Long): Flowable<News>
@GET("api/4/news/latest")
suspend fun getLatestNews2(): LatestNews
@GET("api/3/news/{id}")
suspend fun getNewsDetail2(@Path("id") id: Long): News
}
复制代码
好比咱们先调用getLatestNews()
方法请求一系列的新闻列表,而后在调用getNewsDetail
请求第一个新闻的详情,代码以下
// 非协程用法
testApi.getLatestNews()
.flatMap {
testApi.getNewsDetail(it.stories!![0].id!!)
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(object : DisposableSubscriber<News>() {
override fun onComplete() {}
override fun onNext(t: News) {
tv_text.text = t.title
}
override fun onError(t: Throwable?) {
tv_text.text = "error"
}
})
// 协程用法
GlobalScope.launch(Dispatchers.Main) {
try {
val lastedNews = testApi.getLatestNews2()
val detail = testApi.getNewsDetail2(lastedNews.stories!![0].id!!)
tv_text.text = detail.title
} catch(e: Exception) {
tv_text.text = "error"
}
}
复制代码
再好比若是咱们想调用getNewsDetail
同时请求多个新闻详情数据
// 非协程用法
testApi.getLatestNews()
.flatMap {
Flowable.zip(
testApi.getNewsDetail(it.stories!![0].id!!),
testApi.getNewsDetail(it.stories!![1].id!!),
BiFunction<News, News, List<News>> { news1, news2->
listOf(news1, news2)
}
)
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(object : DisposableSubscriber<List<News>>() {
override fun onComplete() {}
override fun onNext(t: List<News>) {
tv_text.text = t[0].title + t[1].title
}
override fun onError(t: Throwable?) {
tv_text.text = "error"
}
})
// 协程的用法
GlobalScope.launch(Dispatchers.Main) {
try {
// 先请求新闻列表
val lastedNews = testApi.getLatestNews2()
// 再使用async 并发请求第一个和第二个新闻的详情
val detail1 = async { testApi.getNewsDetail2(lastedNews.stories!![0].id!!) }
val detail2 = async { testApi.getNewsDetail2(lastedNews.stories!![1].id!!) }
tv_text.text = detail1.await().title + detail2.await().title
} catch(e: Exception) {
tv_text.text = "error"
}
}
复制代码
可见相对于非协程的写法(代码中使用rxjava2),协程能让你的代码更加简洁、优雅,能更加清晰的描述你第一步想作什么、第二步想作什么等等
room
数据库在2.1.0
开始支持协程, 而且须要导入room-ktx
依赖
implementation "androidx.room:room-ktx:2.1.0"
复制代码
而后在Dao
中使用suspend
定义挂起函数
@Dao
abstract class UserDao {
@Query("select * from tab_user")
abstract suspend fun getAll(): List<User>
}
复制代码
最后就像上面retrofit2
那样使用协程便可
class RoomActivity : AppCompatActivity() {
private var adapter: RoomAdapter? = null
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_room)
...
}
...
private fun loadUser() {
GlobalScope.launch(Dispatchers.Main) {
adapter!!.data = AppDataBase.getInstance().userDao().getAll()
}
}
}
复制代码
这里指介绍room数据库的协程用法,对于room数据库的介绍和其余用法请查看Android Jetpack ROOM数据库用法介绍和android Jetpack ROOM数据库结合其它Library的使用介绍
上面的example
都是使用GlobalScope
上下文来启动协程, 其实真正在android中通常不建议直接使用GlobalScope
,由于使用GlobalScope.launch
时,咱们会建立一个顶层协程。虽然它很轻量,但它运行时仍会消耗一些内存资源,若是咱们忘记保持对新启动的协程的引用,它还会继续运行,因此咱们必须保持全部对GlobalScope.launch
启动协程的引用,而后在activity
destory
(或其它须要cancel
)的时候cancel
掉全部的协程,不然就会形成内存泄露等一系列问题
好比:
class CoroutineActivity : AppCompatActivity() {
private lateinit var testApi: TestApi
private var job1: Job? = null
private var job2: Job? = null
private var job3: Job? = null
...
override fun onDestroy() {
super.onDestroy()
job1?.cancel()
job2?.cancel()
job3?.cancel()
}
...
// 启动第一个顶级协程
fun requestData1() {
job1 = GlobalScope.launch(Dispatchers.Main) {
try {
val lastedNews = testApi.getLatestNews2()
tv_text.text = lastedNews.stories!![0].title
} catch(e: Exception) {
tv_text.text = "error"
}
}
}
// 启动第二个顶级协程
fun requestData2() {
job2 = GlobalScope.launch(Dispatchers.Main) {
try {
val lastedNews = testApi.getLatestNews2()
// 在协程内部启动第三个顶级协程
job3 = GlobalScope.launch(Dispatchers.Main) {
try {
val detail = testApi.getNewsDetail2(lastedNews.stories!![0].id!!)
tv_text.text = detail.title
} catch (e: Exception) {
tv_text.text = "error"
}
}
} catch(e: Exception) {
tv_text.text = "error"
}
}
}
}
复制代码
可见若是使用GlobalScope
启动的协程越多,就必须定义越多的变量持有对启动协程的引用,并在onDestroy
的时候cancel
掉全部协程
下面咱们就介绍MainScope
代替GlobalScope
的使用
class CoroutineActivity : AppCompatActivity() {
private var mainScope = MainScope()
private lateinit var testApi: TestApi
...
override fun onDestroy() {
super.onDestroy()
// 只须要调用mainScope.cancel,就会cancel掉全部使用mainScope启动的全部协程
mainScope.cancel()
}
fun requestData1() {
mainScope.launch {
try {
val lastedNews = testApi.getLatestNews2()
tv_text.text = lastedNews.stories!![0].title
} catch(e: Exception) {
tv_text.text = "error"
}
}
}
fun requestData2() {
mainScope.launch {
try {
val lastedNews = testApi.getLatestNews2()
val detail = testApi.getNewsDetail2(lastedNews.stories!![0].id!!)
tv_text.text = detail.title
} catch (e: Exception) {
tv_text.text = "error"
}
}
}
}
复制代码
又或者是使用kotlin委托模式实现以下:
class CoroutineActivity : AppCompatActivity(), CoroutineScope by MainScope() {
private lateinit var testApi: TestApi
...
override fun onDestroy() {
super.onDestroy()
cancel()
}
fun requestData1() {
launch {
try {
val lastedNews = testApi.getLatestNews2()
tv_text.text = lastedNews.stories!![0].title
} catch(e: Exception) {
tv_text.text = "error"
}
}
}
fun requestData2() {
launch {
try {
val lastedNews = testApi.getLatestNews2()
val detail = testApi.getNewsDetail2(lastedNews.stories!![0].id!!)
tv_text.text = detail.title
} catch (e: Exception) {
tv_text.text = "error"
}
}
}
}
复制代码
同时咱们先来看看MainScope
的定义
@Suppress("FunctionName")
public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)
复制代码
可见使用MainScope
很是简单,只须要在activity onDestroy
中调用MainScope
的cancel
方法便可,而不须要定义其它协程的引用, 而且MainScope
的调度器是Dispatchers.Main
, 因此也不须要手动指定Main调度器
发现Lifecycle
组件库在2.2.0
的alpha
版中已经有了对于协程的支持
须要添加lifecycle-runtime-ktx
依赖(正式版出来以后,请使用正式版)
implementation "androidx.lifecycle:lifecycle-livedata-ktx:2.2.0-alpha05"
复制代码
lifecycle-runtime-ktx
中 给LifecycleOwner
添加了 lifecycleScope
扩展属性(类于上面介绍的MainScope
),用于方便的操做协程;
先看看源码
val LifecycleOwner.lifecycleScope: LifecycleCoroutineScope
get() = lifecycle.coroutineScope
val Lifecycle.coroutineScope: LifecycleCoroutineScope
get() {
while (true) {
val existing = mInternalScopeRef.get() as LifecycleCoroutineScopeImpl?
if (existing != null) {
return existing
}
val newScope = LifecycleCoroutineScopeImpl(
this,
// SupervisorJob 指定协程做用域是单向传递
// Dispatchers.Main.immediate 指定协程体 在主线程中执行
// Dispatchers.Main.immediate 跟 Dispatchers.Main惟一的区别是,若是当前在主线程,这立马执行协程体,而不是走Dispatcher分发流程
SupervisorJob() + Dispatchers.Main.immediate
)
if (mInternalScopeRef.compareAndSet(null, newScope)) {
newScope.register()
return newScope
}
}
}
复制代码
同时LifecycleCoroutineScope
还提供了绑定LifecycleOwner
生命周期(通常是指activity
和fragment
)的启动协程的方法;以下:
abstract class LifecycleCoroutineScope internal constructor() : CoroutineScope {
internal abstract val lifecycle: Lifecycle
// 当 activity 处于created的时候执行 协程体
fun launchWhenCreated(block: suspend CoroutineScope.() -> Unit): Job = launch {
lifecycle.whenCreated(block)
}
// 当 activity 处于start的时候执行 协程体
fun launchWhenStarted(block: suspend CoroutineScope.() -> Unit): Job = launch {
lifecycle.whenStarted(block)
}
// 当 activity 处于resume的时候执行 协程体
fun launchWhenResumed(block: suspend CoroutineScope.() -> Unit): Job = launch {
lifecycle.whenResumed(block)
}
}
复制代码
因为上面启动协程的方法绑定了activity
生命周期,因此在activity
destroy
的时候,也实现了自动cancel掉协程
因此咱们 CoroutineActivity
Demo的代码能够写的更加简单,以下:
class CoroutineActivity : AppCompatActivity() {
private lateinit var testApi: TestApi
...
fun requestData1() {
lifecycleScope.launchWhenResumed {
try {
val lastedNews = testApi.getLatestNews2()
tv_text.text = lastedNews.stories!![0].title
} catch(e: Exception) {
tv_text.text = "error"
}
}
}
}
复制代码
同时Google也对LiveData提供了对协程的支持,不过须要添加lifecycle-livedata-ktx
依赖
// 如今仍是`alpha`版,等正式版发布之后,请替换成正式版
implementation "androidx.lifecycle:lifecycle-livedata-ktx:2.2.0-alpha05"
复制代码
lifecycle-livedata-ktx
依赖添加了liveData
顶级函数,返回CoroutineLiveData
源码以下:
...
internal const val DEFAULT_TIMEOUT = 5000L
...
fun <T> liveData(
context: CoroutineContext = EmptyCoroutineContext,
timeoutInMs: Long = DEFAULT_TIMEOUT,
@BuilderInference block: suspend LiveDataScope<T>.() -> Unit
): LiveData<T> = CoroutineLiveData(context, timeoutInMs, block)
复制代码
CoroutineLiveData
是在何时启动协程并执行协程体的呢???
internal class CoroutineLiveData<T>(
context: CoroutineContext = EmptyCoroutineContext,
timeoutInMs: Long = DEFAULT_TIMEOUT,
block: Block<T>
) : MediatorLiveData<T>() {
private var blockRunner: BlockRunner<T>?
private var emittedSource: EmittedSource? = null
init {
val scope = CoroutineScope(Dispatchers.Main.immediate + context + supervisorJob)
blockRunner = BlockRunner(
liveData = this,
block = block,
timeoutInMs = timeoutInMs,
scope = scope
) {
blockRunner = null
}
}
...
// observer(观察者)个数有0到1时执行
// 即第一次调用observe或observeForever时执行
override fun onActive() {
super.onActive()
// 启动协程并执行协程体
blockRunner?.maybeRun()
}
// observer(观察者)个数有1到0时执行
// 即调用removeObserver时触发检查并执行回调
override fun onInactive() {
super.onInactive()
// 取消协程
blockRunner?.cancel()
}
}
复制代码
可见CoroutineLiveData
是在onActive()
启动协程,在onInactive()
取消协程
因此使用LiveData
对协程的支持, 那么CoroutineActivity
Demo的代码写法以下
class CoroutineActivity : AppCompatActivity() {
private lateinit var testApi: TestApi
...
fun requestData1() {
liveData {
try {
val lastedNews = testApi.getLatestNews2()
emit(lastedNews.stories!![0].title!!)
} catch(e: Exception) {
emit("error")
}
}.observe(this, Observer {
tv_text.text = it
})
}
}
复制代码
上面咱们讲了协程在android里最经常使用的用法,下面将介绍协程的一些基本知识
协程上下文用CoroutineContext
表示,kotlin
中 比较经常使用的Job
、协程调度器(CoroutineDispatcher)
、协程拦截器(ContinuationInterceptor)
等都是CoroutineContext
的子类,即它们都是协程上下文
先看一下CoroutineContext
比较重要的plus
方法,它是一个用operator
修复的重载(+)
号的操做符方法
@SinceKotlin("1.3")
public interface CoroutineContext {
/**
* Returns a context containing elements from this context and elements from other [context].
* The elements from this context with the same key as in the other one are dropped.
*/
public operator fun plus(context: CoroutineContext): CoroutineContext =
if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
context.fold(this) { acc, element ->
val removed = acc.minusKey(element.key)
if (removed === EmptyCoroutineContext) element else {
// make sure interceptor is always last in the context (and thus is fast to get when present)
val interceptor = removed[ContinuationInterceptor]
if (interceptor == null) CombinedContext(removed, element) else {
val left = removed.minusKey(ContinuationInterceptor)
if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
CombinedContext(CombinedContext(left, element), interceptor)
}
}
}
}
复制代码
好比上面说的MainScope
定义就使用了+号操做符
public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)
复制代码
若是你看启动协程的源码就会发现,在kotlin
中 大量使用 + 号操做符,因此kotlin中大部分CoroutineContext
对象都是CombinedContext
对象
上面的example
使用的launch
方法启动协程有三个参数, 分别是协程上下文
、协程启动模式
、协程体
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext, // 协程上下文
start: CoroutineStart = CoroutineStart.DEFAULT, // 协程启动模式
block: suspend CoroutineScope.() -> Unit // 协程体
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
复制代码
DEFAULT
当即执行协程体
runBlocking {
val job = GlobalScope.launch(start = CoroutineStart.DEFAULT) {
println("1: " + Thread.currentThread().name)
}
// 不须要调用join方法
// job.join()
}
复制代码
打印结果
1: DefaultDispatcher-worker-1
复制代码
CoroutineStart.DEFAULT
启动模式不须要手动调用join
或start
等方法,而是在调用launch
方法的时候就会自动执行协程体的代码
LAZY
只有在须要的状况下才执行协程体
runBlocking {
val job = GlobalScope.launch(start = CoroutineStart.LAZY) {
println("1: " + Thread.currentThread().name)
}
// 必定调用join方法
job.join()
}
复制代码
打印结果
1: DefaultDispatcher-worker-1
复制代码
CoroutineStart.LAZY
启动模式必定要手动调用join
或start
等方法,否者协程体不会执行
ATOMIC
当即执行协程体,但在开始运行以前没法取消, 即开启协程会无视cancelling
状态
runBlocking {
val job = GlobalScope.launch(start = CoroutineStart.ATOMIC) {
println("1: " + Thread.currentThread().name)
delay(1000)
println("2: " + Thread.currentThread().name)
}
job.cancel()
delay(2000)
}
复制代码
打印结果
1: DefaultDispatcher-worker-1
复制代码
CoroutineStart. ATOMIC
启动模式的协程体 即便调了cancel
方法 也必定会执行,由于开启协程会无视cancelling
状态;上面的example只打印了一句话,是由于执行delay(1000)
的时候 发现协程处于关闭状态, 因此出现了JobCancellationException
异常,致使下面的代码没有执行,若是 delay(1000)
这句代码用 try catch
捕获一下异常,就会继续执行下面的代码
UNDISPATCHED
当即在当前线程执行协程体,直到第一个 suspend 调用 挂起以后的执行线程取决于上下文当中的调度器了
runBlocking {
println("0: " + Thread.currentThread().name)
// 注意这里没有用GlobalScope.launch
// 由于GlobalScope.launch启动的是一个顶层协程, 没法关联当前协程的上下文(coroutineContext), 致使结果有误差
launch(context = Dispatchers.Default, start = CoroutineStart.UNDISPATCHED) {
println("1: " + Thread.currentThread().name)
delay(1000)
println("2: " + Thread.currentThread().name)
}
delay(2000)
}
复制代码
打印结果
0: main
1: main
2: DefaultDispatcher-worker-1
复制代码
可见 0 和 1 的执行线程是同样的,当执行完delay(1000)
, 后面的代码执行线程取决于Dispatchers.Default
调度器指定的线程,因此 2 在另外一个线程中执行
协程调度器 其实也是 协程上下文
协程调度器是用来指定协程代码块在哪一个线程中执行,kotlin
提供了几个默认的协程调度器,分别是Default
、Main
、Unconfined
, 并针对jvm
, kotlin
提供了一个特有的IO
调度器
Dispatchers.Default
指定代码块在线程池中执行
GlobalScope.launch(Dispatchers.Default) {
println("1: " + Thread.currentThread().name)
launch (Dispatchers.Default) {
delay(1000) // 延迟1秒后,再继续执行下面的代码
println("2: " + Thread.currentThread().name)
}
println("3: " + Thread.currentThread().name)
}
复制代码
打印结果以下
1: DefaultDispatcher-worker-1
3: DefaultDispatcher-worker-1
2: DefaultDispatcher-worker-1
复制代码
Dispatchers.Main
指定代码块在main线程中执行(针对Android就是ui线程)
GlobalScope.launch(Dispatchers.Default) {
println("1: " + Thread.currentThread().name)
launch (Dispatchers.Main) {
delay(1000) // 延迟1秒后,再继续执行下面的代码
println("2: " + Thread.currentThread().name)
}
println("3: " + Thread.currentThread().name)
}
复制代码
打印结果以下:
1: DefaultDispatcher-worker-1
3: DefaultDispatcher-worker-1
2: main
复制代码
可见Dispatchers.Main就是指定协程代码块在main线程中执行
Dispatchers.Unconfined
没有指定协程代码快在哪一个特定线程中执行,即当前在哪一个线程,代码块中接下来的代码就在哪一个线程中执行(即一段协程代码块 因为启动了子协程 致使切换了线程, 那么接下来的代码块也是在这个线程中执行)
GlobalScope.launch(Dispatchers.Default) {
println("1: " + Thread.currentThread().name)
launch (Dispatchers.Unconfined) {
println("2: " + Thread.currentThread().name)
requestApi() // delay(1000) 原本想用delay,可是使用requestApi 可能更加清晰
println("3: " + Thread.currentThread().name)
}
println("4: " + Thread.currentThread().name)
}
// 定义一个挂起函数,在一个新的子线程中执行
private suspend fun requestApi() = suspendCancellableCoroutine<String> {
Thread {
println("5: requestApi: " + Thread.currentThread().name)
it.resume("success")
}.start()
}
复制代码
打印结果以下:
1: DefaultDispatcher-worker-1
2: DefaultDispatcher-worker-1
5: requestApi: Thread-3
4: DefaultDispatcher-worker-1
3: Thread-3
复制代码
可见2 和 3的代码 执行线程明显不同;当执行到requestApi这句代码的时候 会切换到子线程(即Thread-3
)中执行代码,而后接下来的协程代码块就会在Thread-3
中执行
Dispatchers.IO
它是基于 Default
调度器背后的线程池,并实现了独立的队列和限制,所以协程调度器从 Default
切换到 IO
并不会触发线程切换
GlobalScope.launch(Dispatchers.Default) {
println("1: " + Thread.currentThread().name)
launch (Dispatchers.IO) {
println("2: " + Thread.currentThread().name)
requestApi() // delay(1000)
println("3: " + Thread.currentThread().name)
}
println("4: " + Thread.currentThread().name)
}
复制代码
打印结果以下:
1: DefaultDispatcher-worker-1
4: DefaultDispatcher-worker-1
2: DefaultDispatcher-worker-1
5: requestApi: Thread-3
3: DefaultDispatcher-worker-1
复制代码
绑定到任意自定义线程的调度器(这种方式要谨慎使用)
可使用kotlin
自带newSingleThreadContext
方法或者使用ExecutorService
的扩展方法asCoroutineDispatcher
建立一个Dispatcher
// 第一种方法
val dispatcher = newSingleThreadContext("custom thread")
// 第二种方法
// val dispatcher = Executors.newSingleThreadExecutor{ r -> Thread(r, "custom thread") }.asCoroutineDispatcher()
GlobalScope.launch(dispatcher) {
println("1: " + Thread.currentThread().name)
delay(1000)
println("2: " + Thread.currentThread().name)
}
runBlocking {
delay(2000L)
// 必定要close,不然线程永远都不会结束,很危险
dispatcher.close()
}
复制代码
打印结果以下:
1: custom thread
2: custom thread
复制代码
可见咱们能够本身建立线程绑定到协程调度器上,可是这种方式不建议使用,由于一旦手动建立了线程 就须要手动close,不然线程就永远也不会终止,这样会很危险
协程做用域是一个很是重的东西
GlobeScope
GlobeScope
启动的协程会单独启动一个做用域,没法继承外面协程的做用域,其内部的子协程听从默认的做用域规则
coroutineScope
coroutineScope
启动的协程会继承父协程的做用域,其内部的取消操做是双向传播的,子协程未捕获的异常也会向上传递给父协程
supervisorScope
supervisorScope
启动的协程会继承父协程的做用域,他跟coroutineScope
不同的点是 它是单向传递的,即内部的取消操做和异常传递 只能由父协程向子协程传播,不能从子协程传向父协程
MainScope
就是使用的supervisorScope
做用域,因此只须要子协程 出错 或 cancel
并不会影响父协程,从而也不会影响兄弟协程
协程的异常传递跟协程做用域有关,要么跟coroutineScope
同样双向传递,要么跟supervisorScope
同样由父协程向子协程单向传递
针对supervisorScope
的单向传递
runBlocking {
println("1")
supervisorScope {
println("2")
// 启动一个子协程
launch {
1/0 // 故意让子协程出现异常
}
delay(100)
println("3")
}
println("4")
}
复制代码
打印结果以下:
1
2
Exception in thread "main @coroutine#2" java.lang.ArithmeticException: / by zero
3
4
复制代码
可见在supervisorScope
做用域中启动的子协程若是出现异常,并无致使父协程异常,而且父协程的代码还能继续往下执行
咱们再来验证一下再supervisorScope
做用域中父协程异常是否会传递给子协程
runBlocking {
println("1")
supervisorScope {
println("2")
// 启动一个子协程
launch {
try {
delay(1000)
println("3")
} catch (e: Exception) {
println("error")
}
}
delay(100)
1/0 //父协程报错
println("3")
}
}
复制代码
1
2
error
java.lang.ArithmeticException: / by zero
复制代码
可见在supervisorScope
做用域中 父协程确实会将异常传递给子协程
针对coroutineScope
的双向传递
runBlocking {
println("1")
try {
coroutineScope {
println("2")
// 启动一个子协程
launch {
1/0 // 故意让子协程出现异常
}
delay(100)
println("3")
}
} catch (e: Exception) {
println("error")
}
}
复制代码
打印结果以下:
1
2
error
复制代码
可见在coroutineScope
做用域中启动的子协程若是出现异常,则会传递给父协程
咱们再来验证一下再coroutineScope
做用域中父协程异常是否会传递给子协程
runBlocking {
println("1")
coroutineScope {
println("2")
// 启动一个
launch {
try {
delay(1000)
println("3")
} catch (e: Exception) {
println("error")
}
}
delay(100)
1/0
println("3")
}
}
复制代码
打印结果以下:
1
2
error
java.lang.ArithmeticException: / by zero
复制代码
可见在coroutineScope
做用域中 父协程确实会将异常传递给子协程
先看一段代码
GlobalScope.launch {
println("1")
// 启动一个子协程
val job = launch {
println("2")
try {// 捕获 协程cancel致使的异常,让代码继续往下执行
delay(1000)
} catch (e: Exception) {
println("error")
}
println("3")
if (isActive) { // 若是协程cancel了,则isActive为false
println("4")
}
delay(1000) // 没有捕获异常,则终止代码继续往下执行
println("5")
}
delay(100)
job.cancel()
}
复制代码
打印结果以下:
1
2
error
3
复制代码
当先启动协程,而后cancel,会出现以下几种状况:
也就是说 协程的取消(cancel) 致使协程体终止运行的方式是 抛出异常,若是协程体的代码不依赖协程的cancel状态(即没有报错),则协程的取消 对协程体的执行通常没什么影响
好比:
GlobalScope.launch {
val job = launch {
println("==start==")
var i = 0
while (i <= 10) {
Thread.sleep(100)
println(i++)
}
println("==end==")
}
delay(100)
job.cancel()
}
复制代码
打印结果以下:
==start==
0
1
2
3
4
5
6
7
8
9
10
==end==
复制代码
可见即便协程取消了,协程体仍是在继续运行
若是想结束协程体的运行该怎么办呢??
这个时候可使用CoroutineScope
的isActive字段判断协程的状态是否被取消了
GlobalScope.launch {
val job = launch {
println("==start==")
var i = 0
while (i <= 10 && isActive) {
Thread.sleep(100)
println(i++)
}
println("==end==")
}
delay(200)
job.cancel()
}
复制代码
打印结果
==start==
0
1
==end==
复制代码
可见若是协程取消了,可使用isActive
字段来判断是否须要执行协程体的某段代码
在执行协程体的时候,可使用withContext
方便的切换代码执行所运行线程;好比
GlobalScope.launch(Dispatchers.Default) {
// 在Dispatchers.Default的线程池中执行
println("1: " + Thread.currentThread().name)
withContext(Dispatchers.Main) { // 切换到主线程执行
println("2: " + Thread.currentThread().name)
}
// 在Dispatchers.Default的线程池中执行
println("3: " + Thread.currentThread().name)
val dispatcher = newSingleThreadContext("custom thread")
withContext(dispatcher) { // 切换到自定义线程中执行
println("4: " + Thread.currentThread().name)
}
dispatcher.close()
// 在Dispatchers.Default的线程池中执行
println("5: " + Thread.currentThread().name)
}
复制代码
打印结果
1: DefaultDispatcher-worker-1
2: main
3: DefaultDispatcher-worker-2
4: custom thread
5: DefaultDispatcher-worker-2
复制代码
可见咱们可使用withContext
方便的切换代码运行所在的线程
withContext
还能够配合NonCancellable
上下文确保代码块不能被取消
GlobalScope.launch(Dispatchers.Default) {
val job = launch {
println("1: " + Thread.currentThread().name)
try {
delay(1000)
} catch (e: Exception) {
withContext(NonCancellable) { // 配合NonCancellable上下文确保协程体不能被取消
println("error: " + e.message)
delay(100) // 若是没有用withContext(NonCancellable)包裹,则delay(100)会报错, 致使下面的代码不执行
println("2: " + Thread.currentThread().name)
}
}
}
delay(100)
job.cancel()
}
复制代码
打印结果
1: DefaultDispatcher-worker-1
error: Job was cancelled
2: DefaultDispatcher-worker-1
复制代码
什么是结构化并发呢?
其实很简单,即保证启动的协程在同一做用域中(我的理解)
当咱们使用GlobalScope.launch启动协程的时候会建立一个顶层协程,若是咱们每次都使用GlobalScope.launch启动协程, 那么就会建立不少个顶层协程,而且不会相互干扰,即即便一个协程出错或的取消了,另外一个协程仍是会继续运行,由于它们不是在同一个协程做用域中
GlobalScope.launch(Dispatchers.Default) {
val a1 = GlobalScope.async { 这里使用async启动协程,没有使用launch
delay(1000)
println("1: " + Thread.currentThread().name)
}
val a2 = GlobalScope.async {
delay(100)
1/0 // 故意报错
println("2: " + Thread.currentThread().name)
}
a1.await()
a2.await() // a2.cancel() 也可使用cancel
}
复制代码
打印结果以下
1: DefaultDispatcher-worker-1
Exception in thread "DefaultDispatcher-worker-1" java.lang.ArithmeticException: / by zero
复制代码
可见a2
报错或cancel,并不会影响a1
这到底会引发什么问题呢?
好比咱们在一个activity中一般会有多个并发网络请求 请求数据(即会启动多个协程),当其中一个网络请求出错时(即协程出错),咱们但愿关闭其它并行的网络请求,而不处理(即但愿关闭掉其它协程),可是结果并不是如此
再好比咱们在一个activity中一般会有许多个网络请求(即会启动许多个协程),若是咱们老是使用GlobalScope启动协程,那么必须保持每一个协程的引用,并在activity destroy时cancel掉全部协程,不然即便activity destroy,那么协程里的异步请求代码仍是会继续执行,这样很容易出错或内存泄漏
咱们该怎么方便的解决这样的问题呢?
其实咱们可使用结构化并发(即协程做用域)来解决这样的问题,即保证启动的多个协程在同一做用域中,若是cancel掉这个做用域上下文,那么在这个做用域下启动的全部子协程都会取消,同时还能够配合coroutineScope、supervisorScope协程做用域 处理异常传递的问题
因此上面的代码能够这样改
GlobalScope.launch(Dispatchers.Default) {
val a1 = async {
delay(1000)
println("1: " + Thread.currentThread().name)
}
val a2 = async {
delay(100)
1/0 // 故意报错
println("2: " + Thread.currentThread().name)
}
a1.await()
a2.await()
}
复制代码
即把启动 a1
、a2
协程的GlobalScope去掉,保证a1
、a2
在同一协程做用域中
咱们先来看一看retrofit兼容协程的实现源码
suspend fun <T : Any> Call<T>.await(): T {
// 使用suspendCancellableCoroutine定义挂起函数,参数是Continuation对象
return suspendCancellableCoroutine { continuation ->
continuation.invokeOnCancellation {
cancel()
}
enqueue(object : Callback<T> {
override fun onResponse(call: Call<T>, response: Response<T>) {
if (response.isSuccessful) {
val body = response.body()
if (body == null) {
val invocation = call.request().tag(Invocation::class.java)!!
val method = invocation.method()
val e = KotlinNullPointerException("Response from " +
method.declaringClass.name +
'.' +
method.name +
" was null but response body type was declared as non-null")
// 若是结果异常,则调用Continuation 的 resumeWithException回调
continuation.resumeWithException(e)
} else {
// 若是结果正常,则调用Continuation 的 resume回调
continuation.resume(body)
}
} else {
// 若是结果异常,则调用Continuation 的 resumeWithException回调
continuation.resumeWithException(HttpException(response))
}
}
override fun onFailure(call: Call<T>, t: Throwable) {
// 若是结果异常,则调用Continuation 的 resumeWithException回调
continuation.resumeWithException(t)
}
})
}
}
复制代码
Continuation
的源码和扩展函数以下
@SinceKotlin("1.3")
@InlineOnly
public inline fun <T> Continuation<T>.resume(value: T): Unit =
resumeWith(Result.success(value))
/**
* Resumes the execution of the corresponding coroutine so that the [exception] is re-thrown right after the
* last suspension point.
*/
@SinceKotlin("1.3")
@InlineOnly
public inline fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit =
resumeWith(Result.failure(exception))
@SinceKotlin("1.3")
public interface Continuation<in T> {
/**
* The context of the coroutine that corresponds to this continuation.
*/
public val context: CoroutineContext
/**
* Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
* return value of the last suspension point.
*/
public fun resumeWith(result: Result<T>)
}
复制代码
可见协程挂起函数内部是使用回调将结果返回出去的,当有结果正常返回的时候,Continuation 调用 resume 返回结果,不然调用 resumeWithException 来抛出异常,这与 Callback 的模式如出一辙
而咱们写协程代码之因此能够看起来是同步的,实际上是编译器帮你作了不少事情(即你能够当它是“语法糖”)
注意:使用AndroidStudio反编译kotlin协程代码的时候会致使ide严重卡顿,而且反编译出来的java代码有无数层的嵌套,不知道是没法反编译协程代码,仍是AndroidStudio的bug, 致使没法配合kotlin反编译的java代码来说解
上面已经对协程挂起函数原理作了一些解析,若是咱们使用了多个挂起函数 那它们是怎么配合运行的呢?
注意: 下面的代码是我copy的别人的代码
suspend fun main() {
log(1)
// returnSuspended()是一个suspend函数
log(returnSuspended())
log(2)
// delay也是一个suspend函数
delay(1000)
log(3)
// returnImmediately也是一个suspend函数
log(returnImmediately())
log(4)
}
复制代码
对应的java实现代码逻辑以下(注意,下面的代码逻辑上并不能作到十分严谨,仅供学习理解协程使用)
public class ContinuationImpl implements Continuation<Object> {
// label 状态 默认为 0
private int label = 0;
private final Continuation<Unit> completion;
public ContinuationImpl(Continuation<Unit> completion) {
this.completion = completion;
}
@Override
public CoroutineContext getContext() {
return EmptyCoroutineContext.INSTANCE;
}
@Override
public void resumeWith(@NotNull Object o) {
try {
Object result = o;
switch (label) {
case 0: {
LogKt.log(1);
// 在SuspendFunctionsKt.returnSuspended内部以回调的方式 调用this的resumeWith方法
result = SuspendFunctionsKt.returnSuspended( this);
// label 状态加 1
label++;
if (isSuspended(result)) return;
}
case 1: {
LogKt.log(result);
LogKt.log(2);
// 在DelayKt.delay内部以回调的方式 调用this的resumeWith方法
result = DelayKt.delay(1000, this);
// label 状态加 1
label++;
if (isSuspended(result)) return;
}
case 2: {
LogKt.log(3);
// 在SuspendFunctionsKt.returnImmediately内部以回调的方式 调用this的resumeWith方法
result = SuspendFunctionsKt.returnImmediately( this);
// label 状态加 1
label++;
if (isSuspended(result)) return;
}
case 3:{
LogKt.log(result);
LogKt.log(4);
}
}
completion.resumeWith(Unit.INSTANCE);
} catch (Exception e) {
completion.resumeWith(e);
}
}
private boolean isSuspended(Object result) {
return result == IntrinsicsKt.getCOROUTINE_SUSPENDED();
}
}
复制代码
可见多个挂起函数之间的配合使用是使用label
这个状态字段不断加1 而且 不断调用resumeWith
方法实现的
总结以下:
最后 很是感谢破解 Kotlin 协程的博客,这是学习Coroutine
很是好的文章,建议你们去看看