【译】使用Kotlin从零开始写一个现代Android 项目-Part3


这是《使用Kotlin开发一个现代的APP》系列文章的第三部分,还没看过前2部分的,能够先看一下:java

【译】使用Kotlin从零开始写一个现代Android 项目-Part1react

【译】使用Kotlin从零开始写一个现代Android 项目-Part2android

正文开始!git

什么是RxJava ?

关于RxJava,一个普遍的概念是-RxJava是用于异步编程的API的Java实现,它具备可观察流和响应式的API。实际上,它是这三个概念的结合:观察者模式、迭代器模式和函数式编程。这里也有其余编程语言实现的库,如:RxSwift、RxJs 、RxNet等。github

我RxJava上手很难,有时,它确实很使人困惑,若是实施不当,可能会给您带来一些问题。尽管如此,咱们仍是值得花时间学习它。我将尝试经过简单的步骤来解释RxJava。面试

首先,让咱们回答一些简单的问题,当您开始阅读有关RxJava时,可能会问本身:编程

咱们真的须要它吗?

答案是否认的,RxJava只是能够在Android开发中使用的又一个库。若是使用Kotlin开发,它也不是必须的,我但愿你明白我说的,它只一个很帮助你的库,就像你使用的因此其余库同样。后端

要学习RxJava2,必须先学RxJava1吗?

你能够直接从RxJava2开始,不过,做为Android开发人员,知道这两种状况对你仍是有好处的,由于你可能会参与维护其余人的RxJava1代码。数组

我看到有RxAndroid,应该使用RxAndroid仍是RxJava?

RxJava能用在任何Java开发平台,不只仅是Android,好比,对于后端开发来讲,RxJava 能够与Spring等框架一块儿使用,RxAndroid是一个库,其中包含在Android中使用RxJava所需的库。所以,若是要在Android开发中使用RxJava,则必须再添加RxAndroid。稍后,我将解释RxAndroid基于RxJava所添加的内容。微信

咱们使用Kotlin开发,为何不用RxKotin呢?

咱们没有必要另外再添加一个Rx 库了,由于Kotlin与Java是彻底兼容的,这里确实有一个RxKotin库:https://github.com/ReactiveX/... ,不过该库是在RxJava之上编写的。它只是将Kotlin功能添加到RxJava。您能够将RxJava与Kotlin一块儿使用,而无需使用RxKotlin库。为了简单起见,在这一部分中我将不使用RxKotlin。

如何将Rxjava2添加到项目中?

要使用RxJava,你须要在build.gradle中添加以下代码:

dependencies {
    ... 
    implementation "io.reactivex.rxjava2:rxjava:2.1.8"
    implementation "io.reactivex.rxjava2:rxandroid:2.0.1"
    ...
}

而后,点击sync,下载Rxjava库。

RxJava包含了些啥?

我想把RxJava分为如下三部分:

  • 一、用于观察者模式和数据流的类:ObservablesObservers
  • 二、Schedulers
  • 三、数据流操做符
ObservablesObservers

咱们已经解释了这种模式。您能够将Observable视为数据的源(被观察者),将Observer视为接收数据的源(观察者)。

有不少建立Observables的方式,最简单的方法是使用Observable.just()来获取一个项目并建立Observable来发射该项目。

让咱们转到GitRepoRemoteDataSource类并更改getRepositories方法,以返回Observable:

class GitRepoRemoteDataSource {

    fun getRepositories() : Observable<ArrayList<Repository>> {
        var arrayList = ArrayList<Repository>()
        arrayList.add(Repository("First from remote", "Owner 1", 100, false))
        arrayList.add(Repository("Second from remote", "Owner 2", 30, true))
        arrayList.add(Repository("Third from remote", "Owner 3", 430, false))

        return Observable.just(arrayList).delay(2,TimeUnit.SECONDS)
    }
}

Observable <ArrayList <Repository >>表示Observable发出Repository对象的数组列表。若是要建立发出Repository对象的Observable <Repository>,则应使用Observable.from(arrayList)

.delay(2,TimeUnit.SECONDS)表示延迟2s后才开始发射数据。

可是,等等!咱们并无高数Observable什么时候发射数据啊?Observables一般在一些Observer订阅后就开始发出数据。

请注意,咱们再也不须要如下接口了

interface OnRepoRemoteReadyCallback {
    fun onRemoteDataReady(data: ArrayList<Repository>)
}

GitRepoLocalDataSource:类中作一样的更改

class GitRepoLocalDataSource {

    fun getRepositories() : Observable<ArrayList<Repository>> {
        var arrayList = ArrayList<Repository>()
        arrayList.add(Repository("First From Local", "Owner 1", 100, false))
        arrayList.add(Repository("Second From Local", "Owner 2", 30, true))
        arrayList.add(Repository("Third From Local", "Owner 3", 430, false))

        return Observable.just(arrayList).delay(2, TimeUnit.SECONDS)
    }

    fun saveRepositories(arrayList: ArrayList<Repository>) {
        //todo save repositories in DB
    }
}

一样的,也不须要这个接口了:

interface OnRepoLocalReadyCallback {
    fun onLocalDataReady(data: ArrayList<Repository>)
}

如今,咱们须要在repository中返回Observable

class GitRepoRepository(private val netManager: NetManager) {

    private val localDataSource = GitRepoLocalDataSource()
    private val remoteDataSource = GitRepoRemoteDataSource()

    fun getRepositories(): Observable<ArrayList<Repository>> {

        netManager.isConnectedToInternet?.let {
            if (it) {
                //todo save those data to local data store
                return remoteDataSource.getRepositories()
            }
        }

        return localDataSource.getRepositories()
    }
}

若是网络已链接,咱们从远程数据源返回Observable,不然,从本地数据源返回Observable,一样的,咱们也再也不须要OnRepositoryReadyCallback接口。

如你所料,咱们须要更改在MainViewModel中获取数据的方式。如今咱们应该从gitRepoRepository获取Observable并订阅它。一旦咱们向Observer订阅了该Observable,Observable将开始发出数据:

class MainViewModel(application: Application) : AndroidViewModel(application) {
    ...

    fun loadRepositories() {
        isLoading.set(true)
        gitRepoRepository.getRepositories().subscribe(object: Observer<ArrayList<Repository>>{
            override fun onSubscribe(d: Disposable) {
                //todo
            }

            override fun onError(e: Throwable) {
               //todo
            }

            override fun onNext(data: ArrayList<Repository>) {
                repositories.value = data
            }

            override fun onComplete() {
                isLoading.set(false)
            }
        })
    }
}

一旦Observer订阅了Observable,onSubscribe方法将被调用,主要onSubscribe的参数Disposable,稍后将讲到它。

每当Observable发出数据时,将调用onNext()方法。当Observable完成s数据发射时,onComplete()将被调用一次。以后,Observable终止。

若是发生某些异常,onError()方法将被回调,而后Observable终止。这意味着Observable将再也不发出数据,所以onNext()不会被调用,也不会调用onComplete()

另外,请注意。若是尝试订阅已终止的Observable,则将收到IllegalStateException

那么,RxJava如何帮助咱们?

  • 首先,咱们摆脱了这些接口,它是全部repository和数据源创建的样板接口。
  • 若是咱们使用接口,而且在数据层中发生某些异常,则咱们的应用程序可能会崩溃。使用RxJava错误将在onError()方法中返回,所以咱们能够向用户显示适当的错误消息。
  • 由于咱们始终将RxJava用于数据层,它更清晰。
  • 我以前没有告诉过你:之前的方法可能会致使内存泄漏。
使用RxJava2和ViewModel时,如何防止内存泄漏

咱们再一次看一下ViewModel的生命周期图

一旦Activity销毁,ViewModel的onCleared方法将被调用,在onCleared方法中,咱们须要取消全部订阅

class MainViewModel(application: Application) : AndroidViewModel(application) {
    ...
    
    lateinit var disposable: Disposable

    fun loadRepositories() {
        isLoading.set(true)
        gitRepoRepository.getRepositories().subscribe(object: Observer<ArrayList<Repository>>{
            override fun onSubscribe(d: Disposable) {
                disposable = d
            }

            override fun onError(e: Throwable) {
                //if some error happens in our data layer our app will not crash, we will
                // get error here
            }

            override fun onNext(data: ArrayList<Repository>) {
                repositories.value = data
            }

            override fun onComplete() {
                isLoading.set(false)
            }
        })
    }

    override fun onCleared() {
        super.onCleared()
        if(!disposable.isDisposed){
            disposable.dispose()
        }
    }
}

咱们能够优化一下上面的代码:

首先,使用DisposableObserver替换Observer,它实现了Disposable而且有dispose()方法,咱们再也不须要onSubscribe()方法,由于咱们能够直接在DisposableObserver实例上调用dispose()

第二步,替换掉返回Void的.subscribe()方法,使用.subscribeWith() 方法,他能返回指定的Observer

class MainViewModel(application: Application) : AndroidViewModel(application) {
    ...

    lateinit var disposable: Disposable

    fun loadRepositories() {
        isLoading.set(true)
        disposable = gitRepoRepository.getRepositories().subscribeWith(object: DisposableObserver<ArrayList<Repository>>() {

            override fun onError(e: Throwable) {
               // todo
            }

            override fun onNext(data: ArrayList<Repository>) {
                repositories.value = data
            }

            override fun onComplete() {
                isLoading.set(false)
            }
        })
    }

    override fun onCleared() {
        super.onCleared()
        if(!disposable.isDisposed){
            disposable.dispose()
        }
    }
}

上面的代码还能够继续优化:

咱们保存了一个Disposable实例,所以,咱们才能够在onCleared()回调中调用dispose(),可是等等!咱们须要为每个调用都这样作吗?若是有10个回调,那么咱们得保存10个实例,在onCleared()中取消10次订阅?显然不可能,这里有更好的方法,咱们应该将它们所有保存在一个存储桶中,并在调用onCleared()方法时,将它们所有一次处理。咱们可使用CompositeDisposable

CompositeDisposable:可容纳多个Disposable的容器

所以,每次建立一个Disposable,都须要将其添加到CompositeDisposable中:

class MainViewModel(application: Application) : AndroidViewModel(application) {
    ...
  
    private val compositeDisposable = CompositeDisposable()

    fun loadRepositories() {
        isLoading.set(true)
        compositeDisposable.add(gitRepoRepository.getRepositories().subscribeWith(object: DisposableObserver<ArrayList<Repository>>() {

            override fun onError(e: Throwable) {
                //if some error happens in our data layer our app will not crash, we will
                // get error here
            }

            override fun onNext(data: ArrayList<Repository>) {
                repositories.value = data
            }

            override fun onComplete() {
                isLoading.set(false)
            }
        }))
    }

    override fun onCleared() {
        super.onCleared()
        if(!compositeDisposable.isDisposed){
            compositeDisposable.dispose()
        }
    }
}

感谢Kotlin的扩展函数,咱们还能够更进一步:

与C#和Gosu类似,Kotlin提供了使用新功能扩展类的能力,而没必要继承该类,也就是扩展函数。

让咱们建立一个新的包,叫作extensions,而且添加一个新的文件RxExtensions.kt

import io.reactivex.disposables.CompositeDisposable
import io.reactivex.disposables.Disposable

operator fun CompositeDisposable.plusAssign(disposable: Disposable) {
    add(disposable)
}

如今咱们可使用+ =符号将Disposable对象添加到CompositeDisposable实例:

class MainViewModel(application: Application) : AndroidViewModel(application) {
    ...

    private val compositeDisposable = CompositeDisposable()

    fun loadRepositories() {
        isLoading.set(true)
        compositeDisposable += gitRepoRepository.getRepositories().subscribeWith(object : DisposableObserver<ArrayList<Repository>>() {

            override fun onError(e: Throwable) {
                //if some error happens in our data layer our app will not crash, we will
                // get error here
            }

            override fun onNext(data: ArrayList<Repository>) {
                repositories.value = data
            }

            override fun onComplete() {
                isLoading.set(false)
            }
        })
    }

    override fun onCleared() {
        super.onCleared()
        if (!compositeDisposable.isDisposed) {
            compositeDisposable.dispose()
        }
    }
}

如今,咱们运行程序,当你点击Load Data按钮,2s以后,程序crash,而后,若是查看日志,您将看到onNext方法内部发生错误,而且异常的缘由是:

java.lang.IllegalStateException: Cannot invoke setValue on a background thread

为什么会发生这个异常?

Schedulers

RxJava附带有调度器(Schedulers),使咱们能够选择在哪一个线程代码上执行。更准确地说,咱们能够选择使用subscribeOn()方在哪一个线程执行,observeOn()方法能够观察哪一个线程观察者。一般状况下,咱们全部的数据层代码都应该在后台线程执行,例如,若是咱们使用Schedulers.newThread(),每当咱们调用它时,调度器都会给咱们分配一个新的线程,为了简单起见,Scheduler中还有其余一些方法,我将不在本博文中介绍。

可能您已经知道全部UI代码都是在Android 主线程上完成的。 RxJava是Java库,它不了解Android主线程,这就是咱们使用RxAndroid的缘由。 RxAndroid使咱们能够选择Android Main线程做为执行代码的线程。显然,咱们的Observer应该在Android Main线程上运行。

让咱们更改一下代码:

...
fun loadRepositories() {
        isLoading.set(true)
        compositeDisposable += gitRepoRepository
                .getRepositories()
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeWith(object : DisposableObserver<ArrayList<Repository>>() {
              ...
        })
    }
...

而后再运行代码,一切都正常了,nice~

其余 observables types

这里还有一些其余的observable 类型

  • Single<T>: 被观察者仅发射一个数据,或者是一个异常
  • Maybe<T>: 被观察者不发射数据,或者仅发射一个数据,或者是一个异常
  • Completable : 发射onSuccess()事件或者异常
  • Flowable<T>:和 Observable<T> 同样,不发射数据,或者发射n个数据,或者发射异常,可是Observable不支持背压,而Flowable却支持。
什么是背压(backpressure)?

为了记住一些概念,我喜欢将它们与现实中的一些例子类比

把它类比成一个通道,若是你向通道中塞入瓶颈可以接受的最多的商品,这将会变得很糟,这里也是一样的,有时,你的观察者没法处理其收到的事件数量,所以须要放慢速度。

你能够看看RxJava 关于背压的文档:https://github.com/ReactiveX/...

操做符

RxJava中,最牛逼的就是它的操做符了,仅用一行代码便可在RxJava中解决一些一般须要10行或更多行的问题。这些是操做符能够帮咱们作的:

  • 合并observables
  • 过滤
  • 按条件来作操做
  • 将observables 转换为其余类型

我给你举一个例子,让咱们将数据保存到GitRepoLocalDataSource中。由于咱们正在保存数据,因此咱们须要Completable来模拟它。假设咱们还想模拟1秒的延迟。天真的方法是:

fun saveRepositories(arrayList: ArrayList<Repository>): Completable {
    return Completable.complete().delay(1,TimeUnit.SECONDS)
}

为何说天真?

Completable.complete()返回一个Completable实例,该实例在订阅后当即完成。

一旦Completable 完成后,它将终止。所以,以后将不执行任何运算符(延迟是运算符之一)。在这种状况下,咱们的Completable不会有任何延迟。让咱们找解决方法:

fun saveRepositories(arrayList: ArrayList<Repository>): Completable {
    return Single.just(1).delay(1,TimeUnit.SECONDS).toCompletable()
}

为何是这种方式?

Single.just(1)建立一个Single实例,而且仅发射一个数字1,由于咱们用了delay(1,TimeUnit.SECONDS) ,所以发射操做延迟1s。

toCompletable()返回一个Completable,它丢弃Single的结果,并在此Single调用 onSuccess时调用 onComplete

所以,上面的代码将返回Completable,而且1s后调用onComplete()

如今,咱们应该更改咱们的GitRepoRepository。让咱们回顾一下逻辑。咱们检查互联网链接。若是有互联网链接,咱们从远程数据源获取数据,将其保存在本地数据源中并返回数据。不然,咱们仅从本地数据源获取数据。看一看:

fun getRepositories(): Observable<ArrayList<Repository>> {

    netManager.isConnectedToInternet?.let {
        if (it) {
            return remoteDataSource.getRepositories().flatMap {
                return@flatMap localDataSource.saveRepositories(it)
                        .toSingleDefault(it)
                        .toObservable()
            }
        }
    }

    return localDataSource.getRepositories()
}

使用了.flatMap,一旦remoteDataSource.getRepositories()发射数据,该项目将被映射到发出相同项目的新Observable。咱们从Completable建立的新Observable发射的相同项目保存在本地数据存储中,而且将其转换为发出相同发射项的Single。由于咱们须要返回Observable,因此咱们必须将Single转换为Observable。

很疯狂,huh? 想象一下RxJava还能为咱们作些啥!

RxJava是一个很是有用的工具,去使用它,探索它,我相信你会爱上它的!

以上就是本文得所有内容,下一篇文章将是本系列的最后一篇文章,敬请期待!

本系列已更新完毕:

【译】使用Kotlin从零开始写一个现代Android 项目-Part1

【译】使用Kotlin从零开始写一个现代Android 项目-Part2

【译】使用Kotlin从零开始写一个现代Android 项目-Part3

【译】使用Kotlin从零开始写一个现代Android 项目-Part4

文章首发于公众号: 「 技术最TOP 」,天天都有干货文章持续更新,能够微信搜索 「 技术最TOP 」第一时间阅读,回复【思惟导图】【面试】【简历】有我准备一些Android进阶路线、面试指导和简历模板送给你

相关文章
相关标签/搜索