kotlin协程-Android实战

协程,英文名是 Coroutine, 本质上,协程是轻量级的线程, 它的调度切换是协做式的,能够主动挂起和恢复java

retrofit2对协程的支持

先来看看咱们最经常使用的retrofit2,在使用协程和不实用协程的代码区别在哪里android

注意retrofit22.6.0才开始支持协程,因此必定要将retrofit2升级到2.6.0及以上数据库

先分别定义两个api,一个是结合rxjava2的用法,一个结合协程的用法api

interface TestApi {
    @GET("api/4/news/latest")
    fun getLatestNews(): Flowable<LatestNews>
    
    @GET("api/4/news/latest")
    suspend fun getLatestNews2(): LatestNews
}

复制代码

可见retrofit2支持用suspend 定义 getLatestNews2api为一个挂起函数,便可在协程中使用这个apibash

再来看看怎么使用两个不一样的api网络

class CoroutineActivity : AppCompatActivity() {
	...
	// 这是一个咱们使用retrofit2 请求数据+切换线程最经常使用的方法
    fun requestData1() {
        testApi.getLatestNews()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(object : DisposableSubscriber<LatestNews>() {
                override fun onComplete() {}

                override fun onNext(t: LatestNews) {
                    tv_text.text = Gson().toJson(t)
                }

                override fun onError(t: Throwable?) {
                    tv_text.text = "error"
                }
            })
    }
    
    // 使用协程 请求+渲染数据
    fun requestData2() {
        GlobalScope.launch(Dispatchers.Main) {
            try {
                tv_text.text = Gson().toJson(testApi.getLatestNews2())
            } catch (e: Exception) {
                tv_text.text = "error"
            }
        }
    }
}
复制代码

rxjava2是使用回调的方式渲染数据,这个你们都知道并发

而协程须要先使用GlobalScope.launch启动一个协程(启动协程的方法不少,请自行查看官方文档),并使用Dispatchers.Main指定协程调度器为主线程(即ui线程), 而后经过 try catch分别处理正常和异常的状况(暂时使用GlobalScope上下文启动协程,下面会介绍一种专门再android中启动协程的方法)异步

这样看来是否是使用协程能够简化不少代码,使代码看起来更加优雅jvm

咱们再来看看多个请求并发和串行的状况async

先多添加几个api,方便操做

interface TestApi {
	@GET("api/3/news/latest")
    fun getLatestNews(): Flowable<LatestNews>

    @GET("api/3/news/{id}")
    fun getNewsDetail(@Path("id") id: Long): Flowable<News>


    @GET("api/4/news/latest")
    suspend fun getLatestNews2(): LatestNews

    @GET("api/3/news/{id}")
    suspend fun getNewsDetail2(@Path("id") id: Long): News
}

复制代码

好比咱们先调用getLatestNews()方法请求一系列的新闻列表,而后在调用getNewsDetail请求第一个新闻的详情,代码以下

// 非协程用法
testApi.getLatestNews()
    .flatMap {
        testApi.getNewsDetail(it.stories!![0].id!!)
    }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(object : DisposableSubscriber<News>() {
        override fun onComplete() {}

        override fun onNext(t: News) {
            tv_text.text = t.title
        }

        override fun onError(t: Throwable?) {
            tv_text.text = "error"
        }
    })

// 协程用法
GlobalScope.launch(Dispatchers.Main) {
    try {
        val lastedNews = testApi.getLatestNews2()
        val detail = testApi.getNewsDetail2(lastedNews.stories!![0].id!!)
        tv_text.text = detail.title
    } catch(e: Exception) {
        tv_text.text = "error"
    }
}
复制代码

再好比若是咱们想调用getNewsDetail同时请求多个新闻详情数据

// 非协程用法
testApi.getLatestNews()
    .flatMap {
        Flowable.zip(
            testApi.getNewsDetail(it.stories!![0].id!!), 
            testApi.getNewsDetail(it.stories!![1].id!!), 
            BiFunction<News, News, List<News>> { news1, news2->
                listOf(news1, news2) 
            }
        )
    }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(object : DisposableSubscriber<List<News>>() {
        override fun onComplete() {}

        override fun onNext(t: List<News>) {
            tv_text.text = t[0].title + t[1].title
        }

        override fun onError(t: Throwable?) {
            tv_text.text = "error"
        }
    })

// 协程的用法
GlobalScope.launch(Dispatchers.Main) {
    try {
    	// 先请求新闻列表
        val lastedNews = testApi.getLatestNews2()
        // 再使用async 并发请求第一个和第二个新闻的详情
        val detail1 = async { testApi.getNewsDetail2(lastedNews.stories!![0].id!!) }
        val detail2 = async { testApi.getNewsDetail2(lastedNews.stories!![1].id!!) }
        tv_text.text = detail1.await().title + detail2.await().title
    } catch(e: Exception) {
        tv_text.text = "error"
    }
}
复制代码

可见相对于非协程的写法(代码中使用rxjava2),协程能让你的代码更加简洁、优雅,能更加清晰的描述你第一步想作什么、第二步想作什么等等

room数据库对协程的支持

room数据库在2.1.0开始支持协程, 而且须要导入room-ktx依赖

implementation "androidx.room:room-ktx:2.1.0"
复制代码

而后在Dao中使用suspend定义挂起函数

@Dao
abstract class UserDao {
    @Query("select * from tab_user")
    abstract suspend fun getAll(): List<User>
}

复制代码

最后就像上面retrofit2那样使用协程便可

class RoomActivity : AppCompatActivity() {
    private var adapter: RoomAdapter? = null

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_room)
        ...
    }
    
    ...

    private fun loadUser() {
        GlobalScope.launch(Dispatchers.Main) {
            adapter!!.data = AppDataBase.getInstance().userDao().getAll()
        }
    }
    
}
        
复制代码

这里指介绍room数据库的协程用法,对于room数据库的介绍和其余用法请查看Android Jetpack ROOM数据库用法介绍android Jetpack ROOM数据库结合其它Library的使用介绍

协程在android里的应用

上面的example都是使用GlobalScope上下文来启动协程, 其实真正在android中通常不建议直接使用GlobalScope,由于使用GlobalScope.launch 时,咱们会建立一个顶层协程。虽然它很轻量,但它运行时仍会消耗一些内存资源,若是咱们忘记保持对新启动的协程的引用,它还会继续运行,因此咱们必须保持全部对GlobalScope.launch启动协程的引用,而后在activity destory(或其它须要cancel)的时候cancel掉全部的协程,不然就会形成内存泄露等一系列问题

好比:

class CoroutineActivity : AppCompatActivity() {
    private lateinit var testApi: TestApi
    private var job1: Job? = null
    private var job2: Job? = null
    private var job3: Job? = null
    
    ...

    override fun onDestroy() {
        super.onDestroy()
        job1?.cancel()
        job2?.cancel()
        job3?.cancel()
    }
    ...
    
    // 启动第一个顶级协程
    fun requestData1() {
        job1 = GlobalScope.launch(Dispatchers.Main) {
            try {
                val lastedNews = testApi.getLatestNews2()
                tv_text.text = lastedNews.stories!![0].title
            } catch(e: Exception) {
                tv_text.text = "error"
            }
        }
    }

	// 启动第二个顶级协程
    fun requestData2() {
        job2 = GlobalScope.launch(Dispatchers.Main) {
            try {
                val lastedNews = testApi.getLatestNews2()
                // 在协程内部启动第三个顶级协程
                job3 = GlobalScope.launch(Dispatchers.Main) {
                    try {
                        val detail = testApi.getNewsDetail2(lastedNews.stories!![0].id!!)
                        tv_text.text = detail.title
                    } catch (e: Exception) {
                        tv_text.text = "error"
                    }
                }
            } catch(e: Exception) {
                tv_text.text = "error"
            }
        }
    }
}
复制代码

可见若是使用GlobalScope启动的协程越多,就必须定义越多的变量持有对启动协程的引用,并在onDestroy的时候cancel掉全部协程

下面咱们就介绍MainScope代替GlobalScope的使用

class CoroutineActivity : AppCompatActivity() {
    private var mainScope = MainScope()
    private lateinit var testApi: TestApi

    ...

    override fun onDestroy() {
        super.onDestroy()
        // 只须要调用mainScope.cancel,就会cancel掉全部使用mainScope启动的全部协程
        mainScope.cancel()
    }

    fun requestData1() {
        mainScope.launch {
            try {
                val lastedNews = testApi.getLatestNews2()
                tv_text.text = lastedNews.stories!![0].title
            } catch(e: Exception) {
                tv_text.text = "error"
            }
        }
    }

    fun requestData2() {
        mainScope.launch {
            try {
                val lastedNews = testApi.getLatestNews2()
                val detail = testApi.getNewsDetail2(lastedNews.stories!![0].id!!)
                tv_text.text = detail.title
            } catch (e: Exception) {
                tv_text.text = "error"
            }
        }
    }
}

复制代码

又或者是使用kotlin委托模式实现以下:

class CoroutineActivity : AppCompatActivity(), CoroutineScope by MainScope() {
    private lateinit var testApi: TestApi

	...
	
    override fun onDestroy() {
        super.onDestroy()
        cancel()
    }

    fun requestData1() {
        launch {
            try {
                val lastedNews = testApi.getLatestNews2()
                tv_text.text = lastedNews.stories!![0].title
            } catch(e: Exception) {
                tv_text.text = "error"
            }
        }
    }

    fun requestData2() {
        launch {
            try {
                val lastedNews = testApi.getLatestNews2()
                val detail = testApi.getNewsDetail2(lastedNews.stories!![0].id!!)
                tv_text.text = detail.title
            } catch (e: Exception) {
                tv_text.text = "error"
            }
        }
    }
}
复制代码

同时咱们先来看看MainScope的定义

@Suppress("FunctionName")
public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)

复制代码

可见使用MainScope很是简单,只须要在activity onDestroy中调用MainScopecancel方法便可,而不须要定义其它协程的引用, 而且MainScope的调度器是Dispatchers.Main, 因此也不须要手动指定Main调度器

Lifecycle对协程的支持

发现Lifecycle组件库在2.2.0alpha版中已经有了对于协程的支持

须要添加lifecycle-runtime-ktx依赖(正式版出来以后,请使用正式版)

implementation "androidx.lifecycle:lifecycle-livedata-ktx:2.2.0-alpha05"
复制代码

lifecycle-runtime-ktx 中 给LifecycleOwner添加了 lifecycleScope扩展属性(类于上面介绍的MainScope),用于方便的操做协程;

先看看源码

val LifecycleOwner.lifecycleScope: LifecycleCoroutineScope
    get() = lifecycle.coroutineScope
    
    
val Lifecycle.coroutineScope: LifecycleCoroutineScope
    get() {
        while (true) {
            val existing = mInternalScopeRef.get() as LifecycleCoroutineScopeImpl?
            if (existing != null) {
                return existing
            }
            val newScope = LifecycleCoroutineScopeImpl(
                this,
                // SupervisorJob 指定协程做用域是单向传递
                // Dispatchers.Main.immediate 指定协程体 在主线程中执行
                // Dispatchers.Main.immediate 跟 Dispatchers.Main惟一的区别是,若是当前在主线程,这立马执行协程体,而不是走Dispatcher分发流程
                SupervisorJob() + Dispatchers.Main.immediate
            )
            if (mInternalScopeRef.compareAndSet(null, newScope)) {
                newScope.register()
                return newScope
            }
        }
    }    
复制代码

同时LifecycleCoroutineScope 还提供了绑定LifecycleOwner生命周期(通常是指activityfragment)的启动协程的方法;以下:

abstract class LifecycleCoroutineScope internal constructor() : CoroutineScope {
    internal abstract val lifecycle: Lifecycle

	// 当 activity 处于created的时候执行 协程体
    fun launchWhenCreated(block: suspend CoroutineScope.() -> Unit): Job = launch {
        lifecycle.whenCreated(block)
    }

	// 当 activity 处于start的时候执行 协程体
    fun launchWhenStarted(block: suspend CoroutineScope.() -> Unit): Job = launch {
        lifecycle.whenStarted(block)
    }

	// 当 activity 处于resume的时候执行 协程体
    fun launchWhenResumed(block: suspend CoroutineScope.() -> Unit): Job = launch {
        lifecycle.whenResumed(block)
    }
}
复制代码

因为上面启动协程的方法绑定了activity生命周期,因此在activity destroy的时候,也实现了自动cancel掉协程

因此咱们 CoroutineActivity Demo的代码能够写的更加简单,以下:

class CoroutineActivity : AppCompatActivity() {
    private lateinit var testApi: TestApi

	...

    fun requestData1() {
        lifecycleScope.launchWhenResumed {
            try {
                val lastedNews = testApi.getLatestNews2()
                tv_text.text = lastedNews.stories!![0].title
            } catch(e: Exception) {
                tv_text.text = "error"
            }
        }
    }
}

复制代码

LiveData对协程的支持

同时Google也对LiveData提供了对协程的支持,不过须要添加lifecycle-livedata-ktx依赖

// 如今仍是`alpha`版,等正式版发布之后,请替换成正式版
implementation "androidx.lifecycle:lifecycle-livedata-ktx:2.2.0-alpha05"

复制代码

lifecycle-livedata-ktx依赖添加了liveData顶级函数,返回CoroutineLiveData

源码以下:

...
internal const val DEFAULT_TIMEOUT = 5000L
...
fun <T> liveData(
    context: CoroutineContext = EmptyCoroutineContext,
    timeoutInMs: Long = DEFAULT_TIMEOUT,
    @BuilderInference block: suspend LiveDataScope<T>.() -> Unit
): LiveData<T> = CoroutineLiveData(context, timeoutInMs, block)

复制代码

CoroutineLiveData是在何时启动协程并执行协程体的呢???

internal class CoroutineLiveData<T>(
    context: CoroutineContext = EmptyCoroutineContext,
    timeoutInMs: Long = DEFAULT_TIMEOUT,
    block: Block<T>
) : MediatorLiveData<T>() {
    private var blockRunner: BlockRunner<T>?
    private var emittedSource: EmittedSource? = null

    init {
        val scope = CoroutineScope(Dispatchers.Main.immediate + context + supervisorJob)
        blockRunner = BlockRunner(
            liveData = this,
            block = block,
            timeoutInMs = timeoutInMs,
            scope = scope
        ) {
            blockRunner = null
        }
    }
	
	...
	
    // observer(观察者)个数有0到1时执行
    // 即第一次调用observe或observeForever时执行
    override fun onActive() {
        super.onActive()
        // 启动协程并执行协程体
        blockRunner?.maybeRun()
    }
	
    // observer(观察者)个数有1到0时执行
    // 即调用removeObserver时触发检查并执行回调
    override fun onInactive() {
        super.onInactive()
        // 取消协程
        blockRunner?.cancel()
    }
}
复制代码

可见CoroutineLiveData是在onActive()启动协程,在onInactive()取消协程

因此使用LiveData对协程的支持, 那么CoroutineActivity Demo的代码写法以下

class CoroutineActivity : AppCompatActivity() {
    private lateinit var testApi: TestApi
    
	...

    fun requestData1() {
        liveData {
            try {
                val lastedNews = testApi.getLatestNews2()
                emit(lastedNews.stories!![0].title!!)
            } catch(e: Exception) {
                emit("error")
            }
        }.observe(this, Observer {
            tv_text.text = it
        })
    }
}

复制代码

上面咱们讲了协程在android里最经常使用的用法,下面将介绍协程的一些基本知识

协程上下文

协程上下文用CoroutineContext表示,kotlin中 比较经常使用的Job协程调度器(CoroutineDispatcher)协程拦截器(ContinuationInterceptor)等都是CoroutineContext的子类,即它们都是协程上下文

先看一下CoroutineContext 比较重要的plus方法,它是一个用operator修复的重载(+)号的操做符方法

@SinceKotlin("1.3")
public interface CoroutineContext {

    /**
     * Returns a context containing elements from this context and elements from  other [context].
     * The elements from this context with the same key as in the other one are dropped.
     */
    public operator fun plus(context: CoroutineContext): CoroutineContext =
        if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
            context.fold(this) { acc, element ->
                val removed = acc.minusKey(element.key)
                if (removed === EmptyCoroutineContext) element else {
                    // make sure interceptor is always last in the context (and thus is fast to get when present)
                    val interceptor = removed[ContinuationInterceptor]
                    if (interceptor == null) CombinedContext(removed, element) else {
                        val left = removed.minusKey(ContinuationInterceptor)
                        if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                            CombinedContext(CombinedContext(left, element), interceptor)
                    }
                }
            }
}

复制代码

好比上面说的MainScope定义就使用了+号操做符

public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)
复制代码

若是你看启动协程的源码就会发现,在kotlin中 大量使用 + 号操做符,因此kotlin中大部分CoroutineContext对象都是CombinedContext对象

上面的example使用的launch方法启动协程有三个参数, 分别是协程上下文协程启动模式协程体

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext, // 协程上下文
    start: CoroutineStart = CoroutineStart.DEFAULT, // 协程启动模式
    block: suspend CoroutineScope.() -> Unit // 协程体
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}
复制代码

协程启动模式

  • DEFAULT

    当即执行协程体

    runBlocking {
        val job = GlobalScope.launch(start = CoroutineStart.DEFAULT) {
            println("1: " + Thread.currentThread().name)
        }
        // 不须要调用join方法
        // job.join()
    }
    复制代码

    打印结果

    1: DefaultDispatcher-worker-1
    复制代码

    CoroutineStart.DEFAULT启动模式不须要手动调用joinstart等方法,而是在调用launch方法的时候就会自动执行协程体的代码

  • LAZY

    只有在须要的状况下才执行协程体

    runBlocking {
        val job = GlobalScope.launch(start = CoroutineStart.LAZY) {
            println("1: " + Thread.currentThread().name)
        }
        // 必定调用join方法
        job.join()
    }
    复制代码

    打印结果

    1: DefaultDispatcher-worker-1
    复制代码

    CoroutineStart.LAZY启动模式必定要手动调用joinstart等方法,否者协程体不会执行

  • ATOMIC

    当即执行协程体,但在开始运行以前没法取消, 即开启协程会无视cancelling状态

    runBlocking {
        val job = GlobalScope.launch(start = CoroutineStart.ATOMIC) {
            println("1: " + Thread.currentThread().name)
            delay(1000)
            println("2: " + Thread.currentThread().name)
        }
        job.cancel()
        delay(2000)
    }
    复制代码

    打印结果

    1: DefaultDispatcher-worker-1
    复制代码

    CoroutineStart. ATOMIC启动模式的协程体 即便调了cancel方法 也必定会执行,由于开启协程会无视cancelling状态;上面的example只打印了一句话,是由于执行delay(1000)的时候 发现协程处于关闭状态, 因此出现了JobCancellationException异常,致使下面的代码没有执行,若是 delay(1000) 这句代码用 try catch 捕获一下异常,就会继续执行下面的代码

  • UNDISPATCHED

    当即在当前线程执行协程体,直到第一个 suspend 调用 挂起以后的执行线程取决于上下文当中的调度器了

    runBlocking {
        println("0: " + Thread.currentThread().name)
        // 注意这里没有用GlobalScope.launch
        // 由于GlobalScope.launch启动的是一个顶层协程, 没法关联当前协程的上下文(coroutineContext), 致使结果有误差
        launch(context = Dispatchers.Default, start = CoroutineStart.UNDISPATCHED) {
            println("1: " + Thread.currentThread().name)
            delay(1000)
            println("2: " + Thread.currentThread().name)
        }
        delay(2000)
    }
    复制代码

    打印结果

    0: main
    1: main
    2: DefaultDispatcher-worker-1
    复制代码

    可见 0 和 1 的执行线程是同样的,当执行完delay(1000), 后面的代码执行线程取决于Dispatchers.Default调度器指定的线程,因此 2 在另外一个线程中执行

协程调度器

协程调度器 其实也是 协程上下文

协程调度器是用来指定协程代码块在哪一个线程中执行,kotlin提供了几个默认的协程调度器,分别是DefaultMainUnconfined, 并针对jvm, kotlin提供了一个特有的IO调度器

  • Dispatchers.Default

    指定代码块在线程池中执行

    GlobalScope.launch(Dispatchers.Default) {
        println("1: " + Thread.currentThread().name)
        launch (Dispatchers.Default) {
            delay(1000) // 延迟1秒后,再继续执行下面的代码
            println("2: " + Thread.currentThread().name)
        }
        println("3: " + Thread.currentThread().name)
    }
    复制代码

    打印结果以下

    1: DefaultDispatcher-worker-1
    3: DefaultDispatcher-worker-1
    2: DefaultDispatcher-worker-1
    复制代码
  • Dispatchers.Main

    指定代码块在main线程中执行(针对Android就是ui线程)

    GlobalScope.launch(Dispatchers.Default) {
        println("1: " + Thread.currentThread().name)
        launch (Dispatchers.Main) {
            delay(1000) // 延迟1秒后,再继续执行下面的代码
            println("2: " + Thread.currentThread().name)
        }
        println("3: " + Thread.currentThread().name)
    }
    复制代码

    打印结果以下:

    1: DefaultDispatcher-worker-1
    3: DefaultDispatcher-worker-1
    2: main
    复制代码

    可见Dispatchers.Main就是指定协程代码块在main线程中执行

  • Dispatchers.Unconfined

    没有指定协程代码快在哪一个特定线程中执行,即当前在哪一个线程,代码块中接下来的代码就在哪一个线程中执行(即一段协程代码块 因为启动了子协程 致使切换了线程, 那么接下来的代码块也是在这个线程中执行)

    GlobalScope.launch(Dispatchers.Default) {
        println("1: " + Thread.currentThread().name)
        launch (Dispatchers.Unconfined) {
            println("2: " + Thread.currentThread().name)
            requestApi()  // delay(1000) 原本想用delay,可是使用requestApi 可能更加清晰
            println("3: " + Thread.currentThread().name)
        }
        println("4: " + Thread.currentThread().name)
    }
    
    // 定义一个挂起函数,在一个新的子线程中执行
    private suspend fun requestApi() = suspendCancellableCoroutine<String> {
        Thread {
            println("5: requestApi: " + Thread.currentThread().name)
            it.resume("success")
        }.start()
    }
    复制代码

    打印结果以下:

    1: DefaultDispatcher-worker-1
    2: DefaultDispatcher-worker-1
    5: requestApi: Thread-3
    4: DefaultDispatcher-worker-1
    3: Thread-3
    复制代码

    可见2 和 3的代码 执行线程明显不同;当执行到requestApi这句代码的时候 会切换到子线程(即Thread-3)中执行代码,而后接下来的协程代码块就会在Thread-3中执行

  • Dispatchers.IO

    它是基于 Default 调度器背后的线程池,并实现了独立的队列和限制,所以协程调度器从 Default 切换到 IO 并不会触发线程切换

    GlobalScope.launch(Dispatchers.Default) {
        println("1: " + Thread.currentThread().name)
        launch (Dispatchers.IO) {
            println("2: " + Thread.currentThread().name)
            requestApi()  // delay(1000)
            println("3: " + Thread.currentThread().name)
        }
        println("4: " + Thread.currentThread().name)
    }
    复制代码

    打印结果以下:

    1: DefaultDispatcher-worker-1
    4: DefaultDispatcher-worker-1
    2: DefaultDispatcher-worker-1
    5: requestApi: Thread-3
    3: DefaultDispatcher-worker-1
    复制代码
  • 绑定到任意自定义线程的调度器(这种方式要谨慎使用)

    可使用kotlin自带newSingleThreadContext方法或者使用ExecutorService的扩展方法asCoroutineDispatcher建立一个Dispatcher

    // 第一种方法
    val dispatcher = newSingleThreadContext("custom thread")
    // 第二种方法
    // val dispatcher = Executors.newSingleThreadExecutor{ r -> Thread(r, "custom thread") }.asCoroutineDispatcher()
    GlobalScope.launch(dispatcher) {
        println("1: " + Thread.currentThread().name)
        delay(1000)
        println("2: " + Thread.currentThread().name)
    }
    
    runBlocking {     
        delay(2000L)  
        // 必定要close,不然线程永远都不会结束,很危险
        dispatcher.close()
    }
    复制代码

    打印结果以下:

    1: custom thread
    2: custom thread
    复制代码

    可见咱们能够本身建立线程绑定到协程调度器上,可是这种方式不建议使用,由于一旦手动建立了线程 就须要手动close,不然线程就永远也不会终止,这样会很危险

协程做用域GlobalScope、coroutineScope、supervisorScope

协程做用域是一个很是重的东西

  • GlobeScope

    GlobeScope 启动的协程会单独启动一个做用域,没法继承外面协程的做用域,其内部的子协程听从默认的做用域规则

  • coroutineScope

    coroutineScope 启动的协程会继承父协程的做用域,其内部的取消操做是双向传播的,子协程未捕获的异常也会向上传递给父协程

  • supervisorScope

    supervisorScope 启动的协程会继承父协程的做用域,他跟coroutineScope不同的点是 它是单向传递的,即内部的取消操做和异常传递 只能由父协程向子协程传播,不能从子协程传向父协程

    MainScope 就是使用的supervisorScope做用域,因此只须要子协程 出错 或 cancel 并不会影响父协程,从而也不会影响兄弟协程

协程异常传递模式

协程的异常传递跟协程做用域有关,要么跟coroutineScope同样双向传递,要么跟supervisorScope同样由父协程向子协程单向传递

针对supervisorScope的单向传递

runBlocking {
    println("1")
    supervisorScope {
        println("2")
        // 启动一个子协程
        launch {
            1/0 // 故意让子协程出现异常
        }
        delay(100)
        println("3")
    }
    println("4")
}
复制代码

打印结果以下:

1
2
Exception in thread "main @coroutine#2" java.lang.ArithmeticException: / by zero
3
4

复制代码

可见在supervisorScope做用域中启动的子协程若是出现异常,并无致使父协程异常,而且父协程的代码还能继续往下执行

咱们再来验证一下再supervisorScope做用域中父协程异常是否会传递给子协程

runBlocking {
    println("1")
    supervisorScope {
        println("2")
        // 启动一个子协程
        launch {
            try {
                delay(1000)
                println("3")
            } catch (e: Exception) {
                println("error")
            }
        }
        delay(100)
        1/0 //父协程报错
        println("3")
    }
}
复制代码
1
2
error

java.lang.ArithmeticException: / by zero
复制代码

可见在supervisorScope做用域中 父协程确实会将异常传递给子协程

针对coroutineScope的双向传递

runBlocking {
    println("1")
    try {
        coroutineScope {
            println("2")
            // 启动一个子协程
            launch {
                1/0 // 故意让子协程出现异常
            }
            delay(100)
            println("3")
        }
    } catch (e: Exception) {
        println("error")
    }
}
复制代码

打印结果以下:

1
2
error

复制代码

可见在coroutineScope做用域中启动的子协程若是出现异常,则会传递给父协程

咱们再来验证一下再coroutineScope做用域中父协程异常是否会传递给子协程

runBlocking {
    println("1")
    coroutineScope {
        println("2")
        // 启动一个
        launch {
            try {
                delay(1000)
                println("3")
            } catch (e: Exception) {
                println("error")
            }
        }
        delay(100)
        1/0
        println("3")
    }
}
复制代码

打印结果以下:

1
2
error

java.lang.ArithmeticException: / by zero
复制代码

可见在coroutineScope做用域中 父协程确实会将异常传递给子协程

协程取消

先看一段代码

GlobalScope.launch {
    println("1")
    // 启动一个子协程
    val job = launch {
        println("2")
        try {// 捕获 协程cancel致使的异常,让代码继续往下执行
            delay(1000)
        } catch (e: Exception) {
            println("error")
        }
        println("3")
        if (isActive) { // 若是协程cancel了,则isActive为false
            println("4")
        }
        delay(1000) // 没有捕获异常,则终止代码继续往下执行
        println("5")
    }
    delay(100)
    job.cancel()
}
复制代码

打印结果以下:

1
2
error
3
复制代码

当先启动协程,而后cancel,会出现以下几种状况:

  • 若是执行到协程体内的代码依赖协程的cancel状态(好比delay方法),则会抛出异常,若是捕获了异常,则会继续往下执行,若是没有捕获异常则终止往下继续执行协程体
  • 若是协程体内的代码不依赖协程的cancel状态(即println方法),则会继续往下执行

也就是说 协程的取消(cancel) 致使协程体终止运行的方式是 抛出异常,若是协程体的代码不依赖协程的cancel状态(即没有报错),则协程的取消 对协程体的执行通常没什么影响

好比:

GlobalScope.launch {
    val job = launch {
        println("==start==")
        var i = 0
        while (i <= 10) {
            Thread.sleep(100)
            println(i++)
        }
        println("==end==")
    }
    delay(100)
    job.cancel()
}
复制代码

打印结果以下:

==start==
0
1
2
3
4
5
6
7
8
9
10
==end==
复制代码

可见即便协程取消了,协程体仍是在继续运行

若是想结束协程体的运行该怎么办呢??

这个时候可使用CoroutineScope的isActive字段判断协程的状态是否被取消了

GlobalScope.launch {
    val job = launch {
        println("==start==")
        var i = 0
        while (i <= 10 && isActive) {
            Thread.sleep(100)
            println(i++)
        }
        println("==end==")
    }
    delay(200)
    job.cancel()
}
复制代码

打印结果

==start==
0
1
==end==
复制代码

可见若是协程取消了,可使用isActive字段来判断是否须要执行协程体的某段代码

withContext

在执行协程体的时候,可使用withContext方便的切换代码执行所运行线程;好比

GlobalScope.launch(Dispatchers.Default) {
	// 在Dispatchers.Default的线程池中执行
    println("1: " + Thread.currentThread().name)
    withContext(Dispatchers.Main) { // 切换到主线程执行
        println("2: " + Thread.currentThread().name)
    }
    // 在Dispatchers.Default的线程池中执行
    println("3: " + Thread.currentThread().name)
    val dispatcher = newSingleThreadContext("custom thread")
    withContext(dispatcher) { // 切换到自定义线程中执行
        println("4: " + Thread.currentThread().name)
    }
    dispatcher.close()
    // 在Dispatchers.Default的线程池中执行
    println("5: " + Thread.currentThread().name)
}
复制代码

打印结果

1: DefaultDispatcher-worker-1
2: main
3: DefaultDispatcher-worker-2
4: custom thread
5: DefaultDispatcher-worker-2
复制代码

可见咱们可使用withContext方便的切换代码运行所在的线程

withContext还能够配合NonCancellable上下文确保代码块不能被取消

GlobalScope.launch(Dispatchers.Default) {
    val job = launch {
        println("1: " + Thread.currentThread().name)
        try {
            delay(1000)
        } catch (e: Exception) {
            withContext(NonCancellable) { // 配合NonCancellable上下文确保协程体不能被取消
                println("error: " + e.message)
                delay(100) // 若是没有用withContext(NonCancellable)包裹,则delay(100)会报错, 致使下面的代码不执行
                println("2: " + Thread.currentThread().name)
            }
        }
    }
    delay(100)
    job.cancel()
}
复制代码

打印结果

1: DefaultDispatcher-worker-1
error: Job was cancelled
2: DefaultDispatcher-worker-1
复制代码

结构化并发

什么是结构化并发呢?

其实很简单,即保证启动的协程在同一做用域中(我的理解)

当咱们使用GlobalScope.launch启动协程的时候会建立一个顶层协程,若是咱们每次都使用GlobalScope.launch启动协程, 那么就会建立不少个顶层协程,而且不会相互干扰,即即便一个协程出错或的取消了,另外一个协程仍是会继续运行,由于它们不是在同一个协程做用域中

GlobalScope.launch(Dispatchers.Default) {
    val a1 = GlobalScope.async { 这里使用async启动协程,没有使用launch
        delay(1000)
        println("1: " + Thread.currentThread().name)
    }
    val a2 = GlobalScope.async {
        delay(100)
        1/0 // 故意报错
        println("2: " + Thread.currentThread().name)
    }
    a1.await()
    a2.await() // a2.cancel() 也可使用cancel
}
复制代码

打印结果以下

1: DefaultDispatcher-worker-1
Exception in thread "DefaultDispatcher-worker-1" java.lang.ArithmeticException: / by zero
复制代码

可见a2报错或cancel,并不会影响a1

这到底会引发什么问题呢?

好比咱们在一个activity中一般会有多个并发网络请求 请求数据(即会启动多个协程),当其中一个网络请求出错时(即协程出错),咱们但愿关闭其它并行的网络请求,而不处理(即但愿关闭掉其它协程),可是结果并不是如此

再好比咱们在一个activity中一般会有许多个网络请求(即会启动许多个协程),若是咱们老是使用GlobalScope启动协程,那么必须保持每一个协程的引用,并在activity destroy时cancel掉全部协程,不然即便activity destroy,那么协程里的异步请求代码仍是会继续执行,这样很容易出错或内存泄漏

咱们该怎么方便的解决这样的问题呢?

其实咱们可使用结构化并发(即协程做用域)来解决这样的问题,即保证启动的多个协程在同一做用域中,若是cancel掉这个做用域上下文,那么在这个做用域下启动的全部子协程都会取消,同时还能够配合coroutineScope、supervisorScope协程做用域 处理异常传递的问题

因此上面的代码能够这样改

GlobalScope.launch(Dispatchers.Default) {
    val a1 = async {
        delay(1000)
        println("1: " + Thread.currentThread().name)
    }
    val a2 = async {
        delay(100)
        1/0 // 故意报错
        println("2: " + Thread.currentThread().name)
    }
    a1.await()
    a2.await()
}
复制代码

即把启动 a1a2协程的GlobalScope去掉,保证a1a2在同一协程做用域中

协程挂起函数原理分析

咱们先来看一看retrofit兼容协程的实现源码

suspend fun <T : Any> Call<T>.await(): T {
	// 使用suspendCancellableCoroutine定义挂起函数,参数是Continuation对象
  return suspendCancellableCoroutine { continuation ->
    continuation.invokeOnCancellation {
      cancel()
    }
    enqueue(object : Callback<T> {
      override fun onResponse(call: Call<T>, response: Response<T>) {
        if (response.isSuccessful) {
          val body = response.body()
          if (body == null) {
            val invocation = call.request().tag(Invocation::class.java)!!
            val method = invocation.method()
            val e = KotlinNullPointerException("Response from " +
                method.declaringClass.name +
                '.' +
                method.name +
                " was null but response body type was declared as non-null")
            // 若是结果异常,则调用Continuation 的 resumeWithException回调
            continuation.resumeWithException(e)
          } else {
          	// 若是结果正常,则调用Continuation 的 resume回调
            continuation.resume(body)
          }
        } else {
          // 若是结果异常,则调用Continuation 的 resumeWithException回调
          continuation.resumeWithException(HttpException(response))
        }
      }

      override fun onFailure(call: Call<T>, t: Throwable) {
        // 若是结果异常,则调用Continuation 的 resumeWithException回调
        continuation.resumeWithException(t)
      }
    })
  }
}
复制代码

Continuation的源码和扩展函数以下

@SinceKotlin("1.3")
@InlineOnly
public inline fun <T> Continuation<T>.resume(value: T): Unit =
    resumeWith(Result.success(value))

/**
 * Resumes the execution of the corresponding coroutine so that the [exception] is re-thrown right after the
 * last suspension point.
 */
@SinceKotlin("1.3")
@InlineOnly
public inline fun <T> Continuation<T>.resumeWithException(exception: Throwable): Unit =
    resumeWith(Result.failure(exception))

@SinceKotlin("1.3")
public interface Continuation<in T> {
    /**
     * The context of the coroutine that corresponds to this continuation.
     */
    public val context: CoroutineContext

    /**
     * Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
     * return value of the last suspension point.
     */
    public fun resumeWith(result: Result<T>)
}
复制代码

可见协程挂起函数内部是使用回调将结果返回出去的,当有结果正常返回的时候,Continuation 调用 resume 返回结果,不然调用 resumeWithException 来抛出异常,这与 Callback 的模式如出一辙

而咱们写协程代码之因此能够看起来是同步的,实际上是编译器帮你作了不少事情(即你能够当它是“语法糖”)

注意:使用AndroidStudio反编译kotlin协程代码的时候会致使ide严重卡顿,而且反编译出来的java代码有无数层的嵌套,不知道是没法反编译协程代码,仍是AndroidStudio的bug, 致使没法配合kotlin反编译的java代码来说解

协程的状态转移

上面已经对协程挂起函数原理作了一些解析,若是咱们使用了多个挂起函数 那它们是怎么配合运行的呢?

注意: 下面的代码是我copy的别人的代码

suspend fun main() {
    log(1)
    // returnSuspended()是一个suspend函数
    log(returnSuspended())
    log(2)
    // delay也是一个suspend函数
    delay(1000)
    log(3)
    // returnImmediately也是一个suspend函数
    log(returnImmediately())
    log(4)
}

复制代码

对应的java实现代码逻辑以下(注意,下面的代码逻辑上并不能作到十分严谨,仅供学习理解协程使用)

public class ContinuationImpl implements Continuation<Object> {
	 // label 状态 默认为 0
    private int label = 0;
    private final Continuation<Unit> completion;

    public ContinuationImpl(Continuation<Unit> completion) {
        this.completion = completion;
    }

    @Override
    public CoroutineContext getContext() {
        return EmptyCoroutineContext.INSTANCE;
    }

    @Override
    public void resumeWith(@NotNull Object o) {
        try {
            Object result = o;
            switch (label) {
                case 0: {
                    LogKt.log(1);
                    // 在SuspendFunctionsKt.returnSuspended内部以回调的方式 调用this的resumeWith方法
                    result = SuspendFunctionsKt.returnSuspended( this);
                    // label 状态加 1
                    label++;
                    if (isSuspended(result)) return;
                }
                case 1: {
                    LogKt.log(result);
                    LogKt.log(2);
                    // 在DelayKt.delay内部以回调的方式 调用this的resumeWith方法
                    result = DelayKt.delay(1000, this);
                    // label 状态加 1
                    label++;
                    if (isSuspended(result)) return;
                }
                case 2: {
                    LogKt.log(3);
                    // 在SuspendFunctionsKt.returnImmediately内部以回调的方式 调用this的resumeWith方法
                    result = SuspendFunctionsKt.returnImmediately( this);
                    // label 状态加 1
                    label++;
                    if (isSuspended(result)) return;
                }
                case 3:{
                    LogKt.log(result);
                    LogKt.log(4);
                }
            }
            completion.resumeWith(Unit.INSTANCE);
        } catch (Exception e) {
            completion.resumeWith(e);
        }
    }

    private boolean isSuspended(Object result) {
        return result == IntrinsicsKt.getCOROUTINE_SUSPENDED();
    }
}
复制代码

可见多个挂起函数之间的配合使用是使用label这个状态字段不断加1 而且 不断调用resumeWith方法实现的

总结以下:

  • 协程的挂起函数本质上就是一个回调,回调类型就是 Continuation
  • 协程体的执行就是一个状态机,每一次遇到挂起函数,都是一次状态转移,就像咱们前面例子中的 label 不断的自增来实现状态流转同样

最后 很是感谢破解 Kotlin 协程的博客,这是学习Coroutine很是好的文章,建议你们去看看

相关文章
相关标签/搜索