Kotlin coroutines meeting Architecture components

前言

Kotlin coroutines 为咱们提供了一种编写良好异步代码的简易 API。在 Kotlin coroutines 中,咱们能够自定义 CoroutineScope,用来管理咱们启动的 coroutines 的运行位置。须要注意的是,每一个 coroutine 都须要运行在 CoroutineScope 中。 Architecture components 为咱们提供了在各组件中使用 coroutine 的官方支持。html

Add KTX dependencies

这些 coroutine scopes 做为 Architecture components 的扩展包提供给开发者使用,它们位于 KTX extensions。经过单独添加如下依赖,咱们就可使用它们。java

  • ViewModelScope:androidx.lifecycle:lifecycle-viewmodel-ktx:2.1.0-beta01 或更高
  • LifecycleScope:androidx.lifecycle:lifecycle-runtime-ktx:2.2.0-alpha01 或更高
  • LiveData:androidx.lifecycle:lifecycle-livedata-ktx:2.2.0-alpha01 或更高

最新版本可在 Google Maven 中找到。android

Lifecycle-aware coroutine scopes

Architecture components 为咱们提供了 ViewModelScopeLifecycleScope 用于管理咱们在 ViewModel 和 Lifecycle 中启动的 Coroutinesgit

ViewModelScope

Usage

ViewModelScope 是 ViewModel 的一个扩展属性,所以咱们能够在每一个 ViewModel 的子类中使用它。每一个在 ViewModelScope 中启动的 coroutine 都会在 ViewModel 销毁(ViewModel#onCleared())的时候自动取消(cancel)。若是咱们只须要在 ViewModel 存活(active)时作一些逻辑处理,使用 Coroutines 是一个好的选择。举个栗子,假如咱们在 ViewModel 中为 View 层计算一些数据而后将结果显示到 UI 上,咱们应该限定这些计算工做在 ViewModel 的生命周期内执行。这样当咱们的 ViewModel 销毁的时候,这些计算工做也会自动取消,避免资源浪费和内存泄露。 咱们可使用 ViewModel 的扩展属性 -- ViewModelScope,来限定 Coroutines 的运行范围。使用方式以下:github

class MyViewModel: ViewModel {
    init {
        // 在 ViewModelScope 中启动一个 coroutine
        viewModelScope.launch {
            // 这个 coroutine 会在 ViewModel 销毁时被自动取消。
        }
    }
}
复制代码

Source code

下面咱们来看下源码是如何实现的。web

ViewModel.kt

// 用于存放 viewModelScope 的 key
private const val JOB_KEY = "androidx.lifecycle.ViewModelCoroutineScope.JOB_KEY"

/** * [CoroutineScope] tied to this [ViewModel]. * This scope will be canceled when ViewModel will be cleared, i.e [ViewModel.onCleared] is called * * This scope is bound to [Dispatchers.Main] */
val ViewModel.viewModelScope: CoroutineScope
        get() {
            // 首先从缓存中取 CoroutineScope,若非第一次调用 viewModelScope,则会直接返回
            val scope: CoroutineScope? = this.getTag(JOB_KEY)
            if (scope != null) {
                return scope
            }
            // 若首次调用,则新建一个 CloseableCoroutineScope,并存在 ViewModel 中。
            // 这个 CloseableCoroutineScope 与主线程绑定。
            return setTagIfAbsent(JOB_KEY,
                CloseableCoroutineScope(SupervisorJob() + Dispatchers.Main))
        }

internal class CloseableCoroutineScope(context: CoroutineContext) : Closeable, CoroutineScope {
    // 实现 CoroutineScope 接口,设置 CoroutineContext
    override val coroutineContext: CoroutineContext = context

    // 实现 Closeable 接口
    override fun close() {
        // 取消该 scope 中启动的全部 coroutines
        coroutineContext.cancel()
    }
}
复制代码

ViewModel.java

下面咱们带着如下疑问,来看下 ViewModel 的源码。ViewModel 的源码也就几十行。数据库

  • getTag 是如何实现的?
  • setTagIfAbsent 是如何实现的?
  • 为何 viewModelScope 中启动的线程会在 ViewModel 销毁时被自动取消。
public abstract class ViewModel {
    // Can't use ConcurrentHashMap, because it can lose values on old apis (see b/37042460)
    @Nullable
    private final Map<String, Object> mBagOfTags = new HashMap<>();
    private volatile boolean mCleared = false;

    /** * This method will be called when this ViewModel is no longer used and will be destroyed. * <p> * It is useful when ViewModel observes some data and you need to clear this subscription to * prevent a leak of this ViewModel. */
    @SuppressWarnings("WeakerAccess")
    protected void onCleared() {
    }

    @MainThread
    final void clear() {
        mCleared = true;
        // Since clear() is final, this method is still called on mock objects
        // and in those cases, mBagOfTags is null. It'll always be empty though
        // because setTagIfAbsent and getTag are not final so we can skip
        // clearing it
        if (mBagOfTags != null) {
            synchronized (mBagOfTags) {
                for (Object value : mBagOfTags.values()) {
                    // see comment for the similar call in setTagIfAbsent
                    closeWithRuntimeException(value);
                }
            }
        }
        onCleared();
    }

    /** * Sets a tag associated with this viewmodel and a key. * If the given {@code newValue} is {@link Closeable}, * it will be closed once {@link #clear()}. * <p> * If a value was already set for the given key, this calls do nothing and * returns currently associated value, the given {@code newValue} would be ignored * <p> * If the ViewModel was already cleared then close() would be called on the returned object if * it implements {@link Closeable}. The same object may receive multiple close calls, so method * should be idempotent. */
    <T> T setTagIfAbsent(String key, T newValue) {
        T previous;
        synchronized (mBagOfTags) {
            //noinspection unchecked
            previous = (T) mBagOfTags.get(key);
            if (previous == null) {
                mBagOfTags.put(key, newValue);
            }
        }
        T result = previous == null ? newValue : previous;
        if (mCleared) {
            // It is possible that we'll call close() multiple times on the same object, but
            // Closeable interface requires close method to be idempotent:
            // "if the stream is already closed then invoking this method has no effect." (c)
            closeWithRuntimeException(result);
        }
        return result;
    }

    /** * Returns the tag associated with this viewmodel and the specified key. */
    @SuppressWarnings("TypeParameterUnusedInFormals")
    <T> T getTag(String key) {
        //noinspection unchecked
        synchronized (mBagOfTags) {
            return (T) mBagOfTags.get(key);
        }
    }

    private static void closeWithRuntimeException(Object obj) {
        if (obj instanceof Closeable) {
            try {
                ((Closeable) obj).close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
}
复制代码

能够看到:api

  • mBagOfTags 这个成员变量是一个 HashMap,用于存放键值对,getTag(String key) 就是从 mBagOfTags 取出该 key 对应的值,并作了强制类型转换。
  • setTagIfAbsent(String key, T newValue) 就是将该 newValue 存储到 mBagOfTags 中,以便下次取出使用。须要注意的是,若是想要给已经存在的 key 设置一个新值(newValue),将不会生效,新值会被忽略,而后返回已经存在的旧值(previous)。而且,若是 ViewModel#clear() 已经被系统调用(好比它的 Activity/Fragment 已经销毁)时(mCleared = true),新存储的值会调用 closeWithRuntimeException(Object obj)
  • ViewModel#clear() 中,会遍历 mBagOfTags,而后调用 closeWithRuntimeException(Object obj)
  • closeWithRuntimeException(Object obj) 方法中,若是这个 obj 是实现了 Closeable 接口的类的对象,就会调用它的 close 方法。

回到这个问题:为何 viewModelScope 中启动的线程会在 ViewModel 销毁时被自动取消? 如今就能够有答案了:由于 ViewModel 的扩展属性 viewModelScope 是一个实现了 Closeable 接口的 CloseableCoroutineScope,而且存放在了 ViewModel 的 mBagOfTags 中。因为 ViewModel#clear() 时会将 mBagOfTags 中全部实现了 Closeable 接口的类的对象关闭(close),因此会回调 CloseableCoroutineScope#close() 方法,并此方法内,取消了全部该 CoroutineScope 中的全部 Coroutines。缓存

Test

TestCoroutineDispatcher

ViewModel.kt 源码可知 viewModelScope 是运行在主线程中的(CloseableCoroutineScope(SupervisorJob() + Dispatchers.Main))。Dispatchers.Main 在 Android 平台是指 UI 线程,它的实现依赖 Looper.getMainLooper()。所以咱们没法使用 Unit tests 测试它们。 幸运的是,官方为咱们提供了测试依赖能够替换 Dispatchers.Main 的实现。bash

implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-test:$coroutines_version'
复制代码

咱们可使用该库提供的 Dispatchers.setMain(dispatcher: CoroutineDispatcher) 来重写它,而且它也为咱们提供了一个默认实现:TestCoroutineDispatcherTestCoroutineDispatcher 是一个 CoroutineDispatcher 的实现类,咱们可使用它控制 Coroutines 的执行,好比 pause/resume 或控制它的虚拟时钟。该类是在 Kotlin Coroutines v1.2.1 新增的,目前仍是一个实验性 API。能够查看官方文档

咱们不该在单元测试时使用 Dispatchers.Unconfined 来替换 Dispatchers.Main,它不会是咱们预想的那样验证结果或耗时。因为单元测试应该运行在一个隔离环境内,不受其它因素的影响,因此每次执行完一个测试,咱们要恢复初始状态,能够调用 Dispatchers.resetMain() 重置。 固然,咱们能够自定义 Rule 来避免样板代买:

@ExperimentalCoroutinesApi
class CoroutinesTestRule(
        val testDispatcher: TestCoroutineDispatcher = TestCoroutineDispatcher()
) : TestWatcher() {

    override fun starting(description: Description?) {
        super.starting(description)
        Dispatchers.setMain(testDispatcher)
    }

    override fun finished(description: Description?) {
        super.finished(description)
        Dispatchers.resetMain()
        testDispatcher.cleanupTestCoroutines()
    }
}
复制代码

而后咱们就能够在测试类中使用这个 Rule:

class MainViewModelUnitTest {

    @get:Rule
    var coroutinesTestRule = CoroutinesTestRule()

    @Test
    fun test() {
        ...
    }
}
复制代码

使用 Mockito 测试 Coroutines

咱们通常使用 Mockitoverify 方法验证对象的方法是否调用,但这并非一个完美的方式。咱们最好验证咱们的逻辑代码是否正确,好比某个元素是否存在。 在验证某个对象的方法是否调用前,咱们要确保全部已启动的 Coroutines 都执行完毕。举个例子:

class MainViewModel(private val dependency: Any): ViewModel {

  fun sampleMethod() {
    viewModelScope.launch {
      val hashCode = dependency.hashCode()
      // TODO: do something with hashCode
  }
}

class MainViewModelUnitTest {

  // Mockito setup goes here
  ...

  @get:Rule
  var coroutinesTestRule = CoroutinesTestRule()

  @Test
  fun test() = coroutinesTestRule.testDispatcher.runBlockingTest {
    val subject = MainViewModel(mockObject)
    subject.sampleMethod()
    // Checks mockObject called the hashCode method that is expected from the coroutine created in sampleMethod
    verify(mockObject).hashCode()
  }
}
复制代码

MainViewModelUnitTest 测试类中,咱们使用了 TestCoroutineDispatcher 提供的 runBlockingTest 函数。因为 TestCoroutineDispatcher 重写了 Dispatchers.Main,因此 MainViewModel 中的 Coroutines 将会在这个 Dispatcher 中运行。runBlockingTest 函数能够保证全部测试代码中的 Coroutines 都会同步执行。所以 verify 方法也将会在全部 Coroutines 运行完后才会执行验证行为。

LifecycleScope

Usage

LifecycleScope 是 Lifecyle 的一个扩展属性,所以咱们能够在任何能够拿到 Lifecyle 的地方(通常是 Activity/Fragment)使用它。每一个在 LifecycleScope 中启动的 Coroutine 都会在 Lifecycle 销毁的时候自动取消(cancel)。咱们能够经过 lifecycle.coroutineScopelifecycleOwner.lifecycleScope 使用 Lifecycle 的 LifecycleScope。 下面这个例子,示范了怎么使用 lifecycleOwner.lifecycleScope 来异步建立预计算的文本。

class MyFragment: Fragment() {
    override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
        super.onViewCreated(view, savedInstanceState)
        viewLifecycleOwner.lifecycleScope.launch {
            val params = TextViewCompat.getTextMetricsParams(textView)
            val precomputedText = withContext(Dispatchers.Default) {
                PrecomputedTextCompat.create(longTextContent, params)
            }
            TextViewCompat.setPrecomputedText(textView, precomputedText)
        }
    }
}
复制代码

Soure code

Lifecyle.kt

/** * [CoroutineScope] tied to this [Lifecycle]. * * This scope will be cancelled when the [Lifecycle] is destroyed. * * This scope is bound to [Dispatchers.Main] */
val Lifecycle.coroutineScope: LifecycleCoroutineScope
    get() {
        while (true) {
            val existing = mInternalScopeRef.get() as LifecycleCoroutineScopeImpl?
            if (existing != null) {
                return existing
            }
            val newScope = LifecycleCoroutineScopeImpl(
                this,
                SupervisorJob() + Dispatchers.Main
            )
            if (mInternalScopeRef.compareAndSet(null, newScope)) {
                newScope.register()
                return newScope
            }
        }
    }

/** * [CoroutineScope] tied to a [Lifecycle] and [Dispatchers.Main] * * This scope will be cancelled when the [Lifecycle] is destroyed. * * This scope provides specialised versions of `launch`: [launchWhenCreated], [launchWhenStarted], * [launchWhenResumed] */
abstract class LifecycleCoroutineScope internal constructor() : CoroutineScope {
    internal abstract val lifecycle: Lifecycle

    /** * Launches and runs the given block when the [Lifecycle] controlling this * [LifecycleCoroutineScope] is at least in [Lifecycle.State.CREATED] state. * * The returned [Job] will be cancelled when the [Lifecycle] is destroyed. * @see Lifecycle.whenCreated * @see Lifecycle.coroutineScope */
    fun launchWhenCreated(block: suspend CoroutineScope.() -> Unit): Job = launch {
        lifecycle.whenCreated(block)
    }

    /** * Launches and runs the given block when the [Lifecycle] controlling this * [LifecycleCoroutineScope] is at least in [Lifecycle.State.STARTED] state. * * The returned [Job] will be cancelled when the [Lifecycle] is destroyed. * @see Lifecycle.whenStarted * @see Lifecycle.coroutineScope */

    fun launchWhenStarted(block: suspend CoroutineScope.() -> Unit): Job = launch {
        lifecycle.whenStarted(block)
    }

    /** * Launches and runs the given block when the [Lifecycle] controlling this * [LifecycleCoroutineScope] is at least in [Lifecycle.State.RESUMED] state. * * The returned [Job] will be cancelled when the [Lifecycle] is destroyed. * @see Lifecycle.whenResumed * @see Lifecycle.coroutineScope */
    fun launchWhenResumed(block: suspend CoroutineScope.() -> Unit): Job = launch {
        lifecycle.whenResumed(block)
    }
}

internal class LifecycleCoroutineScopeImpl(
    override val lifecycle: Lifecycle,
    override val coroutineContext: CoroutineContext
) : LifecycleCoroutineScope(), LifecycleEventObserver {
    init {
        // in case we are initialized on a non-main thread, make a best effort check before
        // we return the scope. This is not sync but if developer is launching on a non-main
        // dispatcher, they cannot be 100% sure anyways.
        if (lifecycle.currentState == Lifecycle.State.DESTROYED) {
            coroutineContext.cancel()
        }
    }

    fun register() {
        // TODO use Main.Immediate once it is graduated out of experimental.
        launch(Dispatchers.Main) {
            if (lifecycle.currentState >= Lifecycle.State.INITIALIZED) {
                lifecycle.addObserver(this@LifecycleCoroutineScopeImpl)
            } else {
                coroutineContext.cancel()
            }
        }
    }

    override fun onStateChanged(source: LifecycleOwner, event: Lifecycle.Event) {
        if (lifecycle.currentState <= Lifecycle.State.DESTROYED) {
            lifecycle.removeObserver(this)
            coroutineContext.cancel()
        }
    }
}
复制代码

LifecycleOwner.kt

/** * [CoroutineScope] tied to this [LifecycleOwner]'s [Lifecycle]. * * This scope will be cancelled when the [Lifecycle] is destroyed. * * This scope is bound to [Dispatchers.Main]. */
val LifecycleOwner.lifecycleScope: LifecycleCoroutineScope
    get() = lifecycle.coroutineScope

复制代码

Suspend Lifecycle-aware coroutines

尽管 CoroutineScope 为咱们提供了一种合适的方式来自动取消耗时操做,但在有些状况下,Lifecycle 不在一个肯定的状态时,也就是说是一个暂态,随时可能会转移到其余状态时,咱们可能想挂起(suspend)咱们的方法执行。举个栗子,咱们必须在 Lifecycle 的状态大于等于 STARTED 时,才可使用 FragmentTransaction。对于这些状况,Lifecycle 也提供了对应的扩展方法:lifecycle.whenCreatedlifecycle.whenStartedlifecycle.whenResumed。若是 Lifecycle 没有到达指望的最小状态时,运行在这些方法内的全部 Coroutines 都会被挂起。 下面这个例子展现了至少须要 Lifecycle 达到 STARTED 状态时,才会执行 lifecycle.whenStarted 里面的代码。

class MyFragment: Fragment {
    init { // Notice that we can safely launch in the constructor of the Fragment.
        lifecycleScope.launch {
            whenStarted {
                // The block inside will run only when Lifecycle is at least STARTED.
                // It will start executing when fragment is started and
                // can call other suspend methods.
                loadingView.visibility = View.VISIBLE
                val canAccess = withContext(Dispatchers.IO) {
                    checkUserAccess()
                }

                // When checkUserAccess returns, the next line is automatically
                // suspended if the Lifecycle is not *at least* STARTED.
                // We could safely run fragment transactions because we know the
                // code won't run unless the lifecycle is at least STARTED.
                loadingView.visibility = View.GONE
                if (canAccess == false) {
                    findNavController().popBackStack()
                } else {
                    showContent()
                }
            }

            // This line runs only after the whenStarted block above has completed.

        }
    }
}
复制代码

Lifecyle 销毁时,使用这些 when 方法启动的 Coroutines 将会被自动取消。下面的例子,一旦 Lifecyle 的状态变为 DESTROYED, finally 代码块会当即执行。

class MyFragment: Fragment {
    init {
        lifecycleScope.launchWhenStarted {
            try {
                // Call some suspend functions.
            } finally {
                // This line might execute after Lifecycle is DESTROYED.
                if (lifecycle.state >= STARTED) {
                    // Here, since we've checked, it is safe to run any
                    // Fragment transactions.
                }
            }
        }
    }
}
复制代码

注意:尽管这些 Lifecycle 的扩展属性或扩展方法为咱们提供了很大的便利,但在 Lifecyle 生命周期内,咱们最好在能保证消息有效的状况下使用它们(好比上面的 precomputed text)。 另外须要注意的是,Activity 重启(restart) 时,这些 Coroutines 不会被重启(restart)。

Use coroutines with LiveData

咱们在使用 LiveData 时,常常须要异步获取数据而后设置给 LiveData。好比,咱们可能须要读取用户设置,而后展现到 UI 上。这时,咱们可使用 liveData 扩展构造函数,在该函数内能够调用 suspend 函数获取数据并将结果传递给 LiveData。 以下所示,loadUser() 是一个 suspend 函数,经过查询数据库返回一个 User 对象。使用 liveData 扩展构造函数,咱们能够异步调用 loadUser(),而后使用 emit() 方法改变 LiveDatavalue

val user: LiveData<User> = liveData {
    val data = database.loadUser() // loadUser is a suspend function.
    emit(data)
}
复制代码

liveData 扩展构造函数为 Coroutines 和 LiveData 提供告终构化并发(Structured concurrency)的支持。livedata 包含的代码块会在 LiveData 变为 active 时自动执行,并在 LiveData 变为 inactive 时在一个可配置的时间后自动取消。若该代码块在执行完以前就被取消了,那么在 LiveData 再次 active 时,代码块也会从新执行。但若它已经执行完毕,则不会从新执行。而且该代码块只会在被自动取消的状况下才会在 LiveData 再次 active 时从新执行。 若是该代码块因为其余缘由(好比抛出 CancelationExeption)被取消了,它也不会从新从新执行。 咱们能够在 livedata 扩展构造函数内发射多个值。每次发射(调用 emit())都会执行主线程的 suspend 函数,直到该值被设置给 LiveData

val user: LiveData<Result> = liveData {
    emit(Result.loading())
    try {
        emit(Result.success(fetchUser())
    } catch(ioException: Exception) {
        emit(Result.error(ioException))
    }
}
复制代码

咱们也可使用在 Transformations 提供的操做符内使用 liveData,以下所示:

class MyViewModel: ViewModel() {
    private val userId: LiveData<String> = MutableLiveData()
    val user = userId.switchMap { id ->
        liveData(context = viewModelScope.coroutineContext + Dispatchers.IO) {
            emit(database.loadUserById(id))
        }
    }
}
复制代码

咱们也能够在任什么时候候使用 emitSource() 函数发射多个值给 LiveData。但要注意:每次调用 emit()emitSource() 都会清除以前添加的值。 可见源码(LiveDataScope)。

class UserDao: Dao {
    @Query("SELECT * FROM User WHERE id = :id")
    fun getUser(id: String): LiveData<User>
}

class MyRepository {
    fun getUser(id: String) = liveData<User> {
        val disposable = emitSource(
            userDao.getUser(id).map {
                Result.loading(it)
            }
        )
        try {
            val user = webservice.fetchUser(id)
            // Stop the previous emission to avoid dispatching the updated user
            // as `loading`.
            disposable.dispose()
            // Update the database.
            userDao.insert(user)
            // Re-establish the emission with success type.
            emitSource(
                userDao.getUser(id).map {
                    Result.success(it)
                }
            )
        } catch(exception: IOException) {
            // Any call to `emit` disposes the previous one automatically so we don't
            // need to dispose it here as we didn't get an updated value.
            emitSource(
                userDao.getUser(id).map {
                    Result.error(exception, it)
                }
            )
        }
    }
}
复制代码

Reference

联系

我是 xiaobailong24,您能够经过如下平台找到我:

相关文章
相关标签/搜索