在不少资讯应用当中,当咱们进入一个新的页面,为了提高用户体验,不让页面空白过久,咱们通常会先读取缓存中的数据,再去请求网络。java
今天这篇文章,咱们将实现下面这个效果:同时发起读取缓存、访问网络的请求,若是缓存的数据先回来,那么就先展现缓存的数据,而若是网络的数据先回来,那么就再也不展现缓存的数据。缓存
为了让你们对这一过程有更深入的理解,咱们介绍"先加载缓存,再请求网络"这种模型的四种实现方式,其中第四种实现能够达到上面咱们所说的效果,而前面的三种实现虽然也可以实现相同的需求,而且能够正常工做,可是在某些特殊状况下,会出现意想不到的状况:网络
concat
实现concatEager
实现merge
实现publish
实现咱们须要准备两个Observable
,分别表示 缓存数据源 和 网络数据源,在其中填入相应的缓存数据和网络数据,为了以后演示一些特殊的状况,咱们能够在建立它的时候指定它执行的时间:app
//模拟缓存数据源。
private Observable<List<NewsResultEntity>> getCacheArticle(final long simulateTime) {
return Observable.create(new ObservableOnSubscribe<List<NewsResultEntity>>() {
@Override
public void subscribe(ObservableEmitter<List<NewsResultEntity>> observableEmitter) throws Exception {
try {
Log.d(TAG, "开始加载缓存数据");
Thread.sleep(simulateTime);
List<NewsResultEntity> results = new ArrayList<>();
for (int i = 0; i < 10; i++) {
NewsResultEntity entity = new NewsResultEntity();
entity.setType("缓存");
entity.setDesc("序号=" + i);
results.add(entity);
}
observableEmitter.onNext(results);
observableEmitter.onComplete();
Log.d(TAG, "结束加载缓存数据");
} catch (InterruptedException e) {
if (!observableEmitter.isDisposed()) {
observableEmitter.onError(e);
}
}
}
});
}
//模拟网络数据源。
private Observable<List<NewsResultEntity>> getNetworkArticle(final long simulateTime) {
return Observable.create(new ObservableOnSubscribe<List<NewsResultEntity>>() {
@Override
public void subscribe(ObservableEmitter<List<NewsResultEntity>> observableEmitter) throws Exception {
try {
Log.d(TAG, "开始加载网络数据");
Thread.sleep(simulateTime);
List<NewsResultEntity> results = new ArrayList<>();
for (int i = 0; i < 10; i++) {
NewsResultEntity entity = new NewsResultEntity();
entity.setType("网络");
entity.setDesc("序号=" + i);
results.add(entity);
}
observableEmitter.onNext(results);
observableEmitter.onComplete();
Log.d(TAG, "结束加载网络数据");
} catch (InterruptedException e) {
if (!observableEmitter.isDisposed()) {
observableEmitter.onError(e);
}
}
}
});
}
复制代码
在最终的下游,咱们接收数据,并在页面上经过RecyclerView
进行展现:ide
private DisposableObserver<List<NewsResultEntity>> getArticleObserver() {
return new DisposableObserver<List<NewsResultEntity>>() {
@Override
public void onNext(List<NewsResultEntity> newsResultEntities) {
mNewsResultEntities.clear();
mNewsResultEntities.addAll(newsResultEntities);
mNewsAdapter.notifyDataSetChanged();
}
@Override
public void onError(Throwable throwable) {
Log.d(TAG, "加载错误, e=" + throwable);
}
@Override
public void onComplete() {
Log.d(TAG, "加载完成");
}
};
}
复制代码
concat
是不少文章都推荐使用的方式,由于它不会有任何问题,实现代码以下:函数
private void refreshArticleUseContact() {
Observable<List<NewsResultEntity>> contactObservable = Observable.concat(
getCacheArticle(500).subscribeOn(Schedulers.io()), getNetworkArticle(2000).subscribeOn(Schedulers.io()));
DisposableObserver<List<NewsResultEntity>> disposableObserver = getArticleObserver();
contactObservable.observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
}
复制代码
上面这段代码的运行结果为: 优化
2500ms
。
Observable
,而且必需要等到前一个
Observable
的全部数据项都发送完以后,才会开始下一个
Observable
数据的发送。
那么,concat
操做符的缺点是什么呢?很明显,咱们白白浪费了前面读取缓存的这段时间,能不能同时发起读取缓存和网络的请求,而不是等到读取缓存完毕以后,才去请求网络呢?spa
为了解决前面没有同时发起请求的问题,咱们可使用concatEager
,它的使用方法以下:3d
private void refreshArticleUseConcatEager() {
List<Observable<List<NewsResultEntity>>> observables = new ArrayList<>();
observables.add(getCacheArticle(500).subscribeOn(Schedulers.io()));
observables.add(getNetworkArticle(2000).subscribeOn(Schedulers.io()));
Observable<List<NewsResultEntity>> contactObservable = Observable.concatEager(observables);
DisposableObserver<List<NewsResultEntity>> disposableObserver = getArticleObserver();
contactObservable.observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
}
复制代码
它和concat
最大的不一样就是多个Observable
能够同时开始发射数据,若是后一个Observable
发射完成后,前一个Observable
还有发射完数据,那么它会将后一个Observable
的数据先缓存起来,等到前一个Observable
发射完毕后,才将缓存的数据发射出去。code
上面代码中,请求缓存的时长改成500ms
,而请求网络的时长改成2000ms
,运行结果为:
咱们将请求缓存的时长改成2000ms
,而请求网络的时长改成500ms
,查看控制台的输出,能够验证上面的结论:
下面,咱们来看一下merge
操做符的示例:
private void refreshArticleUseMerge() {
Observable<List<NewsResultEntity>> contactObservable = Observable.merge(
getCacheArticle(500).subscribeOn(Schedulers.io()), getNetworkArticle(2000).subscribeOn(Schedulers.io()));
DisposableObserver<List<NewsResultEntity>> disposableObserver = getArticleObserver();
contactObservable.observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
}
复制代码
merge
的原理图以下所示:
concatEager
同样,会让多个
Observable
同时开始发射数据,可是它不须要
Observable
之间的互相等待,而是直接发送给下游。
当缓存时间为500ms
,而请求网络时间为2000ms
时,它的结果为:
使用publish
的实现以下所示:
private void refreshArticleUsePublish() {
Observable<List<NewsResultEntity>> publishObservable = getNetworkArticle(2000).subscribeOn(Schedulers.io()).publish(new Function<Observable<List<NewsResultEntity>>, ObservableSource<List<NewsResultEntity>>>() {
@Override
public ObservableSource<List<NewsResultEntity>> apply(Observable<List<NewsResultEntity>> network) throws Exception {
return Observable.merge(network, getCacheArticle(500).subscribeOn(Schedulers.io()).takeUntil(network));
}
});
DisposableObserver<List<NewsResultEntity>> disposableObserver = getArticleObserver();
publishObservable.observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
}
复制代码
这里面一共涉及到了三个操做符,publish
、merge
和takeUnti
,咱们先来看一下它可否解决咱们以前三种方式的缺陷:
500ms
,请求网络的时间为2000ms
2000ms
,请求网络的时间为500ms
这里要感谢简友 无意下棋 在评论里提到的问题:若是网络请求先返回时发生了错误(例如没有网络等)致使发送了onError
事件,从而使得缓存的Observable
也没法发送事件,最后界面显示空白。
针对这个问题,咱们须要对网络的Observable
进行优化,让其不将onError
事件传递给下游。其中一种解决方式是经过使用onErrorResume
操做符,它能够接收一个Func
函数,其形参为网络发送的错误,而在上游发生错误时会回调该函数。咱们能够根据错误的类型来返回一个新的Observable
,让订阅者镜像到这个新的Observable
,而且忽略onError
事件,从而避免onError
事件致使整个订阅关系的结束。
这里为了不订阅者在镜像到新的Observable
时会收到额外的时间,咱们返回一个Observable.never()
,它表示一个永远不发送事件的上游。
private Observable<List<NewsResultEntity>> getNetworkArticle(final long simulateTime) {
return Observable.create(new ObservableOnSubscribe<List<NewsResultEntity>>() {
@Override
public void subscribe(ObservableEmitter<List<NewsResultEntity>> observableEmitter) throws Exception {
try {
Log.d(TAG, "开始加载网络数据");
Thread.sleep(simulateTime);
List<NewsResultEntity> results = new ArrayList<>();
for (int i = 0; i < 10; i++) {
NewsResultEntity entity = new NewsResultEntity();
entity.setType("网络");
entity.setDesc("序号=" + i);
results.add(entity);
}
//a.正常状况。
//observableEmitter.onNext(results);
//observableEmitter.onComplete();
//b.发生异常。
observableEmitter.onError(new Throwable("netWork Error"));
Log.d(TAG, "结束加载网络数据");
} catch (InterruptedException e) {
if (!observableEmitter.isDisposed()) {
observableEmitter.onError(e);
}
}
}
}).onErrorResumeNext(new Function<Throwable, ObservableSource<? extends List<NewsResultEntity>>>() {
@Override
public ObservableSource<? extends List<NewsResultEntity>> apply(Throwable throwable) throws Exception {
Log.d(TAG, "网络请求发生错误throwable=" + throwable);
return Observable.never();
}
});
}
复制代码
当发生错误时,控制台的输出以下,能够看到缓存仍然正常地发送给了下游:
下面,咱们就来分析一下它的实现原理。
takeUntil
的原理图以下所示:
sourceObservable
经过
takeUntil
传入了另外一个
otherObservable
,它表示
sourceObservable
在
otherObservable
发射数据以后,就不容许再发射数据了,这就恰好知足了咱们前面说的“只要网络源发送了数据,那么缓存源就不该再发射数据”。
以后,咱们再用前面介绍过的merge
操做符,让两个缓存源和网络源同时开始工做,去取数据。
可是上面有一点缺陷,就是调用merge
和takeUntil
会发生两次订阅,这时候就须要使用publish
操做符,它接收一个Function
函数,该函数返回一个Observable
,该Observable
是对原Observable
,也就是上面网络源的Observable
转换以后的结果,该Observable
能够被takeUntil
和merge
操做符所共享,从而实现只订阅一次的效果。
publish
的原理图以下所示: