即学即用Kotlin - 协程

前言

上周在内部分享会上大佬同事分享了关于 Kotlin 协程的知识,以前有看过 Kotlin 协程的一些知识,觉得本身还挺了解协程的,结果...html

打脸
打脸

在这一次分享中,发现 FlowChannel 这一起知识是本身不怎么了解的,本文也将着重和你们聊一聊这一起的内容,协程部分将分为三篇,本文是第一篇:android

《即学即用Kotlin - 协程》
《抽丝剥茧Kotlin - 协程基础篇》
《抽丝剥茧Kotlin - 协程Flow篇》程序员

目录

目录
目录

1、基础

1. 概念

相信你们或多或少的都了解过,协程是什么,官网上这么说:web

Essentially, coroutines are light-weight threads.数据库

协程是轻量级的线程,为何是轻量的?能够先告诉你们结论,由于它基于线程池API,因此在处理并发任务这件事上它真的游刃有余。编程

有可能有的同窗问了,既然它基于线程池,那我直接使用线程池或者使用 Android 中其余的异步任务解决方式,好比 HandlerRxJava等,不更好吗?数组

协程能够使用阻塞的方式写出非阻塞式的代码,解决并发中常见的回调地狱,这是其最大的优势,后面介绍。微信

2. 使用

GlobalScope.launch(Dispatchers.Main) {
 val res = getResult(2)  mNumTv.text = res.toString() } 复制代码

启动协程的代码就是如此的简单。上面的代码中能够分为三部分,分别是 GlobalScopeDispatcherlaunch,他们分别对应着协程的做用域、调度器和协程构建器,咱们挨个儿介绍。网络

协程做用域

协程的做用域有三种,他们分别是:并发

  • runBlocking:顶层函数,它和 coroutineScope 不同,它会阻塞当前线程来等待,因此这个方法在业务中并不适用 。
  • GlobalScope:全局协程做用域,能够在整个应用的声明周期中操做,且不能取消,因此仍不适用于业务开发。
  • 自定义做用域:自定义协程的做用域,不会形成内存泄漏。

显然,咱们不能在 Activity 中调用 GlobalScope,这样可能会形成内存泄漏,看一下如何自定义做用域,具体的步骤我在注释中已给出:

class MainActivity : AppCompatActivity() {
 // 1. 建立一个 MainScope  val scope = MainScope()   override fun onCreate(savedInstanceState: Bundle?) {  super.onCreate(savedInstanceState)  setContentView(R.layout.activity_main)   // 2. 启动协程  scope.launch(Dispatchers.Unconfined) {  val one = getResult(20)  val two = getResult(40)  mNumTv.text = (one + two).toString()  }  }   // 3. 销毁的时候释放  override fun onDestroy() {  super.onDestroy()   scope.cancel()  }   private suspend fun getResult(num: Int): Int {  delay(5000)  return num * num  } } 复制代码

调度器

调度器的做用是将协程限制在特定的线程执行。主要的调度器类型有:

  • Dispatchers.Main:指定执行的线程是主线程,如上面的代码。
  • Dispatchers.IO:指定执行的线程是 IO 线程。
  • Dispatchers.Default:默认的调度器,适合执行 CPU 密集性的任务。
  • Dispatchers.Unconfined:非限制的调度器,指定的线程可能会随着 挂起的函数的发生变化。

什么是挂起?咱们就以九心吃饭为例,若是到公司对面的广场吃饭,九心得通过:

  • 走到广场 10min > 点餐 5min > 等待上餐 10min > 就餐 30min > 回来 10 min

若是九心点广场的外卖呢?

  • 九心:下单 5min > 等待(等待的时候能够工做) 30min > 就餐 30min
  • 外卖骑手:到店 > 取餐 > 送外卖

从九心吃饭的例子能够看出,若是点了外卖,九心花费的时间较少了,能够空闲出更多的时间作本身的事。再仔细分析一下,其实从公司到广场和等待取餐这个过程并无省去,只是九心把这个过程交给了外卖员。

协程的原理跟九心点外卖的原理是一致的,耗时阻塞的操做并无减小,只是交给了其余线程: 协程请求数据过程

launch

launch 的做用从它的名称就能够看的出来,启动一个新的协程,它返回的是一个 Job对象,咱们能够调用 Job#cancel() 取消这个协程。

除了 launch,还有一个方法跟它很像,就是 async,它的做用是建立一个协程,以后返回一个 Deferred<T>对象,咱们能够调用 Deferred#await()去获取返回的值,有点相似于 Java 中的 Future,稍微改一下上面的代码:

class MainActivity : AppCompatActivity() {
 // 1. 建立一个 MainScope  val scope = MainScope()   override fun onCreate(savedInstanceState: Bundle?) {  super.onCreate(savedInstanceState)  setContentView(R.layout.activity_main)   // 2. 启动协程  scope.launch(Dispatchers.Unconfined) {  val one = async { getResult(20) }  val two = async { getResult(40) }  mNumTv.text = (one.await() + two.await()).toString()  }  }   // 3. 销毁的时候释放  override fun onDestroy() {  super.onDestroy()   scope.cancel()  }   private suspend fun getResult(num: Int): Int {  delay(5000)  return num * num  } } 复制代码

与修改前的代码相比,async 可以并发执行任务,执行任务的时间也所以缩短了一半。

除了上述的并发执行任务,async 还能够对它的 start 入参设置成懒加载

val one = async(start = CoroutineStart.LAZY) { getResult(20) }
复制代码

这样系统就能够在调用它的时候再为它分配资源了。

suspend

suspend 是修饰函数的关键字,意思是当前的函数是能够挂起的,可是它仅仅起着提醒的做用,好比,当咱们的函数中没有须要挂起的操做的时候,编译器回给咱们提醒 Redudant suspend modifier,意思是当前的 suspend 是没有必要的,能够把它删除。

那咱们何时须要使用挂起函数呢?常见的场景有:

  • 耗时操做:使用 withContext 切换到指定的 IO 线程去进行网络或者数据库请求。
  • 等待操做:使用 delay方法去等待某个事件。

withContext 的代码:

private suspend fun getResult(num: Int): Int {
 return withContext(Dispatchers.IO) {  num * num  } } 复制代码

delay 的代码:

private suspend fun getResult(num: Int): Int {
 delay(5000)  return num * num } 复制代码

结合 Android Jetpack

在介绍自定义协程做用域的时候,咱们须要主动在 Activity 或者 Fragment 中的 onDestroy 方法中调用 job.cancel(),忘记处理多是程序员常常会犯的错误,如何避免呢?

Google 老是可以解决程序员的痛点,在 Android Jetpack 中的 lifecycleLiveDataViewModel 已经集成了快速使用协程的方法,若是咱们已经引入了 Android Jetpack,能够引入依赖:

dependencies {
 def lifecycle_version = "2.2.0"   // ViewModel  implementation "androidx.lifecycle:lifecycle-viewmodel-ktx:$lifecycle_version"  // LiveData  implementation "androidx.lifecycle:lifecycle-livedata-ktx:$lifecycle_version"  // Lifecycles only (without ViewModel or LiveData)  implementation "androidx.lifecycle:lifecycle-runtime-ktx:$lifecycle_version"  } 复制代码

使用能够结合具体的场景,好比结合 Lifecycle,须要使用 lifecycleScope 协程做用域:

lifecycleScope.launch {
 // 表明当前生命周期处于 Resumed 的时候才会执行(选择性使用)  whenResumed {  // ... 具体的协程代码  } } 复制代码

即便你不使用 Android Jetpack 组件,因为 Lifecycles 在很早以前就内置在 Android 系统的代码中,因此你仍然能够仅仅引入 Lifecycle 的协程扩展库,由于它会帮助你很好的处理 Activity 或者 Fragment 的生命周期。

引入 Android Jetpack 协程扩展库官方文档:点我打开

2、流

长期以来,在 Android 中响应式编程的首选方案是 RxJava,咱们今天就来了解一下 Kotlin中的响应式编程 Flow。若是你能熟练使用 RxJava,那你确定能快速上手 Flow。

曾经我在《即学即用Android Jetpack - ViewModel & LiveData》一文中说过,LiveData 的使用相似于 RxJava,如今我收回这句话,事实上,LiveData 更加简单和纯粹,它创建单一的生产消费模型,Flow 才是相似于 RxJava 的存在。

1. 基础

先上一段代码:

lifecycleScope.launch {
 // 建立一个协程 Flow<T>  createFlow()  .collect {num->  // 具体的消费处理  // ...  }  } } 复制代码

我在 createFlow 这个方法中,返回了 Flow<Int> 的对象,因此咱们能够这样对比。

对比 Flow RxJava
数据源 Flow<T> Observable<T>
订阅 collect subscribe

建立 Flow 对象

咱们暂不考虑 RxJava中的背压和非背压,直接先将 Flow 对标 RxJava 中的 Observable

和 RxJava 同样,在建立 Flow 对象的时候咱们也须要调用 emit 方法发射数据:

fun createFlow(): Flow<Int> = flow {
 for (i in 1..10)  emit(i) } 复制代码

一直调用 emit 可能不便捷,由于 RxJava 提供了 Observable.just() 这类的操做符,显然,Flow 也为咱们提供了快速建立操做:

  • flowof(vararg elements: T):帮助可变数组生成 Flow 实例
  • 扩展函数 .asFlow():面向数组、列表等集合

好比可使用 (1..10).asFlow() 代替上述的 Flow 对象的建立。

消费数据

collect 方法和 RxJava 中的 subscribe 方法同样,都是用来消费数据的。

除了简单的用法外,这里有两个问题得注意一下:

  • collect 函数是一个 suspend 方法,因此它必须发生在协程或者带有 suspend 的方法里面,这也是我为何在一开始的时候启动了 lifecycleScope.launch
  • lifecycleScope 是我使用的 Lifecycle 的协程扩展库当中的,你能够替换成自定义的协程做用域。

2. 线程切换

咱们学习 RxJava 的时候,大佬们都会说,RxJava 牛逼,牛逼在哪儿呢?

切换线程,一样的,Flow 的协程切换也很牛逼。Flow 是这么切换协程的:

lifecycleScope.launch {
 // 建立一个协程 Flow<T>  createFlow()  // 将数据发射的操做放到 IO 线程中的协程  .flowOn(Dispatchers.IO)  .collect { num ->  // 具体的消费处理  // ...  }  } } 复制代码

和 RxJava 对比:

操做 Flow RxJava
改变数据发射的线程 flowOn subscribeOn
改变消费数据的线程 observeOn

改变数据发射的线程

flowOn 使用的参数是协程对应的调度器,它实质改变的是协程对应的线程。

改变消费数据的线程

我在上面的表格中并无写到在 Flow 中如何改变消费线程,并不意味着 Flow 不能够指定消费线程?

Flow 的消费线程在咱们启动协程指定调度器的时候就确认好了,对应着启动协程的调度器。好比在上面的代码中 lifecycleScope 启动的调度器是 Dispatchers.Main,那么 collect 方法就消费在主线程。

3. 异常和完成

异常捕获

对比 Flow RxJava
异常 catch onError

Flow 中的 catch 对应着 RxJava 中的 onErrorcatch 操做:

lifecycleScope.launch {
 flow {  //...  }.catch {e->   }.collect(   ) } 复制代码

除此之外,你可使用声明式捕获 try { } catch (e: Throwable) { } 去捕获异常,不过 catch 本质上是一个扩展方法,它是对声明式捕获的封装。

完成

对比 Flow RxJava
完成 onCompletion onComplete

Flow 中的 onCompletion 对应这 RxJava 中的 onComplete 回调,onCompletion操做:

lifecycleScope.launch {
 createFlow()  .onCompletion {  // 处理完成操做  }  .collect {   } } 复制代码

除此之外,咱们还能够经过捕获式 try {} finally {} 去获取完成状况。

4. Flow的特色

咱们在对 Flow 已经有了一些基础的认知了,再来聊一聊 Flow 的特色,Flow 具备如下特色:

  • 冷流
  • 有序
  • 协做取消

若是你对 Kotlin 中的 Sequence 有一些认识,那么你应该能够轻松的 Get 到前两个点。

冷流

有点相似于懒加载,当咱们触发 collect 方法的时候,数据才开始发射。

lifecycleScope.launch {
 val flow = (1..10).asFlow().flowOn(Dispatchers.Main)   flow.collect { num ->  // 具体的消费处理  // ...  }  } } 复制代码

也就是说,在第2行的时候,虽然流建立好了,可是数据一直到第四行发生 collect 才开始发射。

有序

看代码比较容易理解:

lifecycleScope.launch {
 flow {  for(i in 1..3) {  Log.e("Flow","$i emit")  emit(i)  }  }.filter {  Log.e("Flow","$it filter")  it % 2 != 0  }.map {  Log.e("Flow","$it map")  "${it * it} money"  }.collect {  Log.e("Flow","i get $it")  } } 复制代码

获得的日志:

E/Flow: 1 emit
E/Flow: 1 filter E/Flow: 1 map E/Flow: i get 1 money E/Flow: 2 emit E/Flow: 2 filter E/Flow: 3 emit E/Flow: 3 filter E/Flow: 3 map E/Flow: i get 9 money 复制代码

从日志中,咱们很容易得出这样的结论,每一个数据都是通过 emitfiltermapcollect 这一套完整的处理流程后,下个数据才会开始处理,而不是全部的数据都先统一 emit,完了再统一 filter,接着 map,最后再 collect

协做取消

Flow 采用和协程同样的协做取消,也就是说,Flow 的 collect 只能在可取消的挂起函数中挂起的时候取消,不然不能取消。

若是咱们想取消 Flow 得借助 withTimeoutOrNull 之类的顶层函数,不妨猜一下,下面的代码最终会打印出什么?

lifecycleScope.launch {
 val f = flow {  for (i in 1..3) {  delay(500)  Log.e(TAG, "emit $i")  emit(i)  }  }  withTimeoutOrNull(1600) {  f.collect {  delay(500)  Log.e(TAG, "consume $it")  }  }  Log.e(TAG, "cancel") } 复制代码

5. 操做符对比

限于篇幅,我仅介绍一下 Flow 中操做符的做用,就不一一介绍每一个操做符具体怎么使用了。

普通操做符:

Flow 操做符 做用
map 转换操做符,将 A 变成 B
take 后面跟 Int 类型的参数,表示接收多少个 emit 出的值
filter 过滤操做符

特殊的操做符

总会有一些特殊的状况,好比我只须要取前几个,我只要最新的数据等,不过在这些状况下,数据的发射就是并发执行的。

Flow 操做符 做用
buffer 数据发射并发,collect 不并发
conflate 发射数据太快,只处理最新发射的
collectLatest 接收处理太慢,只处理最新接收的

组合操做符

Flow 操做符 做用
zip 组合两个流,双方都有新数据才会发射处理
combine 组合两个流,在通过第一次发射之后,任意方有新数据来的时候就能够发射,另外一方有多是已经发射过的数据

展平流操做符

展平流有点相似于 RxJava 中的 flatmap,将你发射出去的数据源转变为另外一种数据源。

Flow 操做符 做用
flatMapConcat 串行处理数据
flatMapMerge 并发 collect 数据
flatMapLatest 在每次 emit 新的数据之后,会取消先前的 collect

末端操做符

顾名思义,就是帮你作 collect 处理,collect 是最基础的末端操做符。

末端流操做符 做用
collect 最基础的消费数据
toList 转化为 List 集合
toSet 转化为 Set 集合
first 仅仅取第一个值
single 确保流发射单个值
reduce 规约,若是发射的是 Int,最终会获得一个 Int,可作累加操做
fold 规约,能够说是 reduce 的升级版,能够自定义返回类型

其余还有一些操做符,我这里就不一一介绍了,感兴趣能够查看 API。

3、通道

Channel是一个面向多协程之间数据传输的 BlockQueue。它的使用方式超级简单:

lifecycleScope.launch {
 // 1. 生成一个 Channel  val channel = Channel<Int>()   // 2. Channel 发送数据  launch {  for(i in 1..5){  delay(200)  channel.send(i * i)  }  channel.close()  }   // 3. Channel 接收数据  launch {  for( y in channel)  Log.e(TAG, "get $y")  } } 复制代码

实现协程之间的数据传输须要三步:

1.建立 Channel

建立的 Channel的方式能够分为两种:

  • 直接建立对象:方式跟上述代码一致。
  • 扩展函数 produce

若是使用了扩展函数,代码就变成了:

lifecycleScope.launch {
 // 1. 生成一个 Channel  val channel = produce<Int> {  for(i in 1..5){  delay(200)  send(i * i)  }  close()  }   // 2. 接收数据  // ... 省略 跟以前代码一致 } 复制代码

直接将第一步和第二步合并了。

2. 发送数据

发送数据使用的 Channel#send() 方法,当咱们数据发送完毕的时候,可使用 Channel#close() 来代表通道已经结束数据的发送。

3. 接收数据

正常状况下,咱们仅须要调用 Channel#receive() 获取数据,可是该方法只能获取一次传递的数据,若是咱们仅需获取指定次数的数据,能够这么操做:

repeat(4){
 Log.e(TAG, "get ${channel.receive()}") } 复制代码

但若是发送的数据不能够预估呢?这个时候咱们就须要迭代 Channel

for( y in channel)
 Log.e(TAG, "get $y") 复制代码

4、多协程数据处理

多协程处理并发数据的时候,原子性一样也得不到保证,协程中出了一种叫 Mutex 的锁,区别是它的 lock 操做是挂起的,非阻塞的,感兴趣的同窗能够自行查看。

总结

我的感受协层的主要做用是简化代码的逻辑,减小了代码的回调地狱,结合 Kotlin,既能够写出优雅的代码,还能下降咱们犯错的几率。至于提高多协程开发的性能?

不存在的
不存在的

若是以为本文不错,「三连」是对我最大的鼓励。我将会在下一篇文章中和你们讨论协程的原理,欢迎你们关注。

学习协程和 kotlin 仍是颇有必要的,咱们团队在开发新的功能的时候,也所有选择了 Kotlin。

关于我

我是九心,新晋互联网码农,若是想要进阶和了解更多的干货,欢迎关注个人公众号接收到的个人最新文章。

微信二维码
微信二维码

参考文章:

《最全面的Kotlin协程: Coroutine/Channel/Flow 以及实际应用》
《Kotlin中文站》
《Kotlin 的协程用力瞥一眼》

相关文章
相关标签/搜索