Kotlin 应用 | 用协程控制多个并行异步结果的优先级

你会怎么实现下面这个场景?应用首页有三个优先级从高到低的弹窗,展现内容依赖网络请求,若低优先级弹窗请求先返回则需等待,让高优先级先展现。git

串行请求是最容易想到的解决方案,即先请求最高优先级的弹窗,当它返回展现后再请求第二优先级弹窗。但这样会拉长全部弹窗的展现时间。github

性能更好的方案是同时并行发出三个请求,但网络请求时长的不肯定性使得最高优先级的弹窗不必定优先返回,因此得设计一种优先级阻塞机制。本文使用 协程 + 链式队列 实现这个机制。web

异步任务链

把单个异步任务进行抽象:网络

// 单个异步任务
class Item {  companion object {  // 默认异步优先级  const val PRIORITY_DEFAULT = 0  }   // 异步操做:耗时的异步操做  var suspendAction: (suspend () -> Any?)? = null  set(value) {  field = value  value?.let {  // 启动协程执行异步操做  GlobalScope.launch { deferred = async { it.invoke() } }  }  }  // 异步响应:异步操做结束后要作的事情  var resumeAction: ((Any?) -> Unit)? = null  // 异步结果:异步操做返回值  var deferred: Deferred<*>? = null  // 异步优先级  var priority: Int = PRIORITY_DEFAULT } 复制代码

异步任务有三个主要的属性,分别是异步操做suspendAction、异步响应resumeAction、异步结果deferred。每当异步操做被赋值时,就启动协程执行它,并将其返回值保存在deferred中。app

为了确保异步任务的优先级,把多个异步任务用链的方式串起来,组成异步任务链dom

class Item {
 companion object {  const val PRIORITY_DEFAULT = 0  }   var suspendAction: (suspend () -> Any?)? = null  set(value) {  field = value  value?.let {  GlobalScope.launch { deferred = async { it.invoke() } }  }  }   var resumeAction: ((Any?) -> Unit)? = null  var deferred: Deferred<*>? = null  var priority: Int = PRIORITY_DEFAULT   // 异步任务前结点(Item 包含 Item)  internal var next: Item? = null  // 异步任务后结点(Item 包含 Item)  internal var pre: Item? = null   // 在当前结点后插入新结点  internal fun addNext(item: Item) {  next?.let {  it.pre = item  item.next = it  item.pre = this  this.next = item  } ?: also { // 尾结点插入  this.next = item  item.pre = this  item.next = null  }  }   // 在当前结点前插入新结点  internal fun addPre(item: Item) {  pre?.let {  it.next = item  item.pre = it  item.next = this  this.pre = item  } ?: also { // 头结点插入  item.next = this  item.pre = null  this.pre = item  }  } } 复制代码

本身包含本身 的方式就能实现链式结构。Android 消息列表也用一样的结构:异步

public final class Message implements Parcelable {
 Message next;  ... } 复制代码

链必须有个头结点,存放头结点的类就是存放整个链的类,就好像消息列表MessageQueue同样:async

public final class MessageQueue {
 Message mMessages; } 复制代码

模仿消息列表,写一个异步任务链编辑器

// 异步任务链
class SuspendList {  // 头结点  private var head: Item = emptyItem()   // 向异步任务链插入结点  fun add(item: Item) {  // 从头结点向后查找插入位置,找到再后插入  head.findItem(item.priority).addNext(item)  }   // 根据优先级向后查找插入位置(优先级升序)  private fun Item.findItem(priority: Int): Item {  // 当前结点  var p: Item? = this  // 当前结点的后续结点  var next: Item? = p?.next  // 从当前结点开始向后遍历异步任务链  while (next != null) {  // 若优先级介于当前结点和其后续结点之间,则表示找到插入位置  if (priority in p!!.priority until next.priority) {  break  }  p = p.next  next = p?.next  }  return p!!  }   // 观察异步任务链并按优先级阻塞  fun observe() = GlobalScope.launch(Dispatchers.Main) {  // 从头结点向后遍历异步任务链  var p: Item? = head.next  while (p != null) {  // 在每一个异步结果上阻塞,直到异步任务完成后执行异步响应  p.resumeAction?.invoke(p.deferred?.await())  p = p.next  }  }   // 异步任务(已讲解再也不赘述)  class Item { ... } }  // 空结点(头结点) fun emptyItem(): SuspendList.Item = SuspendList.Item().apply { priority = -1 } 复制代码

SuspendList持有链的头结点,为了使“头插入”和“中间插入”复用一套代码,将头结点设置为“空结点”。函数

异步任务链上的任务按优先级升序排列(优先级数字越小优先级越高)。这保证了优先级最高的异步任务老是在链表头。

优先级阻塞:当全部异步任务都被添加到链以后,调用observe()观察整个异步任务链。该方法启动了一个协程,在协程中从头结点向后遍历链,并在每一个异步任务的Deferred上阻塞。由于链表已按优先级排序,因此阻塞时也是按优先级从高到低进行的。

全局做用域

真实业务场景中,须要统一安排优先级的异步任务多是跨界面的。这就要求异步任务链能全局访问,单例是一个最直接的选择,但它限制了整个 App 中异步任务链的数量:

// 私有构造函数
class SuspendList private constructor() {  companion object {  // 静态 map 存放全部异步任务链  var map = ArrayMap<String, SuspendList>()  // 根据 key 构建异步任务链  fun of(key: String): SuspendList = map[key] ?: let {  val p = SuspendList()  map[key] = p  p  }  }  ... } 复制代码

而后就能够像这样使用异步任务链:

// 构建异步任务链
SuspendList.of("dialog").apply {  // 向链添加异步任务1  add(Item {  suspendAction = { fetchUser() }  resumeAction = { user: Any? -> onUserResume(user) }  priority = 3  })  // 向链添加异步任务2  add(Item {  suspendAction = { fetchActivity() }  resumeAction = { activity: Any? -> onActivityResume(activity) }  priority = 1  }) }.observe()  suspend fun fetchUser(): String {  delay(4000)  return "User Taylor" }  suspend fun fetchActivity(): String {  delay(5000)  return "Activity Bonus" }  private fun onActivityResume(activity: Any?) {  Log.v("test", "activity(${activity.toString()}) resume") }  private fun onUserResume(user: Any?) {  Log.v("test", "user(${user.toString()}) resume") } 复制代码

上述代码构建了一个名为 dialog 的异步任务链,向其中添加了两个异步任务,并按优先级观察它们的结果。

其中Item()是一个顶层方法,用于构建单个异步任务:

fun Item(init: SuspendList.Item.() -> Unit): SuspendList.Item = SuspendList.Item().apply(init)
复制代码

这是构建对象 DSL 的标准写法,详细讲解能够参见这里

运用 DSL 的思路还能够进一步将构建代码简化成这样:

SuspendList.of("dialog") {
 Item {  suspendAction = { fetchUser() }  resumeAction = { user: Any? -> onUserResume(user) }  priority = 3  }  Item {  suspendAction = { fetchActivity() }  resumeAction = { activity: Any? -> onActivityResume(activity) }  priority = 1  } }.observe() 复制代码

不过须要对原先的of()Item()作一些调整:

// 新增接收者为SuspendList的 lambda 参数,为构建异步任务提供外层环境
fun of(key: String, init: SuspendList.() -> Unit): SuspendList = (map[key] ?: let {  val p = SuspendList()  map[key] = p  p }).apply(init)  // 将构建异步任务声明为SuspendList的扩展方法 // 构建异步任务后将其插入到异步任务链中 fun SuspendList.Item(init: SuspendList.Item.() -> Unit): SuspendList.Item =  SuspendList.Item().apply(init).also { add(it) } 复制代码

异步超时

若某个高优先级的异步任务迟迟不能结束,其它任务只能都被阻塞?

得加个超时参数:

class Item {
 // 为异步操做赋值时,再也不马上构建协程  var suspendAction: (suspend () -> Any?)? = null  // 超时时长  var timeout: Long = -1  ... } 复制代码

为单个异步任务添加超时时长参数,还得重构一下异步任务的构建函数:

fun SuspendList.Item(init: SuspendList.Item.() -> Unit): SuspendList.Item =
 SuspendList.Item().apply {  // 为异步任务设置各类参数  init()  // 启动协程  GlobalScope.launch {  // 将异步任务结果包装成 Deferred  deferred = async {  // 若须要超时机制  if (timeout > 0) {  withTimeoutOrNull(timeout) { suspendAction?.invoke() }  }  // 不须要超时机制  else {  suspendAction?.invoke()  }  }  }  }.also { add(it) } 复制代码

本来在suspendAction赋值时就立马启动协程,如今将其延后,等全部参数都设置完毕后才启动。这样能够避免“先为 suspendAction 赋值,再为 timeout 赋值”case 下超时无效的 bug。

使用withTimeoutOrNull()实现超时机制,当超时发生时,业务会从resumeAction中得到null

异步任务生命周期管理

构建异步任务链时使用了GlobalScope.launch()启动协程,其建立的协程不符合structured-concurrency。因此须要手动管理生命周期:

class SuspendList private constructor() {
 class Item {  // 为异步任务新增 Job 属性,指向其对应的协程  internal var job:Job? = null  ...  }   // observer()返回类型为 Job,业务层能够在须要的时候取消它  fun observe() = GlobalScope.launch(Dispatchers.Main) {  var p: Item? = head.next  while (p != null) {  p.resumeAction?.invoke(p.deferred?.await())  // 当异步任务响应被处理后,取消其协程以释放资源  p.job?.cancel()  p = p.next  }  } }  fun SuspendList.Item(init: SuspendList.Item.() -> Unit): SuspendList.Item =  SuspendList.Item().apply {  init()  // 将该异步任务的协程存储在 job 中  job = GlobalScope.launch {  deferred = async {  if (timeout > 0) {  withTimeoutOrNull(timeout) { suspendAction?.invoke() }  } else {  suspendAction?.invoke()  }  }  }  }.also { add(it) } 复制代码

Talk is cheap, show me the code

本篇的完整源码能够点击这里

推荐阅读

这是该系列的第十二篇,系列文章目录以下:

  1. Kotlin基础 | 白话文转文言文般的Kotlin常识

  2. Kotlin基础 | 望文生义的Kotlin集合操做

  3. Kotlin实战 | 用实战代码更深刻地理解预约义扩展函数

  4. Kotlin实战 | 使用DSL构建结构化API去掉冗余的接口方法

  5. Kotlin基础 | 抽象属性的应用场景

  6. Kotlin进阶 | 动画代码太丑,用DSL动画库拯救,像说话同样写代码哟!

  7. Kotlin基础 | 用约定简化相亲

  8. Kotlin基础 | 2 = 12 ?泛型、类委托、重载运算符综合应用

  9. Kotlin实战 | 语法糖,总有一颗甜到你(持续更新)

  10. Kotlin 实战 | 干掉 findViewById 和 Activity 中的业务逻辑

  11. Kotlin基础 | 为何要这样用协程?

  12. Kotlin 应用 | 用协程控制多个并行异步结果的优先级

相关文章
相关标签/搜索