这一系列文章原本我发表在简书。最近开始转移到掘金。之后也会在掘金发表(慢慢抛弃简书了应该,掘金的技术环境确实比简书好些)。javascript
前言: 不少朋友误会我文章的意思。我写这个系列文章的意思主要是帮助了解一下RxJava的常见用法。而不是使用一下本身或别人封装好的RxBus就以为本身的项目使用RxJava了。可是这也仅仅是我的口味问题,不少状况下确实RxBus/EventBus会很方便,很刺激,很上瘾。因此从这篇文章开始,我把标题中的"放弃RxBus"去除。java
不管在简书,微信平台,GitHub,掘金等等分享平台。一个名字上写着 "MVP(MVVM) + RxJava + Retrofit + Dagger2 + ........"这样的名字,再熟悉不过了。然而,大多数状况进去看一下RxJava部分。要么就是简单的把取到的数据用Observable.just()
直接传给下一层,要么就是直接使用Retrofit的Adapter来直接得到Observable,而app中其余部分并无reactive。并且还有不少Observable用法错误,好比冷热不分,连续太多的Map/FlatMap等等。react
为何不用RxBus我已经写了两篇文章了,可能因为我不常写文,不少人并无理解。在这里我再解释一次:EventBus若是是一辆穿梭在全部代码之间的公交车。那么Observable就是穿梭在少量人之间的Uber专车。他比起EventBus有不少优点,好比类型安全,异常处理,线程切换,强大的操做符等等。你固然能够作出一辆超级Uber来当全局公交车(RxBus)使用,然而这却损失了RxJava原本的许多优点,而且又给本身挖了许多坑,得不偿失。android
刚开始使用RxJava的时候,咱们会以为operator的链式调用会很是的爽,一个简单的例子:git
Observable.just("1", "2", "3", "4", "5", "6", "7")
.map(x -> Integer.valueOf(x))
.map(x -> x * 2)
.map(x -> x + 4)
.filter(x -> x >2)
// and much more operators
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());复制代码
当你只有不多数据的时候,这样固然能够,可是你数据量上来的时候,这就会有不少的overhead。 其实几乎全部的operator都会给你生成一个新的Observable。因此在上面这个例子中,咱们在过程当中生成了至少7个Observable。然而咱们彻底能够将中间的.map().map().map().filter合并在一个FlatMap中,减小不少的overhead。github
Observable.just()
即便你没有调用subscribe方法。just()括号里面的代码也已经执行了。显然,Observable.just()
不适合封装网络数据,由于咱们一般不想在subscribe以前作网络请求。class TestClass{
TestClass(){
System.out.println("I'm created!");
}
}
Observable.just(new TestClass());复制代码
这时你运行代码,你就看到确实你的TestClass 已经被建立了:I/System.out: I'm created!复制代码
同理,fromIterable也和just有一样的缺点。固然,这个能够简单的用defer()
/fromCallable()
/create()
操做符来是实现只有subscribe只有才加载。// use fromCallable
Observable.fromCallable(TestClass::new);
//or
Observable.defer(() -> Observable.just(new TestClass()));复制代码
固然ObservableJust在不少状况下,确实不错。若是你不须要监听后续的更新,那么ObservableJust能够知足你的需求。设计模式
这部分是本篇文章的重点!缓存
不少人在封装数据的时候,并无太多考虑冷热的问题,一般状况下并不会出错。由于目前不少开源项目(Demo)里除了RxBus,并无太多的RxJava的实时状况。然而,当你的App愈来愈Reactive的时候,冷热即是一个必须考虑的问题。
Hot Observable 意思是若是他开始传输数据,你不主动喊停(dispose()
/cancel()
),那么他就不会停,一直发射数据,即便他已经没有Subscriber了。而Cold Observable则是subscribe时才会发射数据。
然而,问题来了。我上篇文章讲过,只有subscribeActual方法调用了的时候,Observable发射数据,那为何Hot Observable没有Subscriber也会发射数据,他把数据发射给谁了呢?咱们在解决这个问题以前,先看一下Cold Observable:安全
咱们常见的工厂方法提供的都是ColdObservable,包括just()
,fromXX
,create()
,interval()
,defer()
。 他们的共同点是当你有多个Subscriber的时候,他们的事件是独立的,举个例子:微信
Observable interval = Observable.interval(1,TimeUnit.SECONDS);复制代码
若是咱们有两个subscriber,那么他们会各自有本身的计时器,而且互不干扰。效果以下图:
不一样于Cold Observable, Hot Observable是共享数据的。对于Hot Observable的全部subscriber,他们会在同一时刻收到相同的数据。咱们一般使用publish()
操做符来将ColdObservable变为Hot。或者咱们在RxBus中经常用到的Subjects
也是Hot Observable。
刚刚咱们刚刚提出了一个问题,
既然Hot Observable在没有subscriber的时候,还会继续发送数据,那么数据究竟发给谁了呢?
其实Hot Observable其实并无发送数据,而是他上层的Observable 发送数据给这个hot Observable。不信?咱们来分别看一下:
咱们在上面的误区中知道了,几乎全部operator都会生成一个新的Observable。publish固然不例外。可是有区别的是,publish会给你一个ConnectableObservable。具体实现类是ObservablePublish。这个Observable的区别是他提供一个connect()
方法,若是你调用connect()
方法,ConnectableObservable就会开始接收上游Observable的数据。咱们来测试一下:
ConnectableObservable interval = Observable.interval(1, TimeUnit.SECONDS).publish();
//connect even when no subscribers
interval.connect();复制代码
果真,因为咱们subscribe晚了一些。0这个数据没有收到,当咱们两个 Subscriber
都dispose的时候,ConnectableObservable
也仍在接受数据,致使咱们6这个数据没有接收到。ConnectableObservable
其实在内部,有一个PublishObserver
,他有两个做用。一个是当咱们调用 connect()
方法时, PublishObserver
开始接受上游的数据,咱们的例子里即是 Observable.interval(1, TimeUnit.SECONDS)
。因此才能在咱们没有调用 subscribe
方法时,他也能开始发送数据。第二个做用是 PublishObserver
存储全部的下游Subscriber, 也就是咱们例子中的Subscriber1 和Subscriber2,在 PublishObserver
每次接到一个上游数据,就会将接收到的结果,依次分发给他存储的全部 Subscribers
,若是下游 Subscriber
调用了 dispose
方法,那么他就会在本身的缓存中删除这个 Subscriber,下次接受到上游数据便不会传给这个Subscriber
。
那么这时候,有同窗应该要问了:
咱们可不能够中止从上游接受数据?
咱们固然能够。 connect()
方法会返回一个 Disposable 给咱们来控制是否继续接受上游的数据。
咱们固然不但愿每次都手动控制 ConnectableObservable
的开关。RxJava给咱们提供了一些经常使用的控制操做符
refCount()
能够说是最经常使用的操做符了。他会把 ConnectableObservable
变为一个一般的Observable但又保持了HotObservable的特性。也就是说,若是出现第一个Subscriber,他就会自动调用 connect()
方法,若是他开始接受以后,下游的 Subscribers
所有dispose,那么他也会中止接受上游的数据。具体看图: 每一个 Subscriber
每次都会接受一样的数据,可是当全部 subscriber
都 dispose时候,他也会自动dipose上游的 Observable
。因此咱们从新subscribe的时候,又从新从0开始。
这个操做符经常使用到,RxJava将他和publish合并为一个操做符 :share()
。
autoConnect()
看名字就知道,他会自动连接,若是你单纯调用 autoConnect()
,那么,他会在你连接第一个 Subscriber
的时候调用 connect()
,或者你调用 autoConnect(int Num)
,那么他将会再收到Num个 subscriber
的时候连接。Disposable
来控制上游的开关。 不过没问题,autoConnect提供了另外一种重载方法 :autoConnect(int numberOfSubscribers, Consumer<? super Disposable> connection)
Consumer
传给你 你须要的那个总开关。并且,autoConnect并不会autoDisconnect, 也就是若是他即便没有subscriber了。他也会继续接受数据。replay()replay()
方法和 publish()
同样,会返回一个 ConnectableObservable
,区别是, replay()
会为新的subscriber重放他以前所收到的上游数据,咱们再来举个例子:
//only replay 3 values
Observable.interval(1, TimeUnit.SECONDS).replay(3).refCount();复制代码
ReplayingShare()
其实ReplayingShare并不能算是ConnectableObservable的一个操做符,他是JakeWhaton的一个开源库,只有百来行。实现的功能是几乎和replay(1).refCount()
差很少。可是若是中断 Conncection以后,从新开始subscribe,他仍然会给你一个重放他上一次的结果。 具体看图:
Subjects 做为一个Reactive世界中的特殊存在,他特殊在于他本身既是一个Observable又是一个Observer(Subscriber)。你既能够像普通Observable同样让别的Subscriber来订阅,也能够用Subjects来订阅别人。更方便的是他甚至暴露了OnXX(),方法给你。你直接调用能够通知全部的Subscriber。 这也是RxBus的基础,RxBus几乎离不开Subjects。 蜘蛛侠的老爹告诉咱们,力量越大,责任就也大。Subjects也同样。 Subjects由于暴露了OnXX()方法,使得Subjects的数据来源变得难以控制。并且,Subjects一直是HotObservable,咱们来看下Subject的OnNext()
方法的实现:
@Override
public void onNext(T t) {
if (subscribers.get() == TERMINATED) {
return;
}
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
for (PublishDisposable<T> s : subscribers.get()) {
s.onNext(t);
}
}复制代码
能够看出来Subjects只要调用了OnNext()
方法就会当即发送数据。因此,使用时必定要注意Subjects和Subscriber的连接时序问题。具体Subjects的用法我想介绍帖子已经足够多了。这里就不赘述了。
View 的各类Listener 咱们经常使用create方法来封装,好比OnClickListener:
Observable.create(emitter -> {
button.setOnClickListener(v -> emitter.onNext("I'm Clicked"));
emitter.setCancellable(() -> button.setOnClickListener(null));
});复制代码
这里很是关键的一点是必定要设置解除绑定,不然你将持续使用这个会形成内存泄漏。并且最好配合使用share()。不然只有最后一个Subscriber能收到OnClick。固然,若是不考虑方法数的话,推荐配合使用RxBinding。
并且,用create()方法封装Listener适合几乎全部的callback, 而且安全。
设想一个场景,咱们有一个User类。里面有咱们的用户名,头像,各类信息。然而在咱们的app中,可能有三四个Fragment/Activity须要根据这个User作出不一样的反应。这时咱们就能够简单的使用Subject来封装User类。
public class UserRepository {
private User actualUser;
private Subject<User> subject = ReplaySubject.createWithSize(1);
/** * *Get User Data from wherever you want Network/Database etc */
public Observable<User> getUpdate(){
return subject;
}
public void updateUser(User user){
actualUser = user;
subject.onNext(actualUser);
}
}复制代码
若是咱们某些模块须要这个User,那么只须要subscribe到这个Repository,若是User有更新,每个Subscriber都会收到更新后的User而且互相不影响。并且咱们使用ReplaySubject,即便有新的Subscriber,也会收到最新的一个User的缓存。
可是使用的时候必定要注意,由于用的是Subject.因此在onNext方法中一旦出现了error。那么全部的Subscriber都将和这个subject断开了连接。这里也能够用 RxRelay 代替Subject,简单来讲Relay就是一个没有onError和onComplete的Subject。
Dan Lew在他的博客Loading data from multiple sources with RxJava 中介绍过他这种处理方法,
// Our sources (left as an exercise for the reader)
Observable<Data> memory = ...;
Observable<Data> disk = ...;
Observable<Data> network = ...;
// Retrieve the first source with data
Observable<Data> source = Observable
.concat(memory, disk, network)
.first();复制代码
而后在每次作不一样请求的时候刷新缓存
Observable<Data> networkWithSave = network.doOnNext(data -> {
saveToDisk(data);
cacheInMemory(data);
});
Observable<Data> diskWithCache = disk.doOnNext(data -> {
cacheInMemory(data);
});复制代码
具体也能够看这篇简书,我也不在过多赘述 :RxJava(八)concat符操做处理多数据源
这里也说一下这个方法的缺点。 首先,这个只适合一个Item的时候。若是咱们有多个Item从这个Observable中流出。 fisrt()
操做符只会取第一个。
这多是最灵活的写法?若是你想用RxJava封装本身的库,推荐这种方法封装。由于这样不只仅能够有效的进行错误处理,而且不会暴露过多逻辑给外面,许多优秀的RxJava相关库都是这样封装,就连RxJava本身也是把一个个的operator封装成一个个不一样的Observable。可是这种方法确实要求很高,要作不少考虑,好比异步,多线程冲突,错误处理。对新手不是很推荐。
仍是稍微讲一个例子: 仍是咱们的onClickLisnter的封装:
RxBinding 的 封装:
final class ViewClickObservable extends Observable<Object> {
private final View view;
ViewClickObservable(View view) {
this.view = view;
}
@Override protected void subscribeActual(Observer<? super Object> observer) {
if (!checkMainThread(observer)) {
return;
}
Listener listener = new Listener(view, observer);
observer.onSubscribe(listener);
view.setOnClickListener(listener);
}
static final class Listener extends MainThreadDisposable implements OnClickListener {
private final View view;
private final Observer<? super Object> observer;
Listener(View view, Observer<? super Object> observer) {
this.view = view;
this.observer = observer;
}
@Override public void onClick(View v) {
if (!isDisposed()) {
observer.onNext(Notification.INSTANCE);
}
}
@Override protected void onDispose() {
view.setOnClickListener(null);
}
}
}复制代码
其实这里虽然代码更多,可是实质上是刚才咱们说到的用Observable.create()
来封装没有不少区别。那咱们为何还要这么麻烦本身写Observable? 由于与create相比,减小了对象个数。 Observable封装一个OnClickListener 须要 ObservableCreate
, ObservableOnSubscribe
,ObservableEmitter
这三个类的实例。来确保你封装出来的Observable 是遵照 Observable Contract的。 而若是你默认本身遵照Observable Contract, 你只须要一个 CustomObservable
来实现。减小了两个对象的生成。 这个观点也获得了证明,我在StackOverFlow问过相关问题。很幸运获得了RxJava 2.x 做者 David 的回复:
若是直接继承与Observable来封装。大概分以下几步:
ViewClickObservable(View view) {
this.view = view;
}复制代码
将你须要使用的Listener/CallBack 和 Disposable/Observer接口结合成为一个实现类,实现监听。而且经过构造方法,将下游的Observer传入,以实现传输数据。好比OnClickListener:
static final class Listener extends MainThreadDisposable implements OnClickListener {
private final View view;
private final Observer<? super Object> observer;
Listener(View view, Observer<? super Object> observer) {
this.view = view;
this.observer = observer;
}
@Override public void onClick(View v) {
if (!isDisposed()) {
observer.onNext(Notification.INSTANCE);
}
}
@Override protected void onDispose() {
view.setOnClickListener(null);
}
}复制代码
这里推荐使用内部类的形式,下降内部可见性。并且这里须要注意的点很是多。
首先Disposable的 两个方法都须要实现,isDisposed()通常使用AtomicBoolean来控制监听是否已经取消订阅,好比:
private final AtomicBoolean unsubscribed = new AtomicBoolean();
@Override
public final boolean isDisposed() {
return unsubscribed.get();
}复制代码
dispose()方法通常会放一些取消监听等方法。好比咱们上面看到的view.setOnClickListener(null);
。 这里 onDispose
正常是没有这个方法的。 这个是Jake Wharton为了方便封装出来的接口,放在dispose方法里运行的。
public abstract class MainThreadDisposable implements Disposable {
@Override
public final void dispose() {
onDispose();
}
protected abstract void onDispose();
}复制代码
他固然还在dipose方法里作了其余安全检查,消除了一些Boilerplate。
其次,这里的对于Observer的控制是十分宽松的。因此你的行为最好必定遵循Observable Contract。不然出现其余问题,好比下游取消订阅上游还在发送数据。 又或者onComplete/onError以后还有onNext出发。 又或者出现异常并不会在onError中获得等等。 这里推荐看一下RxBinding其余实现类的源码。或者是Retrofit RxJava 2 的 Adapter。 都会颇有帮助。
最后,若是你是封装一个生产Observable的方法,那么使用Disposable。若是你是想封装一个自定义Operator。那么须要实现Observer接口。使用这个observer来监听上游数据。自定义Operator实在复杂。这里我就不讲了,我本身也没精通这个。
@Override protected void subscribeActual(Observer<? super Object> observer) {
if (!checkMainThread(observer)) {
return;
}
Listener listener = new Listener(view, observer);
observer.onSubscribe(listener);
view.setOnClickListener(listener);
}复制代码
看了我第二篇文章的朋友这里应该对subscribeActual方法不陌生。这里的参数是下游的observer。将他直接传入咱们刚才设计好的Listener/Disposable/Observer。 而后经过observer.onSubscribe注册咱们的disposable。 而后咱们开始注册监听。这里顺序很重要。必定要先调用onSubscribe方法再注册监听。不然可能会出现下游disposable空指针异常。
这篇文章在简书上也发了有一阵子了。转到掘金时我又从新检查了下,补充了一些内容。可能有些地方看起来与我以前的文章有些许重复。但愿见谅。