若是您是库做者,您也许但愿用户在使用 Kotlin 协程与 Flow 时能够更加轻松地调用您基于 Java 或回调的 API。另外,若是您是 API 的使用者,则可能愿意将第三方 API 界面适配协程,以使它们对 Kotlin 更友好。html
本文将会介绍如何使用协程和 Flow 简化 API,以及如何使用 suspendCancellableCoroutine 和 callbackFlow API 建立您本身的适配器。针对那些富有好奇心的读者,本文还会对这些 API 进行剖析,以帮您了解它们底层的工做原理。react
若是您更喜欢观看视频,能够 点击这里。android
在您为现有 API 编写本身的封装以前,请检查是否已经存在针对您的用例的适配器或者 扩展方法。下面是一些包含常见类型协程适配器的库。git
Future 类型github
对于 future 类型,Java 8 集成了 CompletableFuture,而 Guava 集成了 ListenableFuture。这里提到的并非所有,您能够在线搜索以肯定是否存在适用于您的 future 类型的适配器。编程
// 等待 CompletionStage 的执行完成而不阻塞线程 suspend fun <T> CompletionStage<T>.await(): T // 等待 ListenableFuture 的执行完成而不阻塞线程 suspend fun <T> ListenableFuture<T>.await(): T
使用这些函数,您能够摆脱回调并挂起协程直到 future 的结果被返回。api
Reactive Streamapp
对于响应式流的库,有针对 RxJava、Java 9 API 与 响应式流库 的集成:异步
// 将给定的响应式 Publisher 转换为 Flow fun <T : Any> Publisher<T>.asFlow(): Flow<T>
这些函数将响应式流转换为了 Flow。jvm
Android 专用 API
对于 Jetpack 库或 Android 平台 API,您能够参阅 Jetpack KTX 库 列表。现有超过 20 个库拥有 KTX 版本,构成了您所熟悉的 Java API。其中包括 SharedPreferences、ViewModels、SQLite 以及 Play Core。
回调
回调是实现异步通信时很是常见的作法。事实上,咱们在 后台线程任务运行指南 中将回调做为 Java 编程语言的默认解决方案。然而,回调也有许多缺点: 这一设计会致使使人费解的回调嵌套。同时,因为没有简单的传播方式,错误处理也更加复杂。在 Kotlin 中,您能够简单地使用协程调用回调,但前提是您必须建立您本身的适配器。
若是没有找到适合您用例的适配器,更直接的作法是本身编写适配器。对于一次性异步调用,可使用 suspendCancellableCoroutine API;而对于流数据,可使用 callbackFlow API。
做为练习,下面的示例将会使用来自 Google Play Services 的 Fused Location Provider API 来获取位置数据。此 API 界面十分简单,可是它使用回调来执行异步操做。当逻辑变得复杂时,这些回调容易使代码变得不可读,而咱们可使用协程来摆脱它们。
若是您但愿探索其它解决方案,能够经过上面函数所连接的源代码为您带来启发。
一次性异步调用
Fused Location Provider API 提供了 getLastLocation) 方法来得到 最后已知位置。对于协程来讲,理想的 API 是一个直接返回确切结果的挂起函数。
注意: 这一 API 返回值为 Task,而且已经有了对应的 适配器。出于学习的目的,咱们用它做为范例。
咱们能够经过为 FusedLocationProviderClient
建立扩展函数来得到更好的 API:
suspend fun FusedLocationProviderClient.awaitLastLocation(): Location
因为这是一个一次性异步操做,咱们使用 suspendCancellableCoroutine
函数: 一个用于从协程库建立挂起函数的底层构建块。
suspendCancellableCoroutine
会执行做为参数传入的代码块,而后在等待继续信号期间挂起协程的执行。当协程 Continuation 对象中的 resume
或 resumeWithException
方法被调用时,协程会被恢复执行。有关 Continuation 的更多信息,请参阅: Kotlin Vocabulary | 揭秘协程中的 suspend 修饰符。
咱们使用能够添加到 getLastLocation 方法中的回调来在合适的时机恢复协程。参见下面的实现:
// FusedLocationProviderClient 的扩展函数,返回最后已知位置 suspend fun FusedLocationProviderClient.awaitLastLocation(): Location = // 建立新的可取消协程 suspendCancellableCoroutine<Location> { continuation -> // 添加恢复协程执行的监听器 lastLocation.addOnSuccessListener { location -> // 恢复协程并返回位置 continuation.resume(location) }.addOnFailureListener { e -> // 经过抛出异常来恢复协程 continuation.resumeWithException(e) } // suspendCancellableCoroutine 块的结尾。这里会挂起协程 //直到某个回调调用了 continuation 参数 }
注意: 尽管协程库中一样包含了不可取消版本的协程构建器 (即 suspendCoroutine
),但最好始终选择使用 suspendCancellableCoroutine
处理协程做用域的取消及从底层 API 传播取消事件。
suspendCancellableCoroutine 原理
在内部,suspendCancellableCoroutine 使用 suspendCoroutineUninterceptedOrReturn 在挂起函数的协程中得到 Continuation。这一 Continuation
对象会被一个 CancellableContinuation 对象拦截,后者会今后时开始控制协程的生命周期 (其 实现 具备 Job 的功能,可是有一些限制)。
接下来,传递给 suspendCancellableCoroutine
的 lambda 表达式会被执行。若是该 lambda 返回告终果,则协程将当即恢复;不然协程将会在 CancellableContinuation 被 lambda 手动恢复前保持挂起状态。
您能够经过我在下面代码片断 (原版实现) 中的注释来了解发生了什么:
public suspend inline fun <T> suspendCancellableCoroutine( crossinline block: (CancellableContinuation<T>) -> Unit ): T = // 获取运行此挂起函数的协程的 Continuation 对象 suspendCoroutineUninterceptedOrReturn { uCont -> // 接管协程。Continuation 已经被拦截, // 接下来将会遵循 CancellableContinuationImpl 的生命周期 val cancellable = CancellableContinuationImpl(uCont.intercepted(), ...) /* ... */ // 使用可取消 Continuation 调用代码块 block(cancellable) // 挂起协程而且等待 Continuation 在 “block” 中被恢复,或者在 “block” 结束执行时返回结果 cancellable.getResult() }
想了解更多有关挂起函数的工做原理,请参阅这篇: Kotlin Vocabulary | 揭秘协程中的 suspend 修饰符。
流数据
若是咱们转而但愿用户的设备在真实的环境中移动时,周期性地接收位置更新 (使用 requestLocationUpdates) 函数),咱们就须要使用 Flow 来建立数据流。理想的 API 看起来应该像下面这样:
fun FusedLocationProviderClient.locationFlow(): Flow<Location>
为了将基于回调的 API 转换为 Flow,可使用 callbackFlow 流构建器来建立新的 flow。callbackFlow
的 lambda 表达式的内部处于一个协程的上下文中,这意味着它能够调用挂起函数。不一样于 flow 流构建器,channelFlow 能够在不一样的 CoroutineContext 或协程以外使用 offer 方法发送数据。
一般状况下,使用 callbackFlow 构建流适配器遵循如下三个步骤:
将上述步骤应用于当前用例,咱们获得如下实现:
// 发送位置更新给消费者 fun FusedLocationProviderClient.locationFlow() = callbackFlow<Location> { // 建立了新的 Flow。这段代码会在协程中执行。 // 1. 建立回调并向 flow 中添加元素 val callback = object : LocationCallback() { override fun onLocationResult(result: LocationResult?) { result ?: return // 忽略为空的结果 for (location in result.locations) { try { offer(location) // 将位置发送到 flow } catch (t: Throwable) { // 位置没法发送到 flow } } } } // 2. 注册回调并经过调用 requestLocationUpdates 获取位置更新。 requestLocationUpdates( createLocationRequest(), callback, Looper.getMainLooper() ).addOnFailureListener { e -> close(e) // 在出错时关闭 flow } // 3. 等待消费者取消协程并注销回调。这一过程会挂起协程,直到 Flow 被关闭。 awaitClose { // 在这里清理代码 removeLocationUpdates(callback) } }
callbackFlow 内部原理
在内部,callbackFlow 使用了一个 channel。channel 在概念上很接近阻塞 队列) —— 它在配置时须要指定容量 (capacity): 便可以缓冲的元素个数。在 callbackFlow 中建立的 channel 默认容量是 64 个元素。若是将新元素添加到已满的 channel,因为 offer 不会将元素添加到 channel 中,而且会当即返回 false,因此 send 会暂停生产者,直到频道 channel 中有新元素的可用空间为止。
awaitClose 内部原理
有趣的是,awaitClose
内部使用的是 suspendCancellableCoroutine
。您能够经过我在如下代码片断中的注释 (查看 原始实现) 一窥究竟:
public suspend fun ProducerScope<*>.awaitClose(block: () -> Unit = {}) { ... try { // 使用可取消 continuation 挂起协程 suspendCancellableCoroutine<Unit> { cont -> // 仅在 Flow 或 Channel 关闭时成功恢复协程,不然保持挂起 invokeOnClose { cont.resume(Unit) } } } finally { // 老是会执行调用者的清理代码 block() } }
复用 Flow
除非额外使用中间操做符 (如: conflate
),不然 Flow 是冷且惰性的。这意味着每次调用 flow 的终端操做符时,都会执行构建块。对于咱们的用例来讲,因为添加一个新的位置监听器开销很小,因此这一特性不会有什么大问题。然而对于另外的一些实现可就不必定了。
您可使用 shareIn
中间操做符在多个收集器间复用同一个 flow,并使冷流成为热流。
val FusedLocationProviderClient.locationFlow() = callbackFlow<Location> { ... }.shareIn( // 让 flow 跟随 applicationScope applicationScope, // 向新的收集器发送上一次发送的元素 replay = 1, // 在有活跃的订阅者时,保持生产者的活跃状态 started = SharingStarted.WhileSubscribed() )
您能够经过文章《协程中的取消和异常 | 驻留任务详解》来了解更多有关在应用中使用 applicationScope
的最佳实践。
您应当考虑经过建立协程适配器使您的 API 或现存 API 简洁、易读且符合 Kotlin 的使用习惯。首先检查是否已经存在可用的适配器,若是没有,您可使用 suspendCancellableCoroutine
针对一次性调用;或使用 callbackFlow
针对流数据,来建立您本身的适配器。
您能够经过 codelab: 建立 Kotlin 扩展库,来上手本文所介绍的话题。