RxJava链接操做符

RxJava系列教程:css

1. RxJava使用介绍 【视频教程】
2. RxJava操做符
  • Creating Observables(Observable的建立操做符) 【视频教程】
  • Transforming Observables(Observable的转换操做符) 【视频教程】
  • Filtering Observables(Observable的过滤操做符) 【视频教程】
  • Combining Observables(Observable的组合操做符) 【视频教程】
  • Error Handling Operators(Observable的错误处理操做符) 【视频教程】
  • Observable Utility Operators(Observable的辅助性操做符) 【视频教程】
  • Conditional and Boolean Operators(Observable的条件和布尔操做符) 【视频教程】
  • Mathematical and Aggregate Operators(Observable数学运算及聚合操做符) 【视频教程】
  • 其余如observable.toList()、observable.connect()、observable.publish()等等; 【视频教程】
3. RxJava Observer与Subcriber的关系 【视频教程】
4. RxJava线程控制(Scheduler) 【视频教程】
5. RxJava 并发之数据流发射太快如何办(背压(Backpressure)) 【视频教程】java


RxJava链接操做符

目录

ConnectableObservable 和它的子类以及它们的操做符:web

  • ConnectableObservable.connect() — 指示一个可链接的Observable开始发射数据
  • Observable.publish() — 将一个Observable转换为一个可链接的Observable
  • Observable.replay() — 确保全部的订阅者看到相同的数据序列,即便它们在Observable开始发射数据以后才订阅
  • ConnectableObservable.refCount() — 让一个可链接的Observable表现得像一个普通的Observable

一个可链接的Observable与普通的Observable差很少,除了这一点:可链接的Observable在被订阅时并不开始发射数据,只有在它的connect()被调用时才开始。用这种方法,你能够等全部的潜在订阅者都订阅了这个Observable以后才开始发射数据。缓存


Publish

Publish 操做符将普通的Observable转换为可链接的Observable(ConnectableObservable)ConnectableObservable是Observable的子类。 可链接的Observable (connectable Observable)与普通的Observable差很少,不过它并不会在被订阅时开始发射数据,而是直到使用了Connect操做符时才会开始,这样能够更灵活的控制发射数据的时机。 并发

注意:若是一个ConnectableObservable已经开始发射数据,再对其进行订阅只能接受以后发射的数据,订阅以前已经发射过的数据就丢失了。less

publish

示例代码

Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS);
        //使用publish操做符将普通Observable转换为可链接的Observable
        ConnectableObservable<Long> connectableObservable = observable.publish();

        //第一个订阅者订阅,不会开始发射数据
        connectableObservable.subscribe(new Subscriber<Long>() {
            @Override
            public void onCompleted() {
                System.out.println("1.onCompleted");
            }
            @Override
            public void onError(Throwable e) {
                System.out.println("1.onError");
            }
            @Override
            public void onNext(Long value) {
                System.out.println("1.onNext value :"+ value);
            }
        });

        //若是不调用connect方法,connectableObservable则不会发射数据
        connectableObservable.connect();
        //第二个订阅者延迟2s订阅,这将致使丢失前面2s内发射的数据
        connectableObservable
                .delaySubscription(2, TimeUnit.SECONDS)// 0、1数据丢失
                .subscribe(new Subscriber<Long>() {
                    @Override
                    public void onCompleted() {
                        System.out.println("2.onCompleted");
                    }
                    @Override
                    public void onError(Throwable e) {
                        System.out.println("2.onError");
                    }
                    @Override
                    public void onNext(Long value) {
                        System.out.println("2.onNext value :"+ value);
                    }
                });
        //eclipse下运行加上下面代码,Android Studio则不须要
        Thread.sleep(6000);

输出结果以下:eclipse

1.onNext value :0
1.onNext value :1
1.onNext value :2
2.onNext value :2
1.onNext value :3
2.onNext value :3
1.onNext value :4
2.onNext value :4
1.onNext value :5
2.onNext value :5

不管connect方法什么时候调用,只要被调用后全部的订阅者都能发射数据。ide

Connect

RxJava中connect是ConnectableObservable接口的一个方法,使用publish操做符能够将一个普通的Observable转换为一个ConnectableObservable。svg

调用ConnectableObservable的connect方法会让它后面的Observable开始给发射数据给订阅者。spa

connect方法返回一个Subscription对象,能够调用它的unsubscribe方法让Observable中止发射数据给观察者。

即便没有任何订阅者订阅它,你也可使用connect方法让一个Observable开始发射数据(或者开始生成待发射的数据)。这样,你能够将一个”冷”的Observable变为”热”的。

示例代码

Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS);
        //使用publish操做符将普通Observable转换为可链接的Observable
        ConnectableObservable<Long> connectableObservable = observable.publish();
        //开始发射数据,若是不调用connect方法,connectableObservable则不会发射数据
        Subscription subscription = connectableObservable.connect();

        //第二个订阅者延迟2s订阅,这将致使丢失前面2s内发射的数据
        connectableObservable
                .delaySubscription(2, TimeUnit.SECONDS)// 0、1数据丢失
                .subscribe(new Subscriber<Long>() {
                    @Override
                    public void onCompleted() {
                        System.out.println("onCompleted");
                    }
                    @Override
                    public void onError(Throwable e) {
                        System.out.println("onError");
                    }
                    @Override
                    public void onNext(Long value) {
                        System.out.println("onNext value :"+ value);
                    }
                });

        //5秒后取消订阅
        Observable.interval(1, TimeUnit.SECONDS)
        .take(5)
        .subscribe(new Subscriber<Long>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted2");
                subscription.unsubscribe();//取消订阅
            }
            @Override
            public void onError(Throwable e) {
                System.out.println("onError2");
            }
            @Override
            public void onNext(Long along) {
                System.out.println("onNext2:"+along);
            }
        });

        //eclipse下运行加上下面代码,Android Studio则不须要
        Thread.sleep(10000);

输出结果以下:

onNext2:0
onNext2:1
onNext value :2
onNext2:2
onNext value :3
onNext2:3
onNext value :4
onNext2:4
onCompleted2

RefCount

RefCount操做符能够看作是Publish的逆向,它能将一个ConnectableObservable对象再从新转化为一个普通的Observable对象,若是转化后有订阅者对其进行订阅将会开始发射数据,后面若是有其余订阅者订阅,将只能接受后面的数据(这也是转化以后的Observable 与普通的Observable的一点区别 )。

还有一个操做符叫share,它的做用等价于对一个Observable同时应用publish和refCount操做。

RefCount

示例代码

//建立一个可链接的Observable
        ConnectableObservable<Long> connectableObservable = Observable.interval(1, TimeUnit.SECONDS).take(6)
                .publish();

        connectableObservable.subscribe(new Subscriber<Long>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted1.");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError1: " + e.getMessage());
            }

            @Override
            public void onNext(Long along) {
                System.out.println("onNext1: " + along);
            }
        });

        connectableObservable.delaySubscription(3, TimeUnit.SECONDS)
        .subscribe(new Subscriber<Long>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted2.");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError2: " + e.getMessage());
            }

            @Override
            public void onNext(Long along) {
                System.out.println("onNext2: " + along);
            }
        });

        //若是不调用connect方法,connectableObservable则不会发射数据
        connectableObservable.connect();

        System.out.println("------after refCount()------");

        Observable<Long> observable = connectableObservable.refCount();

        observable.subscribe(new Subscriber<Long>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted3.");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError3: " + e.getMessage());
            }

            @Override
            public void onNext(Long along) {
                System.out.println("onNext3: " + along);
            }
        });

        observable.delaySubscription(3, TimeUnit.SECONDS)
        .subscribe(new Subscriber<Long>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted4.");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError4: " + e.getMessage());
            }

            @Override
            public void onNext(Long along) {
                System.out.println("onNext4: " + along);
            }
        });
        Thread.sleep(10000);

输出结果以下:

------after refCount()------
onNext1: 0
onNext3: 0
onNext1: 1
onNext3: 1
onNext1: 2
onNext3: 2
onNext1: 3
onNext3: 3
onNext2: 3
onNext4: 3
onNext1: 4
onNext3: 4
onNext2: 4
onNext4: 4
onNext1: 5
onNext3: 5
onNext2: 5
onNext4: 5
onCompleted1.
onCompleted3.
onCompleted2.
onCompleted4.

由运行结果能够看出,RefCount操做符将一个Connectable Observable 对象从新转化为一个普通的Observable对象,这时候订阅者进行订阅将会触发数据的发射。

Replay

使用Replay操做符返回的ConnectableObservable 会缓存订阅者订阅以前已经发射的数据,这样即便有订阅者在其发射数据开始以后进行订阅也能收到以前发射过的数据。Replay操做符能指定缓存的大小或者时间,这样能避免耗费太多内存。

示例代码:

//建立一个可链接的Observable
        ConnectableObservable<Long> connectableObservable = Observable.interval(1, TimeUnit.SECONDS).take(5)
                .publish();
        //若是不调用connect方法,connectableObservable则不会发射数据
        connectableObservable.connect();
        connectableObservable.delaySubscription(3, TimeUnit.SECONDS)//延时订阅
        .subscribe(new Subscriber<Long>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted1.");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError1: " + e.getMessage());
            }

            @Override
            public void onNext(Long along) {
                System.out.println("onNext1: " + along);
            }
        });

        //建立一个可链接的Observable
        ConnectableObservable<Long> connectableObservable2 = Observable.interval(1, TimeUnit.SECONDS).take(6)
                .replay(1);//这里不在使用publish,replay(1)缓存1个数据

        //若是不调用connect方法,connectableObservable则不会发射数据
        connectableObservable2.connect();
        connectableObservable2.delaySubscription(3, TimeUnit.SECONDS)//延时订阅
        .subscribe(new Subscriber<Long>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted2.");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError2: " + e.getMessage());
            }

            @Override
            public void onNext(Long along) {
                System.out.println("onNext2: " + along);
            }
        });

        //建立一个可链接的Observable
        ConnectableObservable<Long> connectableObservable3 = Observable.interval(1, TimeUnit.SECONDS).take(6)
                .replay(3, TimeUnit.SECONDS);//这里不在使用publish,replay(3, TimeUnit.SECONDS)缓存3s内的数据

        //若是不调用connect方法,connectableObservable则不会发射数据
        connectableObservable3.connect();
        connectableObservable3.delaySubscription(3, TimeUnit.SECONDS)//延时订阅
        .subscribe(new Subscriber<Long>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted3.");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError3: " + e.getMessage());
            }

            @Override
            public void onNext(Long along) {
                System.out.println("onNext3: " + along);
            }
        });

输出结果以下:

onNext3: 0
onNext3: 1
onNext3: 2

onNext2: 2 onNext1: 3 onNext2: 3 onNext3: 3 onNext1: 4 onNext2: 4 onNext3: 4 onNext1: 5 onCompleted1. onNext3: 5 onNext2: 5 onCompleted2. onCompleted3.