相信你们或多或少的都了解过,协程是什么,官网上这么说:android
Essentially, coroutines are light-weight threads.
协程是轻量级的线程,为何是轻量的?能够先告诉你们结论,由于它基于线程池API,因此在处理并发任务这件事上它真的游刃有余。
有可能有的同窗问了,既然它基于线程池,那我直接使用线程池或者使用 Android 中其余的异步任务解决方式,好比 Handler
、RxJava
等,不更好吗?
协程可使用阻塞的方式写出非阻塞式的代码,解决并发中常见的回调地狱,这是其最大的优势,后面介绍。程序员
<pre data-tool="mdnice编辑器" class="custom" >\`GlobalScope.launch(Dispatchers.Main) { val res = getResult(2) mNumTv.text = res.toString() }
启动协程的代码就是如此的简单。上面的代码中能够分为三部分,分别是 GlobalScope
、Dispatcher
和 launch
,他们分别对应着协程的做用域、调度器和协程构建器,咱们挨个儿介绍。数据库
协程的做用域有三种,他们分别是:编程
runBlocking
:顶层函数,它和 coroutineScope
不同,它会阻塞当前线程来等待,因此这个方法在业务中并不适用 。GlobalScope
:全局协程做用域,能够在整个应用的声明周期中操做,且不能取消,因此仍不适用于业务开发。显然,咱们不能在 Activity
中调用 GlobalScope
,这样可能会形成内存泄漏,看一下如何自定义做用域,具体的步骤我在注释中已给出:数组
<pre data-tool="mdnice编辑器" class="custom" >\`class MainActivity : AppCompatActivity() { // 1\\. 建立一个 MainScope val scope = MainScope() override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity\_main) // 2\\. 启动协程 scope.launch(Dispatchers.Unconfined) { val one = getResult(20) val two = getResult(40) mNumTv.text = (one + two).toString() } } // 3\\. 销毁的时候释放 override fun onDestroy() { super.onDestroy() scope.cancel() } private suspend fun getResult(num: Int): Int { delay(5000) return num \* num } } </pre>
调度器的做用是将协程限制在特定的线程执行。主要的调度器类型有:网络
Dispatchers.Main
:指定执行的线程是主线程,如上面的代码。Dispatchers.IO
:指定执行的线程是 IO 线程。Dispatchers.Default
:默认的调度器,适合执行 CPU 密集性的任务。Dispatchers.Unconfined
:非限制的调度器,指定的线程可能会随着挂起的函数的发生变化。什么是挂起?咱们就以九心吃饭为例,若是到公司对面的广场吃饭,九心得通过:并发
若是九心点广场的外卖呢?异步
从九心吃饭的例子能够看出,若是点了外卖,九心花费的时间较少了,能够空闲出更多的时间作本身的事。再仔细分析一下,其实从公司到广场和等待取餐这个过程并无省去,只是九心把这个过程交给了外卖员。
协程的原理跟九心点外卖的原理是一致的,耗时阻塞的操做并无减小,只是交给了其余线程async
launch
的做用从它的名称就能够看的出来,启动一个新的协程,它返回的是一个 Job
对象,咱们能够调用 Job#cancel()
取消这个协程。
除了 launch
,还有一个方法跟它很像,就是 async
,它的做用是建立一个协程,以后返回一个 Deferred<T>
对象,咱们能够调用 Deferred#await()
去获取返回的值,有点相似于 Java 中的 Future
,稍微改一下上面的代码:编辑器
<pre data-tool="mdnice编辑器" class="custom" >\`class MainActivity : AppCompatActivity() { // 1\\. 建立一个 MainScope val scope = MainScope() override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity\_main) // 2\\. 启动协程 scope.launch(Dispatchers.Unconfined) { val one = async { getResult(20) } val two = async { getResult(40) } mNumTv.text = (one.await() + two.await()).toString() } } // 3\\. 销毁的时候释放 override fun onDestroy() { super.onDestroy() scope.cancel() } private suspend fun getResult(num: Int): Int { delay(5000) return num \* num } }
与修改前的代码相比,async
可以并发执行任务,执行任务的时间也所以缩短了一半。
除了上述的并发执行任务,async
还能够对它的 start
入参设置成懒加载
<pre data-tool="mdnice编辑器" class="custom" >\`val one = async(start = CoroutineStart.LAZY) { getResult(20) }
这样系统就能够在调用它的时候再为它分配资源了。
suspend
是修饰函数的关键字,意思是当前的函数是能够挂起的,可是它仅仅起着提醒的做用,好比,当咱们的函数中没有须要挂起的操做的时候,编译器回给咱们提醒 Redudant suspend modifier,意思是当前的 suspend
是没有必要的,能够把它删除。
那咱们何时须要使用挂起函数呢?常见的场景有:
withContext
切换到指定的 IO 线程去进行网络或者数据库请求。delay方法
去等待某个事件。withContext
的代码:
`<pre data-tool="mdnice编辑器" class="custom" >\private suspend fun getResult(num: Int): Int { return withContext(Dispatchers.IO) { num \* num } } delay 的代码: <pre data-tool="mdnice编辑器" class="custom" >\private suspend fun getResult(num: Int): Int { delay(5000) return num \* num } `
在介绍自定义协程做用域的时候,咱们须要主动在 Activity
或者 Fragment
中的 onDestroy
方法中调用 job.cancel()
,忘记处理多是程序员常常会犯的错误,如何避免呢?
Google 老是可以解决程序员的痛点,在 Android Jetpack 中的 lifecycle
、LiveData
和 ViewModel
已经集成了快速使用协程的方法,若是咱们已经引入了 Android Jetpack,能够引入依赖:
<pre data-tool="mdnice编辑器" class="custom" > \`dependencies { def lifecycle\_version = "2.2.0" // ViewModel implementation "androidx.lifecycle:lifecycle-viewmodel-ktx:$lifecycle\_version" // LiveData implementation "androidx.lifecycle:lifecycle-livedata-ktx:$lifecycle\_version" // Lifecycles only (without ViewModel or LiveData) implementation "androidx.lifecycle:lifecycle-runtime-ktx:$lifecycle\_version" }
使用能够结合具体的场景,好比结合 Lifecycle
,须要使用 lifecycleScope
协程做用域:
<pre data-tool="mdnice编辑器" class="custom" >\`lifecycleScope.launch { // 表明当前生命周期处于 Resumed 的时候才会执行(选择性使用) whenResumed { // ... 具体的协程代码 } }
即便你不使用 Android Jetpack 组件,因为 Lifecycles
在很早以前就内置在 Android 系统的代码中,因此你仍然能够仅仅引入 Lifecycle
的协程扩展库,由于它会帮助你很好的处理 Activity
或者 Fragment
的生命周期。
引入 Android Jetpack 协程扩展库官方文档:点我打开
长期以来,在 Android 中响应式编程的首选方案是 RxJava,咱们今天就来了解一下 Kotlin中的响应式编程 Flow。若是你能熟练使用 RxJava,那你确定能快速上手 Flow。
LiveData 更加简单和纯粹,它创建单一的生产消费模型,Flow 才是相似于 RxJava 的存在。
先上一段代码:
<pre data-tool="mdnice编辑器" class="custom" >\`lifecycleScope.launch { // 建立一个协程 Flow<T> createFlow() .collect {num-> // 具体的消费处理 // ... } } }
我在 createFlow
这个方法中,返回了 Flow<Int>
的对象,因此咱们能够这样对比。
咱们暂不考虑 RxJava
中的背压和非背压,直接先将 Flow
对标 RxJava 中的 Observable
。
和 RxJava 同样,在建立 Flow
对象的时候咱们也须要调用 emit
方法发射数据:
<pre data-tool="mdnice编辑器" class="custom" >\`fun createFlow(): Flow<Int> = flow { for (i in 1..10) emit(i) }
一直调用 emit
可能不便捷,由于 RxJava 提供了 Observable.just()
这类的操做符,显然,Flow 也为咱们提供了快速建立操做:
flowof(vararg elements: T)
:帮助可变数组生成 Flow
实例.asFlow()
:面向数组、列表等集合好比可使用 (1..10).asFlow()
代替上述的 Flow
对象的建立。
collect
方法和 RxJava 中的 subscribe
方法同样,都是用来消费数据的。
除了简单的用法外,这里有两个问题得注意一下:
collect
函数是一个 suspend
方法,因此它必须发生在协程或者带有 suspend
的方法里面,这也是我为何在一开始的时候启动了 lifecycleScope.launch
。lifecycleScope
是我使用的 Lifecycle
的协程扩展库当中的,你能够替换成自定义的协程做用域。咱们学习 RxJava 的时候,大佬们都会说,RxJava 牛逼,牛逼在哪儿呢?
切换线程,一样的,Flow 的协程切换也很牛逼。Flow 是这么切换协程的:
pre data-tool="mdnice编辑器" class="custom" >\`lifecycleScope.launch { // 建立一个协程 Flow<T> createFlow() // 将数据发射的操做放到 IO 线程中的协程 .flowOn(Dispatchers.IO) .collect { num -> // 具体的消费处理 // ... } } }
和 RxJava 对比:
flowOn
使用的参数是协程对应的调度器,它实质改变的是协程对应的线程。
我在上面的表格中并无写到在 Flow 中如何改变消费线程,并不意味着 Flow 不能够指定消费线程?
Flow 的消费线程在咱们启动协程指定调度器的时候就确认好了,对应着启动协程的调度器。好比在上面的代码中 lifecycleScope
启动的调度器是 Dispatchers.Main
,那么 collect
方法就消费在主线程。
Flow 中的 catch
对应着 RxJava 中的 onError
,catch
操做:
<pre data-tool="mdnice编辑器" class="custom" >\`lifecycleScope.launch { flow { //... }.catch {e-> }.collect( ) }
除此之外,你可使用声明式捕获 try { } catch (e: Throwable) { }
去捕获异常,不过 catch
本质上是一个扩展方法,它是对声明式捕获的封装。
Flow 中的 onCompletion
对应这 RxJava 中的 onComplete
回调,onCompletion
操做:
<pre data-tool="mdnice编辑器" class="custom" >\`lifecycleScope.launch { createFlow() .onCompletion { // 处理完成操做 } .collect { } }
除此之外,咱们还能够经过捕获式 try {} finally {}
去获取完成状况。
咱们在对 Flow 已经有了一些基础的认知了,再来聊一聊 Flow 的特色,Flow 具备如下特色:
若是你对 Kotlin 中的 Sequence
有一些认识,那么你应该能够轻松的 Get 到前两个点。
有点相似于懒加载,当咱们触发 collect
方法的时候,数据才开始发射。
<pre data-tool="mdnice编辑器" class="custom" >\`lifecycleScope.launch { val flow = (1..10).asFlow().flowOn(Dispatchers.Main) flow.collect { num -> // 具体的消费处理 // ... } } }
也就是说,在第2行的时候,虽然流建立好了,可是数据一直到第四行发生 collect
才开始发射。
看代码比较容易理解:
<pre data-tool="mdnice编辑器" class="custom" >\`lifecycleScope.launch { flow { for(i in 1..3) { Log.e("Flow","$i emit") emit(i) } }.filter { Log.e("Flow","$it filter") it % 2 != 0 }.map { Log.e("Flow","$it map") "${it \* it} money" }.collect { Log.e("Flow","i get $it") } }
获得的日志:
<pre data-tool="mdnice编辑器" class="custom" >\`E/Flow: 1 emit E/Flow: 1 filter E/Flow: 1 map E/Flow: i get 1 money E/Flow: 2 emit E/Flow: 2 filter E/Flow: 3 emit E/Flow: 3 filter E/Flow: 3 map E/Flow: i get 9 money
从日志中,咱们很容易得出这样的结论,每一个数据都是通过 emit
、filter
、map
和 collect
这一套完整的处理流程后,下个数据才会开始处理,而不是全部的数据都先统一 emit
,完了再统一 filter
,接着 map
,最后再 collect
。
Flow 采用和协程同样的协做取消,也就是说,Flow 的 collect
只能在可取消的挂起函数中挂起的时候取消,不然不能取消。
若是咱们想取消 Flow 得借助 withTimeoutOrNull
之类的顶层函数,不妨猜一下,下面的代码最终会打印出什么?
<pre data-tool="mdnice编辑器" class="custom" >\`lifecycleScope.launch { val f = flow { for (i in 1..3) { delay(500) Log.e(TAG, "emit $i") emit(i) } } withTimeoutOrNull(1600) { f.collect { delay(500) Log.e(TAG, "consume $it") } } Log.e(TAG, "cancel") }
限于篇幅,我仅介绍一下 Flow 中操做符的做用,就不一一介绍每一个操做符具体怎么使用了。
总会有一些特殊的状况,好比我只须要取前几个,我只要最新的数据等,不过在这些状况下,数据的发射就是并发执行的。
展平流有点相似于 RxJava 中的 flatmap
,将你发射出去的数据源转变为另外一种数据源。
顾名思义,就是帮你作 collect
处理,collect
是最基础的末端操做符。
其余还有一些操做符,我这里就不一一介绍了,感兴趣能够查看 API。
Channel
是一个面向多协程之间数据传输的 BlockQueue
。它的使用方式超级简单:
<pre data-tool="mdnice编辑器" class="custom" >\`lifecycleScope.launch { // 1\\. 生成一个 Channel val channel = Channel<Int>() // 2\\. Channel 发送数据 launch { for(i in 1..5){ delay(200) channel.send(i \* i) } channel.close() } // 3\\. Channel 接收数据 launch { for( y in channel) Log.e(TAG, "get $y") } }
实现协程之间的数据传输须要三步:
建立的 Channel
的方式能够分为两种:
produce
若是使用了扩展函数,代码就变成了:
<pre data-tool="mdnice编辑器" class="custom" >\`lifecycleScope.launch { // 1\\. 生成一个 Channel val channel = produce<Int> { for(i in 1..5){ delay(200) send(i \* i) } close() } // 2\\. 接收数据 // ... 省略 跟以前代码一致 }
直接将第一步和第二步合并了。
发送数据使用的 Channel#send()
方法,当咱们数据发送完毕的时候,可使用 Channel#close()
来代表通道已经结束数据的发送。
正常状况下,咱们仅须要调用 Channel#receive()
获取数据,可是该方法只能获取一次传递的数据,若是咱们仅需获取指定次数的数据,能够这么操做:
<pre data-tool="mdnice编辑器" class="custom" >\`repeat(4){ Log.e(TAG, "get ${channel.receive()}") }
但若是发送的数据不能够预估呢?这个时候咱们就须要迭代 Channel
了
<pre data-tool="mdnice编辑器" class="custom" >\`for( y in channel) Log.e(TAG, "get $y")
多协程处理并发数据的时候,原子性一样也得不到保证,协程中出了一种叫 Mutex
的锁,区别是它的 lock
操做是挂起的,非阻塞的,感兴趣的同窗能够自行查看。
https://shimo.im/docs/TG8PDh9...