RxJava2 实战知识梳理(12) 实战讲解 publish & replay & share & refCount & autoCo

1、前言

今天,咱们来整理如下几个你们容易弄混的概念,并用实际例子来演示,能够从 RxSample 的第十二章中获取:java

  • publish
  • reply
  • ConnectableObservable
  • connect
  • share
  • refCount
  • autoConnect

对于以上这些概念,能够用一幅图来归纳: git

从图中能够看出,这里面能够供使用者订阅的 Observable能够分为四类,下面咱们将逐一介绍这几种 Observable的特色:

  • 第一类:Cold Observable,就是咱们经过Observable.createObservable.interval等建立型操做符生成的Observable
  • 第二类:由Cold Observable通过publish()或者replay(int N)操做符转换成的ConnectableObservable
  • 第三类:由ConnectableObservable通过refCount(),或者由Cold Observable通过share()转换成的Observable
  • 第四类:由ConnectableObservable通过autoConnect(int N)转换成的Observable

2、Cold Observable

Cold Observable就是咱们经过Observable.createObservable.interval等建立型操做符生成的Observable,它具备如下几个特色:github

  • 当一个订阅者订阅Cold Observable时,Cold Observable会从新开始发射数据给该订阅者。
  • 当多个订阅者订阅到同一个Cold Observable,它们收到的数据是相互独立的。
  • 当一个订阅者取消订阅Cold Observable后,Cold Observable会中止发射数据给该订阅者,但不会中止发射数据给其它订阅者。

下面,咱们演示一个例子,首先咱们建立一个Cold Observable缓存

//直接订阅Cold Observable。
    private void createColdSource() {
        mConvertObservable = getSource();
    }

    private Observable<Integer> getSource() {
        return Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                try {
                    int i = 0;
                    while (true) {
                        Log.d(TAG, "源被订阅者发射数据=" + i + ",发送线程ID=" + Thread.currentThread().getId());
                        mSourceOut.add(i);
                        observableEmitter.onNext(i++);
                        updateMessage();
                        Thread.sleep(1000);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).subscribeOn(Schedulers.io());
    }
复制代码

在建立两个订阅者,它们能够随时订阅到Cold Observable或者取消对它的订阅:ide

private void startSubscribe1() {
        if (mConvertObservable != null && mDisposable1 == null) {
            mDisposable1 = mConvertObservable.subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, "订阅者1收到数据=" + integer + ",接收线程ID=" + Thread.currentThread().getId());
                    mSubscribe1In.add(integer);
                    updateMessage();
                }
            });
        }
    }

    private void disposeSubscribe1() {
        if (mDisposable1 != null) {
            mDisposable1.dispose();
            mDisposable1 = null;
            mSubscribe1In.clear();
            updateMessage();
        }
    }

    private void startSubscribe2() {
        if (mConvertObservable != null && mDisposable2 == null) {
            mDisposable2 = mConvertObservable.subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, "订阅者2收到数据=" + integer + ",接收线程ID=" + Thread.currentThread().getId());
                    mSubscribe2In.add(integer);
                    updateMessage();
                }
            });
        }
    }

    private void disposeSubscribe2() {
        if (mDisposable2 != null) {
            mDisposable2.dispose();
            mDisposable2 = null;
            mSubscribe2In.clear();
            updateMessage();
        }
    }
复制代码

为了验证以前说到的几个特色,进入程序以后,咱们会先建立该Cold Observable,以后进行一系列的操做,效果以下: spa

在上面的图中,咱们作了一下几步操做:

  • 第一步:启动应用,建立Cold Observable,这时候Cold Observable没有发送任何数据。
  • 第二步:Observer1订阅Observable,此时Cold Observable开始发送数据,Observer1也能够收到数据,即 一个订阅者订阅 Cold Observable 时, Cold Observable 会开始发射数据给该订阅者
  • 第三步:Observer2订阅Observable,此时Observable2也能够收到数据,可是它和Observable1收到的数据是相互独立的,即 当多个订阅者订阅到同一个 Cold Observable ,它们收到的数据是相互独立的
  • 第四步:Observer1取消对Observable的订阅,这时候Observer1收不到数据,而且Observable也不会发射数据给它,可是仍然会发射数据给Observer2,即 当一个订阅者取消订阅 Cold Observable 后,Cold Observable 会中止发射数据给该订阅者,但不会中止发射数据给其它订阅者
  • 第五步:Observer1从新订阅Observable,这时候Observable0开始发射数据给Observer1,即 一个订阅者订阅 Cold Observable 时, Cold Observable 会从新开始发射数据给该订阅者

3、由 Cold Observable 转换的 ConnectableObservable

在了解完Cold Observable以后,咱们再来看第二类的Observable,它的类型为ConnectableObservable,它是经过Cold Observable通过下面两种方式生成的:线程

  • .publish()
  • .reply(int N)

若是使用.publish()建立,那么订阅者只能收到在订阅以后Cold Observable发出的数据,而若是使用reply(int N)建立,那么订阅者在订阅后能够收到Cold Observable在订阅以前发送的N个数据。3d

咱们先以publish()为例,介绍ConnectableObservable的几个特色:code

  • 不管ConnectableObservable有没有订阅者,只要调用了ConnectableObservableconnect方法,Cold Observable就开始发送数据。
  • connect会返回一个Disposable对象,调用了该对象的dispose方法,Cold Observable将会中止发送数据,全部ConnectableObservable的订阅者也没法收到数据。
  • 在调用connect返回的Disposable对象后,若是从新调用了connect方法,那么Cold Observable会从新发送数据。
  • 当一个订阅者订阅到ConnectableObservable后,该订阅者会收到在订阅以后,Cold Observable发送给ConnectableObservable的数据。
  • 当多个订阅者订阅到同一个ConnectableObservable时,它们收到的数据是相同的。
  • 当一个订阅者取消对ConnectableObservable,不会影响其余订阅者收到消息。

下面,咱们建立一个ConnectableObservable,两个订阅者以后会订阅到它,而不是Cold Observablecdn

//.publish()将源Observable转换成为HotObservable,当调用它的connect方法后,不管此时有没有订阅者,源Observable都开始发送数据,订阅者订阅后将能够收到数据,而且订阅者解除订阅不会影响源Observable数据的发射。
    public void createPublishSource() {
        mColdObservable = getSource();
        mConvertObservable = mColdObservable.publish();
        mConvertDisposable = ((ConnectableObservable<Integer>) mConvertObservable).connect();
    }
复制代码

和上面同样,仍是用一个例子来演示,该例子的效果为:

  • 第一步:启动应用,经过Cold Observablepublish方法建立ConnectableObservable,并调用ConnectableObservableconnect方法,能够看到,此时虽然ConnectableObservable没有任何订阅者,可是Cold Observable也已经开始发送数据。
  • 第二步:Observer1订阅到ConnectableObservable,此时它只能收到订阅以后Cold Observable发射的数据。
  • 第三步:Observer2订阅到ConnectableObservableCold Observable只会发射一份数据,而且Observer1Observer2收到的数据是相同的。
  • 第三步:Observer1取消对ConnectableObservable的订阅,Cold Observable仍然会发射数据,Observer2仍然能够收到Cold Observable发射的数据。
  • 第四步:Observer1从新订阅ConnectableObservable,和第三步相同,它仍然只会收到订阅以后Cold Observable发射的数据。
  • 第五步:经过connect返回的Disposable对象,调用dispose方法,此时Cold Observable中止发射数据,而且Observer1Observer2都收不到数据。

上面这些现象发生的根本缘由在于:如今ObserverObserver2都是订阅到ConnectableObservable,真正产生数据的Cold Observable并不知道他们的存在,和它交互的是ConnectableObservableConnectableObservable至关于一个中介,它完成下面两项任务:

  • 对于上游:经过connectdispose方法决定是否要订阅到Cold Observer,也就是决定了Cold Observable是否发送数据。
  • 对于下游:将Cold Observable发送的数据转交给它的订阅者。

4、由 ConnectableObservable 转换成 Observable

ConnectableObservable转换成Observable有两种方法,咱们分为两节介绍下当订阅到转换后的Observable时的现象:

  • .refCount()
  • .autoConnect(int N)

4.1 ConnectableObservable 由 refCount 转换成 Observable

通过refCount方法,ConnectableObservable能够转换成正常的Observable,咱们称为refObservable,这里咱们假设ConnectableObservable是由Cold Observable经过publish()方法转换的,对于它的订阅者,有如下几个特色:

  • 第一个订阅者订阅到refObservable后,Cold Observable开始发送数据。
  • 以后的订阅者订阅到refObservable后,只能收到在订阅以后Cold Observable发送的数据。
  • 若是一个订阅者取消订阅到refObservable后,假如它是当前refObservable的惟一一个订阅者,那么Cold Observable会中止发送数据;不然,Cold Observable仍然会继续发送数据,其它的订阅者仍然能够收到Cold Observable发送的数据。

接着上例子,咱们建立一个refObservable

//.share()至关于.publish().refCount(),当有订阅者订阅时,源订阅者会开始发送数据,若是全部的订阅者都取消订阅,源Observable就会中止发送数据。
    private void createShareSource() {
        mColdObservable = getSource();
        mConvertObservable = mColdObservable.publish().refCount();
    }
复制代码

示例以下:

操做分为如下几步:

  • 第一步:经过.publish().refCount()建立由ConnectableObservable转换后的refObservable,此时Cold Observable没有发送任何消息。
  • 第二步:Observer1订阅到refObservableCold Observable开始发送数据,Observer1接收数据。
  • 第三步:Observer2订阅到refObservable,它只能收到在订阅以后Cold Observable发送的数据。
  • 第四步:Observer1取消订阅,Cold Observable继续发送数据,Observer2仍然能收到数据。
  • 第五步:Observer2取消订阅,Cold Observable中止发送数据。
  • 第六步:Observer1从新订阅,Cold Observable从新开始发送数据。

最后说明一点:订阅到Cold Observable.publish().refCount()Cold Observableshare()所返回的Observable是等价的。

4.2 ConnectableObservable 由 autoConnect(int N) 转换成 Observable

autoConnect(int N)refCount很相似,都是将ConnectableObservable转换成普通的Observable,咱们称为autoObservable,一样咱们先假设ConnectableObservable是由Cold Observable经过publish()方法生成的,它有如下几个特色:

  • 当有N个订阅者订阅到refObservable后,Cold Observable开始发送数据。
  • 以后的订阅者订阅到refObservable后,只能收到在订阅以后Cold Observable发送的数据。
  • 只要Cold Observable开始发送数据,即便全部的autoObservable的订阅和都取消了订阅,Cold Observable也不会中止发送数据,若是想要Cold Observable中止发送数据,那么可使用autoConnect(int numberOfSubscribers, Consumer connection)Consumer返回的Disposable,它的做用和ConnectableObservableconnect方法返回的Disposable相同。

其建立方法以下所示:

//.autoConnect在有指定个订阅者时开始让源Observable发送消息,可是订阅者是否取消订阅不会影响到源Observable的发射。
    private void createAutoConnectSource() {
        mColdObservable = getSource();
        mConvertObservable = mColdObservable.publish().autoConnect(1, new Consumer<Disposable>() {
            @Override
            public void accept(Disposable disposable) throws Exception {
                mConvertDisposable = disposable;
            }
        });
    }
复制代码

示例效果以下:

咱们进行了以下几步操做:

  • 第一步:启动应用,建立autoConnect转换后的autoObservable
  • 第二步:Observer1订阅到autoObservable,此时知足条件,Cold Observable开始发送数据。
  • 第三步:Observer2订阅到autoObservable,它只能收到订阅后发生的数据。
  • 第四步:Observer1取消订阅,Cold Observable继续发送数据,Observer2仍然能够收到数据。
  • 第五步:Observer2取消订阅,Cold Observable仍然继续发送数据。
  • 第六步:Observer2订阅到autoObservable,它只能收到订阅后发送的消息了。
  • 第七步:调用mConvertDisposabledisposeCold Observable中止发送数据。

5、publish 和 reply(int N) 的区别

在上面的例子当中,全部总结的特色都是创建在ConnectableObservable是由publish()生成,只因此这么作,是为了方便你们理解,不管是订阅到ConnectableObservable,仍是由ConnectableObservable转换的refObservableautoObservable,使用这两种方式建立的惟一区别就是,订阅者在订阅后,若是是经过publish()建立的,那么订阅者以后收到订阅后Cold Observable发送的数据;而若是是reply(int N)建立的,那么订阅者还能额外收到N个以前Cold Observable发送的数据,咱们用下面一个小例子来演示,订阅者订阅到的Observable以下:

//.reply会让缓存源Observable的N个数据项,当有新的订阅者订阅时,它会发送这N个数据项给它。
    private void createReplySource() {
        mColdObservable = getSource();
        mConvertObservable = mColdObservable.replay(3);
        mConvertDisposable = ((ConnectableObservable<Integer>) mConvertObservable).connect();
    }
复制代码

示例演示效果:

操做步骤:

  • 第一步:启动应用,经过Cold Observablepublish方法建立ConnectableObservable,并调用ConnectableObservablereplay(3)方法,能够看到,此时虽然ConnectableObservable没有任何订阅者,可是Cold Observable也已经开始发送数据。
  • 第二步:Observer1订阅到ConnectableObservable,此时它会先收到以前发射的3个数据,以后收到订阅以后Cold Observable发射的数据。

最后再提一下,更详细的代码你们能够从 RxSample 的第十二章中获取。


更多文章,欢迎访问个人 Android 知识梳理系列:

相关文章
相关标签/搜索