RxJava2 实战知识梳理(8) 使用 publish + merge 优化先加载缓存,再读取网络数据的请求过程

1、前言

在不少资讯应用当中,当咱们进入一个新的页面,为了提高用户体验,不让页面空白过久,咱们通常会先读取缓存中的数据,再去请求网络。java

今天这篇文章,咱们将实现下面这个效果:同时发起读取缓存、访问网络的请求,若是缓存的数据先回来,那么就先展现缓存的数据,而若是网络的数据先回来,那么就再也不展现缓存的数据。缓存

为了让你们对这一过程有更深入的理解,咱们介绍"先加载缓存,再请求网络"这种模型的四种实现方式,其中第四种实现能够达到上面咱们所说的效果,而前面的三种实现虽然也可以实现相同的需求,而且能够正常工做,可是在某些特殊状况下,会出现意想不到的状况:网络

  • 使用concat实现
  • 使用concatEager实现
  • 使用merge实现
  • 使用publish实现

2、示例

2.1 准备工做

咱们须要准备两个Observable,分别表示 缓存数据源网络数据源,在其中填入相应的缓存数据和网络数据,为了以后演示一些特殊的状况,咱们能够在建立它的时候指定它执行的时间:app

//模拟缓存数据源。
    private Observable<List<NewsResultEntity>> getCacheArticle(final long simulateTime) {
        return Observable.create(new ObservableOnSubscribe<List<NewsResultEntity>>() {
            @Override
            public void subscribe(ObservableEmitter<List<NewsResultEntity>> observableEmitter) throws Exception {
                try {
                    Log.d(TAG, "开始加载缓存数据");
                    Thread.sleep(simulateTime);
                    List<NewsResultEntity> results = new ArrayList<>();
                    for (int i = 0; i < 10; i++) {
                        NewsResultEntity entity = new NewsResultEntity();
                        entity.setType("缓存");
                        entity.setDesc("序号=" + i);
                        results.add(entity);
                    }
                    observableEmitter.onNext(results);
                    observableEmitter.onComplete();
                    Log.d(TAG, "结束加载缓存数据");
                } catch (InterruptedException e) {
                    if (!observableEmitter.isDisposed()) {
                        observableEmitter.onError(e);
                    }
                }
            }
        });
    }
    //模拟网络数据源。
    private Observable<List<NewsResultEntity>> getNetworkArticle(final long simulateTime) {
        return Observable.create(new ObservableOnSubscribe<List<NewsResultEntity>>() {
            @Override
            public void subscribe(ObservableEmitter<List<NewsResultEntity>> observableEmitter) throws Exception {
                try {
                    Log.d(TAG, "开始加载网络数据");
                    Thread.sleep(simulateTime);
                    List<NewsResultEntity> results = new ArrayList<>();
                    for (int i = 0; i < 10; i++) {
                        NewsResultEntity entity = new NewsResultEntity();
                        entity.setType("网络");
                        entity.setDesc("序号=" + i);
                        results.add(entity);
                    }
                    observableEmitter.onNext(results);
                    observableEmitter.onComplete();
                    Log.d(TAG, "结束加载网络数据");
                } catch (InterruptedException e) {
                    if (!observableEmitter.isDisposed()) {
                        observableEmitter.onError(e);
                    }
                }
            }
        });
    }
复制代码

在最终的下游,咱们接收数据,并在页面上经过RecyclerView进行展现:ide

private DisposableObserver<List<NewsResultEntity>> getArticleObserver() {
        return new DisposableObserver<List<NewsResultEntity>>() {

            @Override
            public void onNext(List<NewsResultEntity> newsResultEntities) {
                mNewsResultEntities.clear();
                mNewsResultEntities.addAll(newsResultEntities);
                mNewsAdapter.notifyDataSetChanged();
            }

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "加载错误, e=" + throwable);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "加载完成");
            }
        };
    }
复制代码

2.2 使用 concat 实现

concat是不少文章都推荐使用的方式,由于它不会有任何问题,实现代码以下:函数

private void refreshArticleUseContact() {
        Observable<List<NewsResultEntity>> contactObservable = Observable.concat(
                getCacheArticle(500).subscribeOn(Schedulers.io()), getNetworkArticle(2000).subscribeOn(Schedulers.io()));
        DisposableObserver<List<NewsResultEntity>> disposableObserver = getArticleObserver();
        contactObservable.observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
    }
复制代码

上面这段代码的运行结果为: 优化

从控制台的输出能够看到,整个过程是先取读取缓存,等缓存的数据读取完毕以后,才开始请求网络,所以整个过程的耗时为两个阶段的相加,即 2500ms

它的原理图以下所示:
concat 原理图
从原理图中也验证了咱们前面的现象,它会链接多个 Observable,而且必需要等到前一个 Observable的全部数据项都发送完以后,才会开始下一个 Observable数据的发送。

那么,concat操做符的缺点是什么呢?很明显,咱们白白浪费了前面读取缓存的这段时间,能不能同时发起读取缓存和网络的请求,而不是等到读取缓存完毕以后,才去请求网络呢?spa

2.3 使用 concatEager 实现

为了解决前面没有同时发起请求的问题,咱们可使用concatEager,它的使用方法以下:3d

private void refreshArticleUseConcatEager() {
        List<Observable<List<NewsResultEntity>>> observables = new ArrayList<>();
        observables.add(getCacheArticle(500).subscribeOn(Schedulers.io()));
        observables.add(getNetworkArticle(2000).subscribeOn(Schedulers.io()));
        Observable<List<NewsResultEntity>> contactObservable = Observable.concatEager(observables);
        DisposableObserver<List<NewsResultEntity>> disposableObserver = getArticleObserver();
        contactObservable.observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
    }
复制代码

它和concat最大的不一样就是多个Observable能够同时开始发射数据,若是后一个Observable发射完成后,前一个Observable还有发射完数据,那么它会将后一个Observable的数据先缓存起来,等到前一个Observable发射完毕后,才将缓存的数据发射出去。code

上面代码中,请求缓存的时长改成500ms,而请求网络的时长改成2000ms,运行结果为:

那么这种实现方式的缺点是什么呢?就是在某些异常状况下,若是读取缓存的时间要大于网络请求的时间,那么就会致使出现“网络请求的结果”等待“读取缓存”这一过程完成后才能传递给下游,白白浪费了一段时间。

咱们将请求缓存的时长改成2000ms,而请求网络的时长改成500ms,查看控制台的输出,能够验证上面的结论:

2.4 使用 merge 实现

下面,咱们来看一下merge操做符的示例:

private void refreshArticleUseMerge() {
        Observable<List<NewsResultEntity>> contactObservable = Observable.merge(
                getCacheArticle(500).subscribeOn(Schedulers.io()), getNetworkArticle(2000).subscribeOn(Schedulers.io()));
        DisposableObserver<List<NewsResultEntity>> disposableObserver = getArticleObserver();
        contactObservable.observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
    }
复制代码

merge的原理图以下所示:

merge 原理图
它和 concatEager同样,会让多个 Observable同时开始发射数据,可是它不须要 Observable之间的互相等待,而是直接发送给下游。

当缓存时间为500ms,而请求网络时间为2000ms时,它的结果为:

在读取缓存的时间小于请求网络的时间时,这个操做符可以很好的工做,可是反之,就会出现咱们先展现了网络的数据,而后又被刷新成旧的缓存数据。
发生该异常时的现象以下所示:

2.5 使用 publish 实现

使用publish的实现以下所示:

private void refreshArticleUsePublish() {
        Observable<List<NewsResultEntity>> publishObservable = getNetworkArticle(2000).subscribeOn(Schedulers.io()).publish(new Function<Observable<List<NewsResultEntity>>, ObservableSource<List<NewsResultEntity>>>() {

            @Override
            public ObservableSource<List<NewsResultEntity>> apply(Observable<List<NewsResultEntity>> network) throws Exception {
                return Observable.merge(network, getCacheArticle(500).subscribeOn(Schedulers.io()).takeUntil(network));
            }

        });
        DisposableObserver<List<NewsResultEntity>> disposableObserver = getArticleObserver();
        publishObservable.observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
    }
复制代码

这里面一共涉及到了三个操做符,publishmergetakeUnti,咱们先来看一下它可否解决咱们以前三种方式的缺陷:

  • 读取缓存的时间为500ms,请求网络的时间为2000ms

  • 读取缓存的时间为2000ms,请求网络的时间为500ms

能够看到,在读取缓存的时间大于请求网络时间的时候,仅仅只会展现网络的数据,显示效果为:
而且读取缓存和请求网络是同时发起的,很好地解决了前面几种实现方式的缺陷。

这里要感谢简友 无意下棋 在评论里提到的问题:若是网络请求先返回时发生了错误(例如没有网络等)致使发送了onError事件,从而使得缓存的Observable也没法发送事件,最后界面显示空白。

针对这个问题,咱们须要对网络的Observable进行优化,让其不将onError事件传递给下游。其中一种解决方式是经过使用onErrorResume操做符,它能够接收一个Func函数,其形参为网络发送的错误,而在上游发生错误时会回调该函数。咱们能够根据错误的类型来返回一个新的Observable,让订阅者镜像到这个新的Observable,而且忽略onError事件,从而避免onError事件致使整个订阅关系的结束。

这里为了不订阅者在镜像到新的Observable时会收到额外的时间,咱们返回一个Observable.never(),它表示一个永远不发送事件的上游。

private Observable<List<NewsResultEntity>> getNetworkArticle(final long simulateTime) {
        return Observable.create(new ObservableOnSubscribe<List<NewsResultEntity>>() {
            @Override
            public void subscribe(ObservableEmitter<List<NewsResultEntity>> observableEmitter) throws Exception {
                try {
                    Log.d(TAG, "开始加载网络数据");
                    Thread.sleep(simulateTime);
                    List<NewsResultEntity> results = new ArrayList<>();
                    for (int i = 0; i < 10; i++) {
                        NewsResultEntity entity = new NewsResultEntity();
                        entity.setType("网络");
                        entity.setDesc("序号=" + i);
                        results.add(entity);
                    }
                    //a.正常状况。
                    //observableEmitter.onNext(results);
                    //observableEmitter.onComplete();
                    //b.发生异常。
                    observableEmitter.onError(new Throwable("netWork Error"));
                    Log.d(TAG, "结束加载网络数据");
                } catch (InterruptedException e) {
                    if (!observableEmitter.isDisposed()) {
                        observableEmitter.onError(e);
                    }
                }
            }
        }).onErrorResumeNext(new Function<Throwable, ObservableSource<? extends List<NewsResultEntity>>>() {

            @Override
            public ObservableSource<? extends List<NewsResultEntity>> apply(Throwable throwable) throws Exception {
                Log.d(TAG, "网络请求发生错误throwable=" + throwable);
                return Observable.never();
            }
        });
    }
复制代码

当发生错误时,控制台的输出以下,能够看到缓存仍然正常地发送给了下游:

下面,咱们就来分析一下它的实现原理。

2.5.1 takeUntil

takeUntil的原理图以下所示:

这里,咱们给 sourceObservable经过 takeUntil传入了另外一个 otherObservable,它表示 sourceObservableotherObservable发射数据以后,就不容许再发射数据了,这就恰好知足了咱们前面说的“只要网络源发送了数据,那么缓存源就不该再发射数据”。

以后,咱们再用前面介绍过的merge操做符,让两个缓存源和网络源同时开始工做,去取数据。

2.5.2 publish

可是上面有一点缺陷,就是调用mergetakeUntil会发生两次订阅,这时候就须要使用publish操做符,它接收一个Function函数,该函数返回一个Observable,该Observable是对原Observable,也就是上面网络源的Observable转换以后的结果,该Observable能够被takeUntilmerge操做符所共享,从而实现只订阅一次的效果。

publish的原理图以下所示:

publish 原理图


更多文章,欢迎访问个人 Android 知识梳理系列:

相关文章
相关标签/搜索