本文介绍了咱们在开发 2019 Android 开发者峰会 (ADS) 应用时总结整理的 Flow 最佳实践 (应用源码已开源),咱们将和你们共同探讨应用中的每一个层级将如何处理数据流。html
ADS 应用的架构遵照 Android 官方的推荐架构指南,咱们在其中引入了 Domain 层 (用以囊括各类 UseCases 类) 来帮助分离焦点,进而保持代码的精简、复用性、可测试性。java
如同许多 Android 应用同样,ADS 应用从网络或缓存懒加载数据。咱们发现,这种场景很是适合 Flow。挂起函数 (suspend functions) 更适合于一次性操做。为了使用协程,咱们将重构分为两次 commit 提交: 第一次迁移了一次性操做,第二次将其迁移至数据流。android
在本文中,您将看到咱们把应用从 "在全部层级使用 LiveData",重构为 "只在 View 和 ViewModel 间使用 LiveData 进行通信,并在应用的底层和 UserCase 层架构中使用协程"。ios
您有两种方法在协程中处理数据流: 一种是 Flow API,另外一种是 Channel API。Channels 是一种同步原语,而 Flows 是为数据流模型所设计的: 它是订阅数据流的工厂。不过咱们可使用 Channels 来支持 Flows,这一点咱们稍后再说。git
相较于 Channel,Flow 更灵活,并提供了更明确的约束和更多操做符。github
因为末端操做符 (terminal operator) 会触发数据流的执行,同时会根据生产者一侧流操做来决定是成功完成操做仍是抛出异常,所以 Flows 会自动地关闭数据流,您基本不会在生产者一侧泄漏资源;而一旦 Channel 没有正确关闭,生产者可能不会清理大型资源,所以 Channels 更容易形成资源泄漏。数据库
应用数据层负责提供数据,一般是从数据库中读取,或从网络获取数据,例如,示例是一个数据源接口,它提供了一个用户事件数据流:数组
interface UserEventDataSource {
fun getObservableUserEvent(userId: String): Flow<UserEventResult>
}
复制代码
1. UseCase 层和 Repository 层缓存
介于 View/ViewModel 和数据源之间的层 (在咱们的例子中是 UseCase 和 Repository) 一般须要合并来自多个查询的数据,或在 ViewModel 层使用以前转化数据。就像 Kotlin sequences 同样,Flow 支持大量操做符来转换数据。目前已经有大量的可用的操做符,同时您也能够建立您本身的转换器 (好比,使用 transform 操做符)。不过 Flow 在许多的操做符中暴露了 suspend lambda 表达式,所以在大多数状况下没有必要经过自定义转换来完成复杂任务,能够直接在 Flow 中调用挂起函数。bash
在 ADS 应用中,咱们想将 UserEventResult 和 Repository 层中的会话数据进行绑定。咱们利用 map 操做符来将一个 suspend lambda 表达式应用在从数据源接收到的每个 Flow 的值上:
/* Copyright 2019 Google LLC.
SPDX-License-Identifier: Apache-2.0 */
class DefaultSessionAndUserEventRepository(
private val userEventDataSource: UserEventDataSource,
private val sessionRepository: SessionRepository
) : SessionAndUserEventRepository {
override fun getObservableUserEvent(
userId: String?,
eventId: SessionId
): Flow<Result<LoadUserSessionUseCaseResult>> {
// 处理 userId
// 监听用户事件,并将其与 Session 数据进行合并
return userEventDataSource.getObservableUserEvent(userId, eventId).map { userEventResult ->
val event = sessionRepository.getSession(eventId)
// 将 Session 和用户数据进行合并,并传递结果
val userSession = UserSession(
event,
userEventResult.userEvent ?: createDefaultUserEvent(event)
)
Result.Success(LoadUserSessionUseCaseResult(userSession))
}
}
}
复制代码
2. ViewModel
在利用 LiveData 执行 UI ↔ ViewModel 通讯时,ViewModel 层应该利用末端操做符来消费来自数据层的数据流 (好比: collect、first 或者是 toList) 。
/* Copyright 2019 Google LLC.
SPDX-License-Identifier: Apache-2.0 */
// 真实代码的简化版
class SessionDetailViewModel(
private val loadUserSessionUseCase: LoadUserSessionUseCase,
...
): ViewModel() {
private fun listenForUserSessionChanges(sessionId: SessionId) {
viewModelScope.launch {
loadUserSessionUseCase(sessionId).collect { loadResult ->
}
}
}
}
复制代码
完整代码能够参考这里: github.com/google/iosc…
若是您须要将 Flow 转化为 LiveData,则可使用 AndroidX lifecycle library 提供的 Flow.asLiveData() 扩展函数 (extension function)。这个扩展函数很是便于使用,由于它共享了 Flow 的底层订阅,同时根据观察者的生命周期管理订阅。此外,LiveData 能够为后续添加的观察者提供最新的数据,其订阅在配置发生变动的时候依旧可以生效。下面利用一段简单的代码来演示如何使用这个扩展函数:
class SimplifiedSessionDetailViewModel(
private val loadUserSessionUseCase: LoadUserSessionUseCase,
...
): ViewModel() {
val sessions = loadUserSessionUseCase(sessionId).asLiveData()
}
复制代码
特别说明: 这段代码不是 ADS 应用的,它只是用来演示如何使用 Flow.asLiveData()。
回到数据源的实现,要怎样去实现以前暴露的 getObservableUserEvent 函数?咱们考虑了两种实现: flow 构造器,或 BroadcastChannel 接口,这两种实现应用于不一样的场景。
1. 何时使用 Flow ?
Flow 是一种 "冷流"(Cold Stream)。"冷流" 是一种数据源,该类数据源的生产者会在每一个监听者开始消费事件的时候执行,从而在每一个订阅上建立新的数据流。一旦消费者中止监听或者生产者的阻塞结束,数据流将会被自动关闭。
Flow 很是适合须要开始/中止数据的产生来匹配观察者的场景。
您能够利用 flow 构造器来发送有限个/无限个元素。
val oneElementFlow: Flow<Int> = flow {
// 生产者代码开始执行,流被打开
emit(1)
// 生产者代码结束,流将被关闭
}
val unlimitedElementFlow: Flow<Int> = flow {
// 生产者代码开始执行,流被打开
while(true) {
// 执行计算
emit(result)
delay(100)
}
// 生产者代码结束,流将被关闭
}
复制代码
Flow 经过协程取消功能提供自动清理功能,所以倾向于执行一些重型任务。请注意,这里提到的取消是有条件的,一个永不挂起的 Flow 是永不会被取消的: 在咱们的例子中,因为 delay 是一个挂起函数,用于检查取消状态,当订阅者中止监听时,Flow 将会中止并清理资源。
2. 何时使用 BroadcastChannel
Channel 是一个用于协程间通讯的并发原语。BroadcastChannel 基于 Channel,并加入了多播功能。
可能在这样一些场景里,您可能会考虑在数据源层中使用 BroadcastChannel:
若是生产者和消费者的生命周期不一样或者彼此彻底独立运行时,请使用 BroadcastChannel。
若是您但愿生产者有独立的生命周期,同时向任何存在的监听者发送当前数据的时候,BroadcastChannel API 很是适合这种场景。在这种状况下,当新的监听者开始消费事件时,生产者不须要每次都被执行。
您依然能够向调用者提供 Flow,它们不须要知道具体的实现。您可使用 BroadcastChannel.asFlow() 这个扩展函数来将一个 BroadcastChannel 做为一个 Flow 使用。
不过,关闭这个特殊的 Flow 不会取消订阅。当使用 BroadcastChannel 的时候,您必须本身管理生命周期。BroadcastChannel 没法感知到当前是否还存在监听者,除非关闭或取消 BroadcastChannel,不然将会一直持有资源。请确保在不须要 BroadcastChannel 的时候将其关闭。同时请注意关闭后的 BroadcastChannel 没法再次被使用,若是须要,您须要从新建立实例。
接下来,咱们将分享如何使用 BroadcastChannel API 的示例。
3. 特别说明
部分 Flow 和 Channel API 仍处于实验阶段,极可能会发生变更。在一些状况下,您可能会正在使用 Channel,不过在将来可能会建议您使用 Flow。具体来说,StateFlow 和 Flow 的 share operator 方案可能在将来会减小 Channel 的使用。
包含 Room 在内的不少库已经支持将协程用于数据流操做。对于那些还不支持的库,您能够将任何基于回调的 API 转换为协程。
1. Flow 的实现
若是您想将一个基于回调的流 API 转换为使用 Flow,您可使用 channelFlow 函数 (固然也可使用 callbackFlow,它们都基于相同的实现)。channelFlow 将会建立一个 Flow 的实例,该实例中的元素将传递给一个 Channel。这样能够容许咱们在不一样的上下文或并发中提供元素。
如下示例中,咱们想要把从回调中拿到的元素发送到 Flow 中:
/* Copyright 2019 Google LLC.
SPDX-License-Identifier: Apache-2.0 */
override fun getObservableUserEvent(userId: String, eventId: SessionId): Flow<UserEventResult> {
// 1) 利用 channelFlow 建立一个 Flow
return channelFlow<UserEventResult> {
val eventDocument = firestore.collection(USERS_COLLECTION)
.document(userId)
.collection(EVENTS_COLLECTION)
.document(eventId)
// 1) 将回调注册到 API 上
val subscription = eventDocument.addSnapshotListener { snapshot, _ ->
val userEvent = if (snapshot.exists()) {
parseUserEvent(snapshot)
} else { null }
// 2) 将数据发送到 Flow
channel.offer(UserEventResult(userEvent))
}
// 3) 请不要关闭数据流,在消费者关闭或者 API 调用 onCompleted/onError 函数以前,请保证数据流
// 一直处于打开状态。
// 当数据流关闭后,请取消第三方库的订阅。
awaitClose { subscription.remove() }
}
}
复制代码
详细代码能够参考这里:
2. BroadcastChannel 实现
对于使用 Firestore 跟踪用户身份认证的数据流,咱们使用了 BroadcastChannel API,由于咱们但愿注册一个有独立生命周期的 Authentication 监听者,同时也但愿能向全部正在监听的对象广播当前的结果。
转化回调 API 为 BroadcastChannel 相比转化为 Flow 要略复杂一点。您能够建立一个类,并设置将实例化后的 BroadcastChannel 做为变量保存。在初始化期间,注册回调,像之前同样将元素发送到 BroadcastChannel:
/* Copyright 2019 Google LLC.
SPDX-License-Identifier: Apache-2.0 */
class FirebaseAuthStateUserDataSource(...) : AuthStateUserDataSource {
private val channel = ConflatedBroadcastChannel<Result<AuthenticatedUserInfo>>()
private val listener: ((FirebaseAuth) -> Unit) = { auth ->
// 数据处理逻辑
// 将当前的用户 (数据) 发送给消费者
if (!channel.isClosedForSend) {
channel.offer(Success(FirebaseUserInfo(auth.currentUser)))
} else {
unregisterListener()
}
}
@Synchronized
override fun getBasicUserInfo(): Flow<Result<AuthenticatedUserInfo>> {
if (!isListening) {
firebase.addAuthStateListener(listener)
isListening = true
}
return channel.asFlow()
}
}
复制代码
为了测试 Flow 转换 (就像咱们在 UseCase 和 Repository 层中所作的那样),您能够利用 flow 构造器返回一个假数据,例如:
/* Copyright 2019 Google LLC.
SPDX-License-Identifier: Apache-2.0 */
object FakeUserEventDataSource : UserEventDataSource {
override fun getObservableUserEvents(userId: String) = flow {
emit(UserEventsResult(userEvents))
}
}
class DefaultSessionAndUserEventRepositoryTest {
@Test
fun observableUserEvents_areMappedCorrectly() = runBlockingTest {
// 准备一个 repo
val userEvents = repository
.getObservableUserEvents("user", true).first()
// 对接收到的用户事件进行断言
}
}
复制代码
为了成功完成测试,一个比较好的作法是使用 take 操做符来从 Flow 中获取一些数据,使用 toList 做为末端操做符来从数组中获取结果。示例以下:
class AnotherStreamDataSourceImplTest {
@Test
fun `Test happy path`() = runBlockingTest {
//准备好 subject
val result = subject.flow.take(1).toList()
// 断言结果和预期的一致
}
}
复制代码
take 操做符很是适合在获取到数据后关闭 Flow。在测试完毕后不关闭 Flow 或 BroadcastChannel 将会致使内存泄漏以及测试结果不一致。
注意: 若是在数据源的实现是经过 BroadcastChannel 完成的,那么上面的代码还不够。您须要本身管理数据源的生命周期,并确保 BroadcastChannel 在测试开始以前已经启动,同时须要在测试结束后将其关闭,不然将会致使内存泄漏。
协程测试的最佳实践在这里依然适用。若是您在测试代码中建立新的协程,则可能想要在测试线程中执行它来确保测试得到执行。
您也能够经过视频回顾 2019 Android 开发者峰会演讲 —— 在 Android 上测试协程:
点击查看视频:v.qq.com/x/page/d303…
2019 ADS 应用在 GitHub 开源,请访问下方连接在 GitHub 上查看更详细的代码实现: github.com/google/iosc…