重学Rx(一)


一.为何重学?
java

       从开始接触rx到如今也有几年时间,但仅仅局限于简单的用,一直处于看似懂其实不懂的状态,相比较让本身思想更加rx仍是有必定的距离。关于Flowable/Observable、cold/hot、subject/Processor,操做符结合使用,都须要去梳理清楚。android

      其次,mvvm开发模式是我心里向往的一种开发模式,但同时又对Databinding那种蛮横的数据绑定方式以及难定位语法的错误和运行时的崩溃缘由望而却步。当知道mvvm模式除了依赖Databinding以外还能够依靠rx去实现以后,更加坚决了我拥抱rx的决心。数据库

Tips:本文主要基于Rxjava 2.0复制代码

二.从哪里学呢?缓存

        从操做符的基本用法开始学?立刻我否认了这个想法。就好像自定义控件API不是重点同样,Rx的操做符也不该该是重点。重点应该放在比较异同,感觉设计目的,最后落脚点应该才是学习操做符。Rx试图统一同步和异步回调这类总结性的语言先不去说就从rx是一个扩展的观察者模式提及。既然是观察者模式就应该有被观察者(Observable)观察(Observer)两个角色和订阅(Subscribe)一个动做。第一个任务应该先熟知Observable的分类。bash

三.被观察者(Observable)分类网络

3.1按照热冷分类异步

cold Observablehot Observable,它们的区别是:mvvm

Hot Observable 不管有没有订阅,事件始终都会发生(发生的前提是调用connect,ps:connect方法是ConnectableObservable类内的方法)。当 Hot Observable 有多个订阅者时,Hot Observable 与订阅者们的关系是一对多的关系,能够与多个订阅者共享信息学习

Cold Observable 只有订阅才开始发送事件。而且Cold Observable 和订阅者只能是一对一的关系,当有多个不一样的订阅者时,消息是从新完整发送的。ui

Rx提供常见建立的Observable默认状况下都是冷流----订阅才发送。以下入建立方式


首先怎么建立热流

方式一:经过冷流转换成热流。

冷流调用publish方法转换成热流——实质Observable转换成ConnectableObservable。要触发一个热流发送数据并非去订阅,而是调用connect方法。若是不调用connect方法,即便订阅了热流也不会触发上游发送事件。

方式二:直接建立热流

咱们点进去publish方法,发现以下代码:

public final ConnectableObservable<T> publish() {
        return ObservablePublish.create(this);
    } 复制代码

能够看到其本质仍是用过ObservablePublish$create方法去建立热流的。由此能够得到直接建立热流的方法。直接建立热流也须要调用connect方法,触发上游事件

方式三: 借助 Subject(它自己是hot) 转换为 Hot Observable

subject便是被观察者又是观察者,当 Subject 做为观察者时,它能够订阅目标 Cold Observable 使对方开始发送事件。同时它又做为被观察者转发或者发送新的事件,让 Cold Observable 借助 Subject 转换为 Hot Observable。

  作为观察者接受数据:

Observable<Object> objectObservable = Observable.create(e -> {
                Observable.interval(500, TimeUnit.MILLISECONDS).subscribe(e::onNext);
            });

            PublishSubject<Object> subject = PublishSubject.create();
            objectObservable.subscribe(subject);复制代码

 作为被观察者发送数据 :

subject.subscribe(e->{});复制代码

让 Cold Observable 借助 Subject 转换为 Hot Observable。关于Subject另开一篇阐述。

方式四:replay方法

replay()方法和 publish()同样,会返回一个 ConnectableObservable,区别是, replay()会为新的subscriber重放他以前所收到的上游数据(指定缓存以前数量)。

冷流能转换成热流,那么反过来,热流能转换成冷流嘛?

不能!

       不过发现热流中有一个refCount操做符,被他调用事后返回一个Observable对象,看似转化成了冷流,其实和冷流仍是不一样的,返回的它拥有独有的特性:只要有订阅者,数据流就不会中止,若是全部的订阅者都取消订阅了,则数据流中止。更加相似android中绑定服务的特性。-----我本身叫它伴热不冷流。

一样的冷流中也提供了share方法变成一个伴热不冷流。

public final Observable<T> share() {
        return publish().refCount();
    } 复制代码

3.2按照是否支持背压分类

        背压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的状况下,一种请求才去发送的策略。在rx1中有的操做符支持背压,有的不支持背压,混在一块儿。rx2中它们分开了。


  • Observeable流的建立方法,Flowable也一样拥有(Create、just  ...)。
  • Flowable默认建立的也是冷流(想要转换成热流一样有四种方法)。
    1. 经过publish方法。经过调用publish方法。
    2. 经过 FlowablePublish$create方法直接建立
    3. 经过Processor类,它和Subject是一个道理,无非是它支持背压。
    4. 经过replay()方法。

咱们去对比Observeable/Flowable,从表面上观察他们的流的思路:

不支持背压:被观察者是主动的推送数据给观察者,观察者是被动接收的。

支持背压:观察者主动从被观察者那里去拉取数据,而被观察者变成被动的等待通知再发送数据。

3.3其余的被观察者

3.3.1 Single特色

建立的是一个cold流不能直接转换成hot流,同时也不支持背压,只发射单个数据错误事件,只有onSuccess、onError回调方法,可是能够经过toXXX方法转换成Observable、Flowable、Completable以及Maybe。


3.3.2 Completable特色

建立的是一个cold流,也不能直接转换成hot流,同时也不支持背压,不发射数据,只有onComplete、onError回调方法,可是能够经过toXXX方法转换成Observable、Flowable、Single以及Maybe。


3.3.3 Maybe特色

建立的是一个cold流,也不能直接转换成hot流,同时也不支持背压。是Single和Completable的结合体。可是能够经过toXXX方法转换成Observable、Flowable、Single。


那么问题来了

1.为何要这么多类型的被观察者?这不是画蛇添足嘛?

   这个问题以我目前的水平还不能用简单直白的语言描述清楚,只能经过场景去阐述:为何用Single?仔细思考一个网络请求场景,咱们根本不须要onComplete回调,为了使用意图更加明确,因此提供了Single被观察者。为何用Completable?好比向本地数据库存数据,而后存完成后,流继续往下发送,当咱们使用Completable而后结合andThen回让使用意图包括封装更加明确。Maybe同样的道理--------使用意图更加明确

2.这些被观察者和观察者之间有什么继承关系吗?

上述五对被观察者和观察者以前功能类似、方法相同。可是经过查看源码发现:被观察者都是各自实现本身的接口,同时观察者也是各自实现本身的接口——保证了他们各自的拓展或者配套的操做符不会相互影响。这就意味着不少操做符都有多套------没有继承关系

四. 订阅方法重载

每个订阅方法都有多个重载方法:

public final Disposable subscribe() {
        return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
    }

    public final Disposable subscribe(Consumer<? super T> onNext) {
        return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
    }
  
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
        return subscribe(onNext, onError, Functions.EMPTY_ACTION, Functions.emptyConsumer());
    }

    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete) {
        return subscribe(onNext, onError, onComplete, Functions.emptyConsumer());
    }

    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete, Consumer<? super Disposable> onSubscribe) {
        return ls;
    }
    public final void subscribe(Observer<? super T> observer) { 
    }复制代码

咱们能够经过返回的Disposable取消订阅,那么有一个问题来了,void返回值这个怎么取消订阅?经过观察者回调方法返回Disposable用于取消订阅。可是Flowable并无从观察者中返回这个Disposable对象。那么怎么取消订阅防止内存泄漏呢?Flowable提供了subscribeWith这个方法能够返回当前订阅的观察者,而且经过ResourceSubscriber DisposableSubscriber等观察者来提供 Disposable的接口。

小彩蛋

tips1:

observeOn 后面的全部操做都会在它指定线程工做。subscribeOn 指定的线程是从这个Observable生成一直到遇到其余 observeOn。若是程序须要屡次切换线程,使用屡次observeOn是彻底能够的。 而subscribeOn只有最上方的subscribeOn会起做用

tips2:

尽可能避免过多的使用操做符,由于每一个操做符都会根据操做符的特性生成新的Observable,订阅他的上游而后给下游发送数据,避免使用过多的操做符能够下降内存抖动。

tips3

connect方法返回一个Disposable对象,用来取消热流发送数据。

相关文章
相关标签/搜索