拥抱 RxJava(三):关于 Observable 的冷热,常见的封装方式以及误区

这一系列文章原本我发表在简书。最近开始转移到掘金。之后也会在掘金发表(慢慢抛弃简书了应该,掘金的技术环境确实比简书好些)。javascript


前言: 不少朋友误会我文章的意思。我写这个系列文章的意思主要是帮助了解一下RxJava的常见用法。而不是使用一下本身或别人封装好的RxBus就以为本身的项目使用RxJava了。可是这也仅仅是我的口味问题,不少状况下确实RxBus/EventBus会很方便,很刺激,很上瘾。因此从这篇文章开始,我把标题中的"放弃RxBus"去除。java

不管在简书,微信平台,GitHub,掘金等等分享平台。一个名字上写着 "MVP(MVVM) + RxJava + Retrofit + Dagger2 + ........"这样的名字,再熟悉不过了。然而,大多数状况进去看一下RxJava部分。要么就是简单的把取到的数据用Observable.just()直接传给下一层,要么就是直接使用Retrofit的Adapter来直接得到Observable,而app中其余部分并无reactive。并且还有不少Observable用法错误,好比冷热不分,连续太多的Map/FlatMap等等。react

0. RxBus/Retrofit 足够用了,我为何要让本身的App 更加的Reactive?

为何不用RxBus我已经写了两篇文章了,可能因为我不常写文,不少人并无理解。在这里我再解释一次:EventBus若是是一辆穿梭在全部代码之间的公交车。那么Observable就是穿梭在少量人之间的Uber专车。他比起EventBus有不少优点,好比类型安全,异常处理,线程切换,强大的操做符等等。你固然能够作出一辆超级Uber来当全局公交车(RxBus)使用,然而这却损失了RxJava原本的许多优点,而且又给本身挖了许多坑,得不偿失。android

0.1 一个常见误区,过多的operator

刚开始使用RxJava的时候,咱们会以为operator的链式调用会很是的爽,一个简单的例子:git

Observable.just("1", "2", "3", "4", "5", "6", "7")
          .map(x -> Integer.valueOf(x))
          .map(x -> x * 2)
          .map(x -> x + 4)
          .filter(x -> x >2)
          // and much more operators
          .subscribeOn(Schedulers.io())
          .observeOn(AndroidSchedulers.mainThread());复制代码

当你只有不多数据的时候,这样固然能够,可是你数据量上来的时候,这就会有不少的overhead。 其实几乎全部的operator都会给你生成一个新的Observable。因此在上面这个例子中,咱们在过程当中生成了至少7个Observable。然而咱们彻底能够将中间的.map().map().map().filter合并在一个FlatMap中,减小不少的overhead。github

1. Observable.just()的局限性。

  1. 使用Observable.just() 即便你没有调用subscribe方法。just()括号里面的代码也已经执行了。显然,Observable.just()不适合封装网络数据,由于咱们一般不想在subscribe以前作网络请求。
    举个例子:
    class TestClass{
    TestClass(){
     System.out.println("I'm created!");
    }
    }
    Observable.just(new TestClass());复制代码
    这时你运行代码,你就看到确实你的TestClass 已经被建立了:
    I/System.out: I'm created!复制代码
    同理,fromIterable也和just有一样的缺点。固然,这个能够简单的用defer()/fromCallable()/create()操做符来是实现只有subscribe只有才加载。
    好比:
    // use fromCallable
    Observable.fromCallable(TestClass::new);
    //or
    Observable.defer(() -> Observable.just(new TestClass()));复制代码
  2. Observable.just()不够灵活。虽说设计模式上咱们追求 "Minimize Mutability" 可是若是咱们的程序愈来愈 reactive的时候。一个 ObservableJust 每每是不知足需求的。好比以前必定订阅的subscriber。若是数据更新了,你不能够同过ObservableJust 来通知全部的Observable 新数据更新了,须要你的subscriber主动更新。这显然有悖于咱们追求的reactive programming。 主动pull数据而不是数据告诉你,我更新了而后再作出反应。

固然ObservableJust在不少状况下,确实不错。若是你不须要监听后续的更新,那么ObservableJust能够知足你的需求。设计模式

2. Hot Observable 和 cold Observable

这部分是本篇文章的重点!缓存

不少人在封装数据的时候,并无太多考虑冷热的问题,一般状况下并不会出错。由于目前不少开源项目(Demo)里除了RxBus,并无太多的RxJava的实时状况。然而,当你的App愈来愈Reactive的时候,冷热即是一个必须考虑的问题。
Hot Observable 意思是若是他开始传输数据,你不主动喊停(dispose()/cancel()),那么他就不会停,一直发射数据,即便他已经没有Subscriber了。而Cold Observable则是subscribe时才会发射数据。
然而,问题来了。我上篇文章讲过,只有subscribeActual方法调用了的时候,Observable发射数据,那为何Hot Observable没有Subscriber也会发射数据,他把数据发射给谁了呢?咱们在解决这个问题以前,先看一下Cold Observable:安全

2.1 Cold Observable

咱们常见的工厂方法提供的都是ColdObservable,包括just(),fromXX,create(),interval(),defer()。 他们的共同点是当你有多个Subscriber的时候,他们的事件是独立的,举个例子:微信

Observable interval = Observable.interval(1,TimeUnit.SECONDS);复制代码

若是咱们有两个subscriber,那么他们会各自有本身的计时器,而且互不干扰。效果以下图:

Hot Observable

2.2 Hot Observable

不一样于Cold Observable, Hot Observable是共享数据的。对于Hot Observable的全部subscriber,他们会在同一时刻收到相同的数据。咱们一般使用publish()操做符来将ColdObservable变为Hot。或者咱们在RxBus中经常用到的Subjects 也是Hot Observable。
刚刚咱们刚刚提出了一个问题,

既然Hot Observable在没有subscriber的时候,还会继续发送数据,那么数据究竟发给谁了呢?

其实Hot Observable其实并无发送数据,而是他上层的Observable 发送数据给这个hot Observable。不信?咱们来分别看一下:

2.2.1 ConnectableObservable

咱们在上面的误区中知道了,几乎全部operator都会生成一个新的Observable。publish固然不例外。可是有区别的是,publish会给你一个ConnectableObservable。具体实现类是ObservablePublish。这个Observable的区别是他提供一个connect()方法,若是你调用connect()方法,ConnectableObservable就会开始接收上游Observable的数据。咱们来测试一下:

ConnectableObservable interval = Observable.interval(1, TimeUnit.SECONDS).publish();
//connect even when no subscribers
interval.connect();复制代码

ConnectableObservable

果真,因为咱们subscribe晚了一些。0这个数据没有收到,当咱们两个 Subscriber 都dispose的时候,ConnectableObservable 也仍在接受数据,致使咱们6这个数据没有接收到。
ConnectableObservable 其实在内部,有一个PublishObserver,他有两个做用。一个是当咱们调用 connect()方法时, PublishObserver开始接受上游的数据,咱们的例子里即是 Observable.interval(1, TimeUnit.SECONDS) 。因此才能在咱们没有调用 subscribe方法时,他也能开始发送数据。第二个做用是 PublishObserver存储全部的下游Subscriber, 也就是咱们例子中的Subscriber1 和Subscriber2,在 PublishObserver 每次接到一个上游数据,就会将接收到的结果,依次分发给他存储的全部 Subscribers ,若是下游 Subscriber 调用了 dispose方法,那么他就会在本身的缓存中删除这个 Subscriber,下次接受到上游数据便不会传给这个Subscriber
那么这时候,有同窗应该要问了:

咱们可不能够中止从上游接受数据?

咱们固然能够。 connect()方法会返回一个 Disposable 给咱们来控制是否继续接受上游的数据。

2.2.2 ConnectableObservable的经常使用操做符

咱们固然不但愿每次都手动控制 ConnectableObservable的开关。RxJava给咱们提供了一些经常使用的控制操做符

  1. refCount()
    refCount()能够说是最经常使用的操做符了。他会把 ConnectableObservable变为一个一般的Observable但又保持了HotObservable的特性。也就是说,若是出现第一个Subscriber,他就会自动调用 connect()方法,若是他开始接受以后,下游的 Subscribers所有dispose,那么他也会中止接受上游的数据。具体看图:

refCount()

每一个 Subscriber 每次都会接受一样的数据,可是当全部 subscriber 都 dispose时候,他也会自动dipose上游的 Observable 。因此咱们从新subscribe的时候,又从新从0开始。
这个操做符经常使用到,RxJava将他和publish合并为一个操做符 :share()

  1. autoConnect()
    autoConnect()看名字就知道,他会自动连接,若是你单纯调用 autoConnect() ,那么,他会在你连接第一个 Subscriber 的时候调用 connect(),或者你调用 autoConnect(int Num),那么他将会再收到Num个 subscriber的时候连接。
    可是,这个操做符的关键在于,因为咱们为了链式调用,autoConnect会返回Observable给你,你不会在返回方法里得到一个 Disposable来控制上游的开关。 不过没问题,autoConnect提供了另外一种重载方法 :
    autoConnect(int numberOfSubscribers, Consumer<? super Disposable> connection)
    他会在这个 Consumer传给你 你须要的那个总开关。并且,autoConnect并不会autoDisconnect, 也就是若是他即便没有subscriber了。他也会继续接受数据。
  2. replay()
    replay()方法和 publish()同样,会返回一个 ConnectableObservable,区别是, replay()会为新的subscriber重放他以前所收到的上游数据,咱们再来举个例子:

    //only replay 3 values
    Observable.interval(1, TimeUnit.SECONDS).replay(3).refCount();复制代码

    replay()

    果真,Subscriber2在subscribe时候,当即收到了以前已经错过的三个数据,而后继续接受后面的数据。
    可是,这里有几点须要考虑:replay() 会缓存上游发过来的数据,因此并不须要担忧从新生成新数据给新的 Subscriber。

  3. ReplayingShare()
    其实ReplayingShare并不能算是ConnectableObservable的一个操做符,他是JakeWhaton的一个开源库,只有百来行。实现的功能是几乎和replay(1).refCount()差很少。可是若是中断 Conncection以后,从新开始subscribe,他仍然会给你一个重放他上一次的结果。 具体看图:

ReplayingShare()

咱们看到和刚才的replay不一样,即便两个Subscriber都 dispose, 从新开始仍然会接收到咱们缓存过的一个数据。

2.3 Subjects

Subjects 做为一个Reactive世界中的特殊存在,他特殊在于他本身既是一个Observable又是一个Observer(Subscriber)。你既能够像普通Observable同样让别的Subscriber来订阅,也能够用Subjects来订阅别人。更方便的是他甚至暴露了OnXX(),方法给你。你直接调用能够通知全部的Subscriber。 这也是RxBus的基础,RxBus几乎离不开Subjects。 蜘蛛侠的老爹告诉咱们,力量越大,责任就也大。Subjects也同样。 Subjects由于暴露了OnXX()方法,使得Subjects的数据来源变得难以控制。并且,Subjects一直是HotObservable,咱们来看下Subject的OnNext()方法的实现:

@Override
public void onNext(T t) {
    if (subscribers.get() == TERMINATED) {
        return;
    }
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    for (PublishDisposable<T> s : subscribers.get()) {
        s.onNext(t);
    }
}复制代码

能够看出来Subjects只要调用了OnNext()方法就会当即发送数据。因此,使用时必定要注意Subjects和Subscriber的连接时序问题。具体Subjects的用法我想介绍帖子已经足够多了。这里就不赘述了。

3. 在Android中常见的几种封装和注意事项

1.封装View 的Listener

View 的各类Listener 咱们经常使用create方法来封装,好比OnClickListener:

Observable.create(emitter -> {
    button.setOnClickListener(v -> emitter.onNext("I'm Clicked"));
    emitter.setCancellable(() -> button.setOnClickListener(null));
});复制代码

这里很是关键的一点是必定要设置解除绑定,不然你将持续使用这个会形成内存泄漏。并且最好配合使用share()。不然只有最后一个Subscriber能收到OnClick。固然,若是不考虑方法数的话,推荐配合使用RxBinding。

并且,用create()方法封装Listener适合几乎全部的callback, 而且安全。

2.封装简单的数据源

设想一个场景,咱们有一个User类。里面有咱们的用户名,头像,各类信息。然而在咱们的app中,可能有三四个Fragment/Activity须要根据这个User作出不一样的反应。这时咱们就能够简单的使用Subject来封装User类。

public class UserRepository {
    private User actualUser;

    private Subject<User> subject = ReplaySubject.createWithSize(1);

    /** * *Get User Data from wherever you want Network/Database etc */

    public Observable<User> getUpdate(){
        return subject;
    }

    public void updateUser(User user){
        actualUser = user;
        subject.onNext(actualUser);
    }
}复制代码

若是咱们某些模块须要这个User,那么只须要subscribe到这个Repository,若是User有更新,每个Subscriber都会收到更新后的User而且互相不影响。并且咱们使用ReplaySubject,即便有新的Subscriber,也会收到最新的一个User的缓存。
可是使用的时候必定要注意,由于用的是Subject.因此在onNext方法中一旦出现了error。那么全部的Subscriber都将和这个subject断开了连接。这里也能够用 RxRelay 代替Subject,简单来讲Relay就是一个没有onError和onComplete的Subject。

3.简单的使用concat().first()来处理多来源

Dan Lew在他的博客Loading data from multiple sources with RxJava 中介绍过他这种处理方法,

// Our sources (left as an exercise for the reader)
Observable<Data> memory = ...;  
Observable<Data> disk = ...;  
Observable<Data> network = ...;

// Retrieve the first source with data
Observable<Data> source = Observable  
  .concat(memory, disk, network)
  .first();复制代码

而后在每次作不一样请求的时候刷新缓存

Observable<Data> networkWithSave = network.doOnNext(data -> {  
  saveToDisk(data);
  cacheInMemory(data);
});

Observable<Data> diskWithCache = disk.doOnNext(data -> {  
  cacheInMemory(data);
});复制代码

具体也能够看这篇简书,我也不在过多赘述 :RxJava(八)concat符操做处理多数据源

这里也说一下这个方法的缺点。 首先,这个只适合一个Item的时候。若是咱们有多个Item从这个Observable中流出。 fisrt()操做符只会取第一个。

4.本身继承Observable 手动写subscribeActual()方法

这多是最灵活的写法?若是你想用RxJava封装本身的库,推荐这种方法封装。由于这样不只仅能够有效的进行错误处理,而且不会暴露过多逻辑给外面,许多优秀的RxJava相关库都是这样封装,就连RxJava本身也是把一个个的operator封装成一个个不一样的Observable。可是这种方法确实要求很高,要作不少考虑,好比异步,多线程冲突,错误处理。对新手不是很推荐。
仍是稍微讲一个例子: 仍是咱们的onClickLisnter的封装:
RxBinding 的 封装:

final class ViewClickObservable extends Observable<Object> {
  private final View view;

  ViewClickObservable(View view) {
    this.view = view;
  }

  @Override protected void subscribeActual(Observer<? super Object> observer) {
    if (!checkMainThread(observer)) {
      return;
    }
    Listener listener = new Listener(view, observer);
    observer.onSubscribe(listener);
    view.setOnClickListener(listener);
  }

  static final class Listener extends MainThreadDisposable implements OnClickListener {
    private final View view;
    private final Observer<? super Object> observer;

    Listener(View view, Observer<? super Object> observer) {
      this.view = view;
      this.observer = observer;
    }

    @Override public void onClick(View v) {
      if (!isDisposed()) {
        observer.onNext(Notification.INSTANCE);
      }
    }

    @Override protected void onDispose() {
      view.setOnClickListener(null);
    }
  }
}复制代码

其实这里虽然代码更多,可是实质上是刚才咱们说到的用Observable.create()来封装没有不少区别。那咱们为何还要这么麻烦本身写Observable? 由于与create相比,减小了对象个数。 Observable封装一个OnClickListener 须要 ObservableCreate, ObservableOnSubscribe,ObservableEmitter 这三个类的实例。来确保你封装出来的Observable 是遵照 Observable Contract的。 而若是你默认本身遵照Observable Contract, 你只须要一个 CustomObservable来实现。减小了两个对象的生成。 这个观点也获得了证明,我在StackOverFlow问过相关问题。很幸运获得了RxJava 2.x 做者 David 的回复:

若是直接继承与Observable来封装。大概分以下几步:

  1. 将你须要监听的类/接口,经过构造方法传入这个Observable。好比将咱们须要监听OnClick的View传入:
    ViewClickObservable(View view) {
     this.view = view;
    }复制代码
  2. 将你须要使用的Listener/CallBack 和 Disposable/Observer接口结合成为一个实现类,实现监听。而且经过构造方法,将下游的Observer传入,以实现传输数据。好比OnClickListener:

    static final class Listener extends MainThreadDisposable implements OnClickListener {
     private final View view;
     private final Observer<? super Object> observer;
    
     Listener(View view, Observer<? super Object> observer) {
       this.view = view;
       this.observer = observer;
     }
    
     @Override public void onClick(View v) {
       if (!isDisposed()) {
         observer.onNext(Notification.INSTANCE);
       }
     }
    
     @Override protected void onDispose() {
       view.setOnClickListener(null);
     }
    }复制代码

这里推荐使用内部类的形式,下降内部可见性。并且这里须要注意的点很是多。
首先Disposable的 两个方法都须要实现,isDisposed()通常使用AtomicBoolean来控制监听是否已经取消订阅,好比:

private final AtomicBoolean unsubscribed = new AtomicBoolean();

    @Override
    public final boolean isDisposed() {
        return unsubscribed.get();
    }复制代码

dispose()方法通常会放一些取消监听等方法。好比咱们上面看到的view.setOnClickListener(null);。 这里 onDispose正常是没有这个方法的。 这个是Jake Wharton为了方便封装出来的接口,放在dispose方法里运行的。

public abstract class MainThreadDisposable implements Disposable {
    @Override
    public final void dispose() {
        onDispose();
    }
    protected abstract void onDispose();
}复制代码

他固然还在dipose方法里作了其余安全检查,消除了一些Boilerplate。

其次,这里的对于Observer的控制是十分宽松的。因此你的行为最好必定遵循Observable Contract。不然出现其余问题,好比下游取消订阅上游还在发送数据。 又或者onComplete/onError以后还有onNext出发。 又或者出现异常并不会在onError中获得等等。 这里推荐看一下RxBinding其余实现类的源码。或者是Retrofit RxJava 2 的 Adapter。 都会颇有帮助。

最后,若是你是封装一个生产Observable的方法,那么使用Disposable。若是你是想封装一个自定义Operator。那么须要实现Observer接口。使用这个observer来监听上游数据。自定义Operator实在复杂。这里我就不讲了,我本身也没精通这个。

  1. 在SubscribeActual里,注册监听方法,仍是咱们刚才的例子:
@Override protected void subscribeActual(Observer<? super Object> observer) {
    if (!checkMainThread(observer)) {
      return;
    }
    Listener listener = new Listener(view, observer);
    observer.onSubscribe(listener);
    view.setOnClickListener(listener);
  }复制代码

看了我第二篇文章的朋友这里应该对subscribeActual方法不陌生。这里的参数是下游的observer。将他直接传入咱们刚才设计好的Listener/Disposable/Observer。 而后经过observer.onSubscribe注册咱们的disposable。 而后咱们开始注册监听。这里顺序很重要。必定要先调用onSubscribe方法再注册监听。不然可能会出现下游disposable空指针异常。

总结

这篇文章在简书上也发了有一阵子了。转到掘金时我又从新检查了下,补充了一些内容。可能有些地方看起来与我以前的文章有些许重复。但愿见谅。

相关文章
相关标签/搜索