RxJava操做符

文章目录


RxJava添加依赖:

dependencies {
    implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
    implementation "io.reactivex.rxjava2:rxjava:2.2.8"
}

其中rxandroid是在android系统上针对对rxjava作的扩展,若是你是android开发,那么可使用rxandroid便可。可是rxandroid官方说最好也同时依赖rxjava的最新版本,以便修复未知bug和使用最新的特性。java

RxJava建立型操做符

create()操做符

做用:完整建立1个被观察者对象(Observable)
示例代码:react

//经过Observable.create() 建立一个被观察者 Observable 对象
      Observable.create(new ObservableOnSubscribe<Integer>() {
            /** 在subscribe()里定义须要发送的事件 */
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                // 经过 ObservableEmitter类对象产生 & 发送事件
                // ObservableEmitter类负责定义事件发射器 & 向观察者发送事件
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).subscribe(new Observer<Integer>() {//订阅一个观察者对象

 			// 默认最早调用复写的 onSubscribe()
           @Override
           public void onSubscribe(Disposable d) {
               Log.d(TAG, "开始采用subscribe链接");
           }
          

           @Override
           public void onNext(Integer value) {
               Log.d(TAG, "接收到了事件"+ value  );
           }

           @Override
           public void onError(Throwable e) {
               Log.d(TAG, "对Error事件做出响应");
           }

           @Override
           public void onComplete() {
               Log.d(TAG, "对Complete事件做出响应");
           }

       });

输出:
在这里插入图片描述
从第一个例子分析RxJava的调用机制:android

既然说RxJava是基于观察者模式的,那咱们就从第一个例子开刀来分析一下它是如何实现观察者模式的。web

首先, Observable.create() 在建立一个被观察者 Observable 对象时,传入参数是一个ObservableOnSubscribe接口的实现,而 这个方法返回的实际是一个ObservableCreate类的对象:
在这里插入图片描述
在这里插入图片描述
create()传入的接口被这个ObservableCreate对象持有, 当 Observable 对象被订阅时,Observable.subscribe()方法内部会调用它本身的subscribeActual()方法:
在这里插入图片描述
这个方法实际上是Observable类的一个抽象方法:
在这里插入图片描述
它的实现有不少类,而ObservableCreate就是它的实现类之一,在ObservableCreate类的subscribeActual()方法中则会调用ObservableOnSubscribe接口的subscribe()方法:
在这里插入图片描述
这个source就是 Observable.create() 在建立被观察者对象时传入的接口实现对象,而调用subscribe()方法时传入的参数parent,在内部的实现类实际是CreateEmitter(ObservableCreate类的静态内部类),在CreateEmitter建立时持有了Observer对象:
在这里插入图片描述
CreateEmitter类的onNext、onError、onComplete方法中分别调用了它持有的Observer对象的onNext、onError、onComplete方法:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
这样就会触发观察者依次调用对应事件的onNext实现方法从而响应事件,即事件序列会依照设定依次被触发,从而完成了被观察者向观察者发送事件的过程。(被观察者调用了观察者的回调方法 ,即观察者模式)数组

好了,第一个建立操做符的过程搞清楚了,其余建立型的操做符基本大同小异了。缓存

just()操做符

做用:快速建立1个被观察者对象,直接发送传入的事件
只能发送10个如下的事件
示例代码:网络

public void just() {
        // 1. 建立时传入整型一、二、三、4, 快速建立被观察者对象(Observable)最多只能发送10个如下事件
        // 在建立后就会发送这些对象,至关于执行了onNext(1)、onNext(2)、onNext(3)、onNext(4)
        Observable.just(1, 2, 3, 4).subscribe(new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "开始采用subscribe链接");
            }
            // 默认最早调用复写的 onSubscribe()

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "接收到了事件" + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件做出响应");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件做出响应");
            }

        });
    }

输出:
在这里插入图片描述数据结构

fromArray()操做符

做用:快速建立1个被观察者对象,直接发送传入的数组数据,能够发送10个以上事件,可用于数组遍历
示例代码:app

public void fromArray() {
        // 1. 设置须要传入的数组, 快速建立被观察者对象(Observable) & 可发送10个以上事件(数组形式)
        // 可用于对数组进行遍历
        // 注:若直接传递一个list集合进去,不然会直接把list当作一个数据元素发送
        Integer[] items = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
        Observable.fromArray(items).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "开始采用subscribe链接");
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "接收到了事件" + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件做出响应");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件做出响应");
            }
        });
    }

输出:
在这里插入图片描述ide

fromIterable()操做符

做用:快速建立1个被观察者对象,直接发送 传入的集合List数据,可发送10个以上事件,可用于集合元素遍历
示例代码:

public void fromIterable() {
        // 1. 设置一个集合
        List<Integer> list = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            list.add(i+1);
        }
        // 2. 经过fromIterable()将集合中的对象 / 数据发送出去
        Observable.fromIterable(list)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "开始采用subscribe链接");
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件做出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件做出响应");
                    }
                });
    }

输出:
在这里插入图片描述

differ()操做符

做用:动态建立(或延迟建立)被观察者对象,直到有观察者(Observer )订阅时,才动态建立被观察者对象(Observable) & 发送事件。

经过 Observable工厂方法建立被观察者对象(Observable),每次订阅后,都会获得一个刚建立的最新的Observable对象,这能够确保Observable对象里的数据是最新的。

示例代码:

public void differ() {
        // 经过defer 定义被观察者对象 注:此时被观察者对象还没建立
        Observable<Long> observable = Observable.defer(new Callable<ObservableSource<? extends Long>>() {
            @Override
            public ObservableSource<? extends Long> call() throws Exception {
                Log.d(TAG, "Observable建立的时间戳: "+System.currentTimeMillis());
                return Observable.just(System.currentTimeMillis());
            }
        });

        Log.d(TAG, "observable.subscribe的时间戳: "+System.currentTimeMillis());
        //观察者开始订阅 注:此时,才会调用defer() 建立被观察者对象(Observable)
        observable.subscribe(new Observer<Long>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "开始采用subscribe链接");
            }

            @Override
            public void onNext(Long value) {
                Log.d(TAG, "接收到的时间戳是" + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件做出响应");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件做出响应");
            }
        });
    }

输出:
在这里插入图片描述
可见,observable的建立时间是在observable.subscribe以后。

timer()操做符

做用:快速建立1个被观察者对象, 延时指定时间发送一个事件(发送1个Long型的数值0)
示例代码:

public void timer() {
        //延时2s发送一个事件, 会发送一个long类型数值0,等同于延时2s后调用一次onNext(0)触发事件
        Observable.timer(2, TimeUnit.SECONDS)
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "开始采用subscribe链接");
                    }

                    @Override
                    public void onNext(Long value) {
                        Log.d(TAG, "接收到了事件" + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件做出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件做出响应");
                    }
                });
    }

输出:
在这里插入图片描述

interval()操做符

做用:快速建立1个被观察者对象,每隔指定时间发送1个事件(从0开始无限递增1的整数序列)
示例代码:

public void interval() {
        // 延迟3s后,每隔1秒发送1个事件, 产生1个数字(从0开始递增1,无限个)
        Observable.interval(3, 1, TimeUnit.SECONDS)
        		//.interval(300, TimeUnit.MILLISECONDS) //每一个0.3秒发送一个事件
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "开始采用subscribe链接");
                    }

                    @Override
                    public void onNext(Long value) {
                        Log.d(TAG, "接收到了事件"+ value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件做出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件做出响应");
                    }

                });
    }

输出:
在这里插入图片描述

intervalRange()操做符

做用:快速建立1个被观察者对象,每隔指定时间发送1个事件,可指定发送数据的起始值和数量
示例代码:

public void intervalRange() {
        //延时3s后,每隔2s发送一个事件,事件序列:从5开始递增,总共发送10个事件
        //前四个参数含义为:long start, long count, long initialDelay, long period
        Observable.intervalRange(5, 10, 3, 2, TimeUnit.SECONDS)
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "开始采用subscribe链接");
                    }

                    @Override
                    public void onNext(Long value) {
                        Log.d(TAG, "接收到了事件"+ value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件做出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件做出响应");
                    }
                });
    }

输出:
在这里插入图片描述

range()操做符

做用:快速建立1个被观察者对象, 连续发送 1个指定范围的事件序列(无延时)
示例代码:

public void range() {
        // 从2开始发送10个事件, 每次发送的事件递增1
        // final int start, final int count
        // 注意:参数是int型的,count必须大于0,且知足start + (count - 1) <= Integer.MAX_VALUE
        Observable.range(2, 10)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "开始采用subscribe链接");
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件" + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件做出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件做出响应");
                    }

                });
    }

输出:
在这里插入图片描述

rangeLong()操做符

做用:快速建立1个被观察者对象, 连续发送 1个指定范围的事件序列(无延时)
与range操做符的不一样是:range发送的是int型数据,rangeLong()发送的是long型的数据
示例代码:

public void rangeLong() {
        // 从2开始发送10个事件, 每次发送的事件递增1
        // long start, long count
        // 注意:参数是long型的,count必须大于0,且知足start + (count - 1) <= Long.MAX_VALUE
        Observable.rangeLong(2, 10)
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "开始采用subscribe链接");
                    }

                    @Override
                    public void onNext(Long value) {
                        Log.d(TAG, "接收到了事件" + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件做出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件做出响应");
                    }

                });
    }

输出:
在这里插入图片描述

其余

下列方法通常用于测试使用

public void other() {
        // 该方法建立的被观察者对象发送事件的特色:仅发送Complete事件,直接通知完成,
        // 即观察者接收后会直接调用onCompleted()不会调用onNext
        Observable observable1 = Observable.empty();
        observable1.subscribe(observer);

        // 该方法建立的被观察者对象发送事件的特色:仅发送Error事件,直接通知异常,
        // 即观察者接收后会直接调用onError()
        Observable observable2 = Observable.error(new RuntimeException());
        observable2.subscribe(observer);

        // 该方法建立的被观察者对象发送事件的特色:不发送任何事件, 即观察者接收后不会调用onComplete onNext
        Observable observable3 = Observable.never();
        observable3.subscribe(observer);
    }

RxJava变换操做符

map()操做符

做用:对被观察者发送的每1个事件都经过指定的函数处理,从而变换成另一种事件, 可用于数据类型转换
示例代码:

public void map() {
        Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                // 被观察者发送事件, 参数为整型一、二、3
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
            //使用Map变换操做符中的Function函数对被观察者发送的事件进行统一变换:整型变换成字符串类型
        }).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return "使用 Map变换操做符 将事件" + integer + "的参数从 整型" + integer + " 变换成 字符串类型" + integer;
            }
        }).subscribe(new Consumer<String>() {

            //观察者接收到是变换后的事件: 字符串类型
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        });
    }

输出:
在这里插入图片描述

flatMap()操做符

做用:将被观察者发送的事件序列进行 拆分 & 单独转换,再合并成一个新的事件序列,最后再进行发送.
将Observable每一次发射的事件都转换成一个新的Observable,最好将这些Observable合并成一个新的Observable发送给观察者,但不能保证观察者是按照原始序列的顺序收到事件的。
无序的将被观察者发送的整个事件序列进行变换。
示例代码:

public void flatMap() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
            // 采用flatMap()变换操做符
        }).flatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                // 经过flatMap中将被观察者生产的事件序列先进行拆分,
                // 将每一个事件转换为一个新的发送三个String的事件,最终合并,再发送给被观察者
                final List<String> list = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    list.add("我是事件 " + integer + "拆分后的子事件" + i);
                }
                return Observable.fromIterable(list);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        });
    }

输出:
在这里插入图片描述
map操做符的详细介绍和实例能够看这篇:https://www.jianshu.com/p/128e662906af

concatMap()操做符

做用:相似于flatMap(),可是保证观察者收到的事件顺序是严格按照原始事件序列发送的顺序。
有序的将被观察者发送的整个事件序列进行变换
示例代码:

public void concatMap() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
            //采用concatMap()变换操做符
        }).concatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                // 经过concatMap中将被观察者生产的事件序列先进行拆分,
                // 将每一个事件转换为一个新的发送三个String的事件,最终合并,再发送给被观察者
                final List<String> list = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    list.add("我是事件 " + integer + "拆分后的子事件" + i);
                }
                return Observable.fromIterable(list);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        });
    }

buffer()操做符

做用:每次从被观察者(Obervable)须要发送的事件中获取必定数量的事件进行发送。

示例代码:

public void buffer() {
    	//每次从1, 2, 3, 4, 5发送3个数据,每次发送日后移动一个数量的位置
        Observable.just(1, 2, 3, 4, 5)
                .buffer(3, 1) // 设置缓存区大小 & 步长,
                // 缓存区大小: 每次从被观察者中获取的事件数量
                // 步长: 每次须要发送新事件时须要日后移动的位置
                .subscribe(new Observer<List<Integer>>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                    }

                    @Override
                    public void onNext(List<Integer> stringList) {
                        Log.d(TAG, " 缓存区里的事件数量 = " + stringList.size());
                        for (Integer value : stringList) {
                            Log.d(TAG, " 事件 = " + value);
                        }
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件做出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件做出响应");
                    }
                });
    }

输出:
在这里插入图片描述

RxJava组合/合并操做符

concat()操做符

做用:组合多个被观察者一块儿发送数据,合并后按发送顺序串行执行
示例代码:

public void concat() {
        // concat():组合多个被观察者(≤4个)一块儿发送数据
        // 注:串行执行
        Observable.concat(Observable.just(1, 2),
                Observable.just(3, 4),
                Observable.just(5, 6),
                Observable.just(7, 8))
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件做出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件做出响应");
                    }
                });
    }

输出:
在这里插入图片描述

concatArray()操做符

做用:同contact, 组合多个被观察者一块儿发送数据,合并后按发送顺序串行执行(可超过4个参数)
示例代码:

public void concatArray() {
        // concatArray():组合多个被观察者一块儿发送数据(可>4个)
        // 注:串行执行
        Observable.concatArray(Observable.just(1, 2),
                Observable.just(3, 4),
                Observable.just(5, 6),
                Observable.just(7, 8),
                Observable.just(9, 10))
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件做出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件做出响应");
                    }
                });
    }

输出:
在这里插入图片描述
注:使用concat(Iterable<? extends ObservableSource<? extends T>> sources)也能够发送一个Observable的List集合(超过4个)。

merge()操做符

做用:组合多个被观察者一块儿发送数据,合并后按时间线并行执行
示例代码:

public void merge() {
        // merge():组合多个被观察者(≤4个)一块儿发送数据
        // 注:合并后按照时间线并行执行
        Observable.merge(
                Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS), // 从0开始发送、共发送3个数据、第1次事件延迟发送时间 = 1s、间隔时间 = 1s
                Observable.intervalRange(2, 3, 1, 1, TimeUnit.SECONDS)) // 从2开始发送、共发送3个数据、第1次事件延迟发送时间 = 1s、间隔时间 = 1s
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Long value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件做出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件做出响应");
                    }
                });

    }

输出:
在这里插入图片描述

mergeArray()操做符

做用:组合多个被观察者一块儿发送数据,合并后按时间线并行执行(可merge超过 4个)
示例代码:

public void mergeArray() {
        // mergeArray():组合多个被观察者一块儿发送数据(可>4个)
        // 注:合并后按照时间线并行执行
        Observable.mergeArray(
                Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS),
                Observable.intervalRange(3, 3, 1, 1, TimeUnit.SECONDS),
                Observable.intervalRange(6, 3, 1, 1, TimeUnit.SECONDS),
                Observable.intervalRange(9, 3, 1, 1, TimeUnit.SECONDS),
                Observable.intervalRange(13, 3, 1, 1, TimeUnit.SECONDS))
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Long value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件做出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件做出响应");
                    }
                });

    }

输出:
在这里插入图片描述
注:使用merge(Iterable<? extends ObservableSource<? extends T>> sources)也能够发送一个Observable的List集合(超过4个)。

concatDelayError()操做符

做用:使用concat()操做符时,若其中一个被观察者发出onError事件,则会立刻终止其余被观察者继续发送事件,使用concatDelayError()操做符能够避免这种状况,使得onError事件推迟到其余被观察者发送事件完成之后才触发。
示例代码:

public void concatDelayError() {
        Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                // 发送Error事件,由于使用了concatDelayError,因此第2个Observable将会发送事件,等发送完毕后,再发送错误事件
                emitter.onError(new NullPointerException());
                emitter.onComplete();
            }
        });
        Observable<Integer> observable2 = Observable.just(3, 4);
        List<Observable<Integer>> list = new ArrayList<>();
        list.add(observable1);
        list.add(observable2);
        Observable.concatDelayError(list)
        //concatArrayDelayError针对concatArray
// Observable.concatArrayDelayError(observable1, observable2)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件做出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件做出响应");
                    }
                });
    }

若是不使用concatDelayError的输出:
在这里插入图片描述
使用concatDelayError的输出:
在这里插入图片描述

mergeDelayError()操做符

做用:相似于concatDelayError()操做符,专门针对merge()操做的。

zip()操做符

做用:合并 多个被观察者(Observable)发送的事件,生成一个新的事件序列(即组合事后的事件序列)进行发送
新的事件序列的长度由长度较短的那个Observable决定
示例代码:

public void zip() {
        Observable<Integer> observable1 = Observable.just(1, 2, 3, 4);
        Observable<String> observable2 = Observable.just("A", "B", "C");
        Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
                    @Override
                    public String apply(Integer integer, String s) throws Exception {
                        return s + integer;
                    }
                }).subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "onSubscribe");
                    }

                    @Override
                    public void onNext(String value) {
                        Log.d(TAG, "最终接收到的事件 = " + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }

输出:
在这里插入图片描述
关于zip操做符比较详细的解释看这篇文章:https://www.jianshu.com/p/bb58571cdb64

combineLatest()操做符

做用:当多个Observables中的任何一个发送了数据后,将先发送了数据的Observables 的最新(最后)一个数据 与 其余Observable发送的每一个数据结合,结合的规则由combineLatest()传入的函数决定,最终基于该函数的结果发送数据。

与Zip()的区别:Zip() = 按个数合并,即1对1合并;CombineLatest() = 按时间合并,即在同一个时间点上合并。

示例代码:

public void combineLatest() {
        Observable.combineLatest(
                Observable.just(1L, 2L, 3L), // 第1个发送数据事件的Observable
                Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS), // 第2个发送数据事件的Observable:从0开始发送、共发送3个数据、第1次事件延迟发送时间 = 1s、间隔时间 = 1s
                new BiFunction<Long, Long, Long>() {
                    @Override
                    public Long apply(Long o1, Long o2) throws Exception {
                        // o1 = 第1个Observable发送的最新(最后)1个数据
                        // o2 = 第2个Observable发送的每1个数据
                        // 合并的逻辑 = 相加
                        // 即第1个Observable发送的最后1个数据 与 第2个Observable发送的每1个数据进行相加
                        Log.e(TAG, "合并的数据是: " + o1 + " " + o2);
                        return o1 + o2;
                    }
                }).subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long s) throws Exception {
                        Log.e(TAG, "合并的结果是: " + s);
                    }
                });
    }

输出:

50.686 7870-7973/com.fly.rx.simple D/RxJavaCombineConcat: 合并的数据是: 3 0
50.686 7870-7973/com.fly.rx.simple D/RxJavaCombineConcat: 合并的结果是: 3
51.686 7870-7973/com.fly.rx.simple D/RxJavaCombineConcat: 合并的数据是: 3 1
51.687 7870-7973/com.fly.rx.simple D/RxJavaCombineConcat: 合并的结果是: 4
52.686 7870-7973/com.fly.rx.simple D/RxJavaCombineConcat: 合并的数据是: 3 2
52.686 7870-7973/com.fly.rx.simple D/RxJavaCombineConcat: 合并的结果是: 5

combineLatestDelayError()操做符

做用:做用相似于concatDelayError()

reduce()操做符

做用:把被观察者须要发送的事件按照指定规则聚合成一个新事件发送
示例代码:

public void reduce() {
        Observable.just(1,2,3,4)
                .reduce(new BiFunction<Integer, Integer, Integer>() {
                    // 在该复写方法中复写聚合的逻辑
                    @Override
                    public Integer apply(@NonNull Integer s1, @NonNull Integer s2) throws Exception {
                        // 本次聚合的逻辑是:所有数据相乘起来,每次将前两个数据相乘做为下一次的开始计算第一个数据
                        Log.e(TAG, "本次计算的数据是: "+s1 +" 乘 "+ s2);
                        return s1 * s2;
                    }
                }).subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer s) throws Exception {
                        Log.e(TAG, "最终计算的结果是: "+s);

                    }
                });
    }

输出:
在这里插入图片描述

collect()操做符

做用:将被观察者Observable发送的数据事件收集到一个数据结构里
示例代码:

public void collect() {
        Observable.just(1, 2, 3 ,4, 5, 6)
                .collect(new Callable<ArrayList<Integer>>() {
                            @Override
                            public ArrayList<Integer> call() throws Exception {
                                // 建立数据结构(容器),用于收集被观察者发送的数据
                                return new ArrayList<>();
                            }
                        }, new BiConsumer<ArrayList<Integer>, Integer>() {
                            @Override
                            public void accept(ArrayList<Integer> list, Integer integer) throws Exception {
                                //对发送的数据进行收集 参数说明:list = 容器,integer = 后者数据
                                list.add(integer);
                            }
                }).subscribe(new Consumer<ArrayList<Integer>>() {
                    @Override
                    public void accept(@NonNull ArrayList<Integer> s) throws Exception {
                        Log.e(TAG, "本次发送的数据是: "+s);

                    }
                });
    }

输出:
在这里插入图片描述

startWith()操做符

做用:在一个被观察者发送事件前,追加发送一些数据或追加一个新的被观察者
示例代码:

public void startWith() {
        Observable.just(7, 8)
                .startWith(6)  // 追加单个数据
                .startWithArray(4, 5) // 追加多个数据
                .startWith(Observable.just(1, 2, 3))//追加一个被观察者
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件做出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件做出响应");
                    }
                });
    }

输出:
在这里插入图片描述

count()操做符

做用:统计被观察者发送事件的数量
示例代码:

public void count() {
        Observable.just(1, 2, 3, 4)
                .count()
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.e(TAG, "发送的事件数量 = "+aLong);

                    }
                });
    }

输出:
在这里插入图片描述

RxJava功能型操做符

subscribe()操做符

做用: 被观察者订阅观察者,即链接观察者和被观察者造成订阅关系
示例代码:

public void subscribe() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "开始采用subscribe链接");
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "对Next事件"+ value +"做出响应"  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件做出响应");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件做出响应");
            }
        });
    }

subscribeOn() & observeOn()操做符

做用:线程调度,subscribeOn()指定被观察者发送事件的工做线程,observeOn()指定观察者响应事件的工做线程。

在 RxJava模型中,被观察者 (Observable) / 观察者(Observer)的工做线程 = 建立自身的线程,即,若被观察者 (Observable) / 观察者(Observer)在主线程被建立,那么他们的工做(生产事件 / 接收& 响应事件)就会发生在主线程。因此在在Android中默认被观察者和观察者是运行在相同的工做线程(即定义他们的线程,也就是默认的UI主线程中)。

在Android中,要实如今子线程中实现耗时的网络操做等,而后回到主线程实现 UI操做,那么对应的RxJava中,可理解为:

  • 被观察者 (Observable) 在 子线程 中生产事件(如实现耗时操做等等)
  • 观察者(Observer)在 主线程 接收 & 响应事件(即实现UI操做)

在 RxJava中,内置了多种用于调度的线程类型

类型 含义 应用场景
Schedulers.immediate() 当前线程 = 不指定线程 默认
AndroidSchedulers.mainThread() Android主线程 操做UI
Schedulers.newThread() 常规新线程 耗时等操做
Schedulers.io() io操做线程 网络请求、文件读写等io密集型操做
Schedulers.computation() CPU计算操做线程 大量计算操做
Schedulers.single() 共享单线程实例 在单线程中排队按顺序执行任务
Schedulers.trampoline() 以FIFO的队列执行 暂停当前任务,执行新插入进来的任务完以后,再将未完成的任务接着执行

注:RxJava内部使用 线程池 来维护这些线程,因此线程的调度效率很是高。

示例代码:

public void subscribOn() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "被观察者 Observer的工做线程是: " + Thread.currentThread().getName());
                emitter.onNext(1);
                emitter.onComplete();
            }
        }).subscribeOn(Schedulers.io())
          //.subscribeOn(Schedulers.newThread())
          //.subscribeOn(Schedulers.computation())
          //.subscribeOn(Schedulers.single())
          .observeOn(AndroidSchedulers.mainThread())
          .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "开始采用subscribe链接");
                }

                @Override
                public void onNext(Integer value) {
                    Log.d(TAG, "观察者 Observer的工做线程是: " + Thread.currentThread().getName());
                }

                @Override
                public void onError(Throwable e) {
                }

                @Override
                public void onComplete() {
                }
            });
    }

输出:
在这里插入图片描述
注意的地方:

  1. 若Observable.subscribeOn()屡次调用,则只有第一次指定有效,其他的指定线程无效
observable.subscribeOn(Schedulers.newThread()) // 第一次指定被观察者线程 = 新线程
          .subscribeOn(AndroidSchedulers.mainThread()) // 第二次指定无效
          .observeOn(AndroidSchedulers.mainThread())
          .subscribe(observer);
  1. 若Observable.observeOn()屡次调用,则每次指定均有效,即每指定一次,就会进行一次线程的切换
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "被观察者Observable的工做线程是: " + Thread.currentThread().getName());
                emitter.onNext(1);
                emitter.onComplete();
            }
        }).subscribeOn(Schedulers.newThread())
          .observeOn(AndroidSchedulers.mainThread())// 第一次指定观察者为主线程
          .doOnNext(new Consumer<Integer>() { // 生产事件
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, "第一次观察者Observer的工做线程是: " + Thread.currentThread().getName());
                }
            })
          .observeOn(Schedulers.newThread()) // 第二次指定观察者为新的工做线程 有效
          .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "开始采用subscribe链接");
                }

                @Override
                public void onNext(Integer value) {
                    Log.d(TAG, "第二次观察者Observer的工做线程是: " + Thread.currentThread().getName());
                }

                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "对Error事件做出响应");
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "对Complete事件做出响应");
                }
            });

在这里插入图片描述
3. 在发送网络请求时 退出当前Activity,此时若是回到主线程更新 UI,App会崩溃
解决方案:当 Activity退出时,调用 Disposable.dispose()切断观察者和被观察者的链接,使得观察者没法收到事件 & 响应事件。
当出现多个Disposable时,可采用RxJava内置容器CompositeDisposable进行统一管理

// 添加Disposable到CompositeDisposable容器
CompositeDisposable.add()
// 清空CompositeDisposable容器
CompositeDisposable.clear()

实例能够看这篇:https://www.jianshu.com/p/8818b98c44e2

delay()操做符

做用:使得被观察者延迟一段时间再发送事件
示例代码:

public void delay() {
        Observable.just(1, 2, 3).delay(3, TimeUnit.SECONDS) // 延迟3s再发送
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件" + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件做出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件做出响应");
                    }
                });
    }

delay()有多个重载函数:
在这里插入图片描述
其中有的能够指定线程调度器和错误延迟

do操做符

做用:在某个事件的生命周期中调用
do操做符有一系列:

方法 含义
doOnEach() Observable每次发送事件前调用1次(包括完成和错误的事件)
doOnNext() 执行Next事件前调用
doAfterNext() 执行Next事件后调用
doOnError() 发送错误事件前调用
doOnCompleted() 正常发送事件完毕调用
doOnTerminate() 执行终止前调用(包括正常和异常终止的状况)
doAfterTerminate() 执行终止后调用
doFinally() 最后执行
doOnSubscribe 观察者订阅时调用

示例代码:

public void doOn() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                    e.onNext(1);
                    e.onNext(2);
                    e.onError(new Throwable("发生错误了"));
                    e.onComplete();
                }
            }).doOnEach(new Consumer<Notification<Integer>>() {
                @Override
                public void accept(Notification<Integer> integerNotification) throws Exception {
                    Log.d(TAG, "doOnEach: " + integerNotification.getValue());
                }
            }).doOnNext(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, "doOnNext: " + integer);
                }
            }).doAfterNext(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, "doAfterNext: " + integer);
                }
            }).doOnComplete(new Action() {
                @Override
                public void run() throws Exception {
                    Log.e(TAG, "doOnComplete: ");
                }
            }).doOnError(new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    Log.d(TAG, "doOnError: " + throwable.getMessage());
                }
            }).doOnSubscribe(new Consumer<Disposable>() {
                @Override
                public void accept(@NonNull Disposable disposable) throws Exception {
                    Log.e(TAG, "doOnSubscribe: ");
                }
            }).doOnTerminate(new Action() {
                @Override
                public void run() throws Exception {
                    Log.e(TAG, "doOnTerminate: ");
                }
            }).doAfterTerminate(new Action() {
                @Override
                public void run() throws Exception {
                    Log.e(TAG, "doAfterTerminate: ");
                }
            }).doFinally(new Action() {
                @Override
                public void run() throws Exception {
                    Log.e(TAG, "doFinally: ");
                }
            }).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "onSubscribe");
                }
                @Override
                public void onNext(Integer value) {
                    Log.d(TAG, "接收到了事件"+ value  );
                }

                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "对Error事件做出响应");
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "对Complete事件做出响应");
                }
            });
    }

输出:
在这里插入图片描述

onErrorReturn()操做符

做用:能够捕获错误。遇到错误时,发送一个特殊事件,而且正常终止。
注意后面的事件不会再发送
示例代码:

public void onErrorReturn() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                        emitter.onNext(1);
                        emitter.onNext(2);
                        emitter.onError(new Throwable("错误异常"));
                        emitter.onNext(3);

                    }
                }).onErrorReturn(new Function<Throwable, Integer>() {
                    @Override
                    public Integer apply(Throwable throwable) throws Exception {
                        Log.e(TAG, "发生了错误: " + throwable.getMessage());
                        //捕获错误 返回一个特殊事件正常终止
                        return 404;
                    }
                }).subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, String.valueOf(integer));
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

    }

输出:
在这里插入图片描述

onErrorResumeNext()

做用:发生错误的时候,发送1个新的Observable正常终止
示例代码:

public void onErrorResumeNext() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Throwable("发生错误了"));
                e.onNext(3);
            }
        }).onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
            @Override
            public ObservableSource<? extends Integer> apply(@NonNull Throwable throwable) throws Exception {
                Log.e(TAG, "在onErrorReturn处理了错误: " + throwable.toString());
                // 发生错误事件后,发送一个新的被观察者Observable
                return Observable.just(11, 22);

            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "接收到了事件" + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件做出响应");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件做出响应");
            }
        });
    }

输出:
在这里插入图片描述

onExceptionResumeNext()操做符

做用:发生异常时,发送1个新的Observable正常终止

注意onExceptionResumeNext()拦截的类型是ExceptiononErrorResumeNext() 拦截的类型是Throwable,若用onExceptionResumeNext()拦截 Throwable,则会将错误传递给观察者的onError方法。

示例代码:

public void onExceptionResumeNext() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Exception("发生错误了"));
                e.onNext(3);
            }
        }).onExceptionResumeNext(new Observable<Integer>() {
            @Override
            protected void subscribeActual(Observer<? super Integer> observer) {
                observer.onNext(55);
                observer.onNext(66);
                observer.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "接收到了事件" + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件做出响应");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件做出响应");
            }
        });
    }

输出:
在这里插入图片描述

retry()操做符

做用:重试,即当出现错误时(出现onError),让被观察者从新发射数据

onExceptionResumeNext()onErrorResumeNext() 的区别是retry()能够对ThrowableException均可拦截,而且retry()完毕以后观察者仍是会收到onError事件的,前二者则不会。

retry有一些重载方法:

方法 说明
retry() 出现错误时,让被观察者从新发送数据。若错误一直发生,则一直从新发送
retry(long time) 与retry不一样的书,若错误一直发生,被观察者则一直从新发送数据,但这持续从新发送有次数限制
retry(Predicate predicate) 出现错误时,根据指定逻辑(能够捕获到发生的错误)决定是否让被观察者从新发送数据
retry(new BiPredicate<Integer, Throwable>) 出现错误时,根据指定逻辑(能够捕获重发的次数和发生的错误)决定是否让被观察者从新发送数据
retry(long time,Predicate predicate) 出现错误时,根据指定逻辑(能够捕获到发生的错误)决定是否让被观察者从新发送数据。而且可设置重发次数

示例代码:

public void retry() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Exception("发生错误了"));
                e.onNext(3);
            }
        })//.retry() //一直从新发送(但这样就死循环了)
          //.retry(3)//重发最多3次
          .retry(new BiPredicate<Integer, Throwable>() {
              // 拦截错误后,根据逻辑判断是否须要从新发送请求
             @Override
             public boolean test(Integer integer, Throwable throwable) throws Exception {
                // 捕获异常
                 Log.e(TAG, "异常错误 = "+throwable.toString());
                // 获取当前重试次数
                Log.e(TAG, "当前重试次数 = "+integer);
                //返回false = 不从新从新发送数据 & 调用观察者的onError结束
                //返回true = 从新发送请求(若持续遇到错误,就持续从新发送)
                  if (integer > 3) {
                      return false;
                  }
                return true;
             }
          })
// .retry(3, new Predicate<Throwable>() {// 拦截错误后,根据逻辑判断是否须要从新发送请求, 最多从新发送3次
// @Override
// public boolean test(@NonNull Throwable throwable) throws Exception {
// // 捕获异常
// Log.e(TAG, "retry错误: "+throwable.toString());
// //返回false = 不从新从新发送数据 & 调用观察者的onError()结束
// //返回true = 从新发送请求
// return true;
// }
// })
           .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "接收到了事件" + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件做出响应");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件做出响应");
            }
        });
    }

输出:
在这里插入图片描述

retryUntil()操做符

做用:出现错误后,知足判断条件以前持续从新发送数据
相似于retry(Predicate predicate),惟一区别:返回 true 则不从新发送数据事件。
示例代码:

public void retryUntil() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Exception("发生错误了"));
                e.onNext(3);
            }
        }).retryUntil(new BooleanSupplier() {
            @Override
            public boolean getAsBoolean() throws Exception {

                //return true : 不从新发送请求,而且调用观察者的onError()方法结束
                // return false : 从新发送数据(若持续遇到错误,就持续从新发送)
                return false;
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "接收到了事件" + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件做出响应");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件做出响应");
            }
        });
    }

retryWhen()操做符

做用:遇到错误时,将发生的错误传递给一个新的被观察者,并决定是否须要从新订阅原始被观察者
示例代码:

public void retryWhen() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Exception("发生错误了"));
                e.onNext(3);
            }
            // 遇到error事件才会回调
        }).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {

            @Override
            public ObservableSource<?> apply(@NonNull Observable<Throwable> throwableObservable) throws Exception {
                // 参数Observable<Throwable>中的泛型 = 上游操做符抛出的异常,可经过该条件来判断异常的类型
                // 返回Observable<?> = 新的被观察者 Observable(任意类型)
                return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                    // 此处有两种状况:
                    // 1. 若 新的被观察者 Observable发送的事件 = Error事件,那么 原始Observable则不从新发送事件:
                    // 2. 若 新的被观察者 Observable发送的事件 = Next事件 ,那么原始的Observable则从新发送事件:
                    @Override
                    public ObservableSource<?> apply(@NonNull Throwable throwable) throws Exception {

                        // 1. 若返回的Observable发送的事件 = Error事件,则原始的Observable不从新发送事件
                        // 该异常错误信息可在观察者中的onError()中得到
                        return Observable.error(new Throwable("retryWhen终止啦"));

                        //二、若返回的Observable发送的事件= Next事件(和next的内容无关),则原始的Observable从新发送事件(若持续遇到错误,则持续发送)
                        // 仅仅是做为一个触发从新订阅原被观察者的通知,什么数据并不重要,只要不是onComplete/onError事件
                        //return Observable.just(5);
                    }
                });

            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }
            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "接收到了事件"+ value  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件做出响应" + e.toString());
                // 获取异常错误信息
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件做出响应");
            }
        });
    }

输出:
在这里插入图片描述

repeat()操做符

做用:无条件地、重复发送被观察者事件
接收到onCompleted()事件后,触发从新订阅,默认运行在一个新的线程上
示例代码:

public void repeat() {
        Observable.just(1, 2)
                .repeat(3) // 重复发送3次被观察者事件
                //.repeat()//无限重复
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "开始采用subscribe链接");
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件" + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件做出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件做出响应");
                    }

                });

    }

输出:
在这里插入图片描述

repeatWhen()操做符

做用:有条件地、重复发送被观察者事件

将原始 Observable 中止发送事件的标识(Complete() / Error())转换成1个 Object 类型数据传递给1个新被观察者(Observable),以此决定是否从新订阅 & 发送原来的 Observable:

  • 若新被观察者(Observable)返回1个Complete / Error事件,则不从新订阅 & 发送原来的 Observable
  • 若新被观察者(Observable)返回其他事件时,则从新订阅 & 发送原来的 Observable

示例代码:

public void repeatWhen() {
        Observable.just(1,2,3).repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {

            @Override
            public ObservableSource<?> apply(@NonNull Observable<Object> objectObservable) throws Exception {
                // 在Function函数中,必须对输入的 Observable<Object>进行处理,这里咱们使用的是flatMap操做符接收上游的数据
                // 此处有2种状况:
                // 1. 若新被观察者(Observable)返回1个Complete() / Error()事件,则不从新订阅 & 发送原来的 Observable
                // 2. 若新被观察者(Observable)返回其他事件,则从新订阅 & 发送原来的 Observable
                return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {
                    @Override
                    public ObservableSource<?> apply(@NonNull Object throwable) throws Exception {

                        // 状况1:若新被观察者(Observable)返回1个Complete() / Error()事件,则不从新订阅 & 发送原来的 Observable
                        // Observable.empty() = 发送Complete事件,但不会回调观察者的onComplete()
                        return Observable.empty();

                        // 返回Error事件 = 回调onError()事件,并接收传过去的错误信息。
                        // return Observable.error(new Throwable("再也不从新订阅事件"));

                        // 状况2:若新被观察者(Observable)返回其他事件,则从新订阅 & 发送原来的 Observable
                        // 仅仅是做为1个触发从新订阅被观察者的通知,发送的是什么数据并不重要,只要不是Complete() / Error()事件
                        // return Observable.just(1);
                    }
                });

            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "开始采用subscribe链接");
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "接收到了事件" + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件做出响应:" + e.toString());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件做出响应");
            }
        });
    }

输出:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

debounce()操做符

做用: 必定的时间内没有操做就会发送事件(只会发送最后一次操做的事件)
示例代码:

public void debounce() {
        //发送5个事件,每一个事件间隔1秒, 可是debounce限定了2秒内没有任何操做才会真正发送事件,
        // 因此只有最后一次知足条件,只能接收到事件 5
        Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS)
                .debounce(2, TimeUnit.SECONDS)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.d(TAG, "debounce " + String.valueOf(aLong));
                    }
                });

    }

输出:
在这里插入图片描述

RxJava过滤操做符

filter()操做符

做用:过滤特定条件的事件
示例代码:

public void filter() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onNext(4);
                emitter.onNext(5);
            }
        }).filter(new Predicate<Integer>() {
            // 根据test()的返回值 对被观察者发送的事件进行过滤 & 筛选
            // 返回true,则继续发送, 返回false,则不发送(即过滤)
            @Override
            public boolean test(Integer integer) throws Exception {
                //过滤整数≤3的事件
                return integer > 3;
            }
        }).subscribe(new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "开始采用subscribe链接");
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "过滤后获得的事件是:"+ value  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件做出响应");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件做出响应");
            }
        });
    }

输出:
在这里插入图片描述

ofType()操做符

做用:过滤特定数据类型的数据
示例代码:

public void ofType() {
        Observable.just(1, "Carson", 3, "Ho", 5)
                .ofType(Integer.class) // 筛选出整型数据
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept( Integer integer) throws Exception {
                        Log.d(TAG,"获取到的整型事件元素是: "+ integer);
                    }
                });
    }

输出:
在这里插入图片描述

skip() / skipLast()操做符

做用:跳过某个事件
skip()从前面跳过,skipLast()从后面跳过
示例代码:

public void skipByOrder() {
        //根据顺序跳过数据项
        Observable.just(1, 2, 3, 4, 5)
                .skip(2) // 跳过正序的前2项
                .skipLast(2) // 跳过正序的后2项
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept( Integer integer) throws Exception {
                        Log.d(TAG,"获取到的整型事件元素是: "+ integer);
                    }
                });
    }
public void skipByTime() {
        // 根据时间跳过数据项
        // 发送事件特色:发送数据0-5,每隔1s发送一次,每次递增1;第1次发送延迟0s
        Observable.intervalRange(0, 5, 0, 1, TimeUnit.SECONDS)
                .skip(2, TimeUnit.SECONDS) // 跳过前2s发送的数据
                .skipLast(1, TimeUnit.SECONDS) // 跳过最后1s发送的数据
                .subscribe(new Consumer<Long>() {

                    @Override
                    public void accept( Long along ) throws Exception {
                        Log.d(TAG,"获取到的整型事件元素是: "+ along);
                    }
                });
    }

输出:
在这里插入图片描述
在这里插入图片描述

distinct() / distinctUntilChanged()操做符

做用:distinct()过滤事件序列中重复的事件,distinctUntilChanged()过滤事件序列中连续重复的事件
示例代码:

public void distinct() {
        //多余的1和2会被过滤掉
        Observable.just(1, 2, 3, 1 , 2 )
                .distinct()
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept( Integer integer) throws Exception {
                        Log.d(TAG,"不重复的整型事件元素是: "+ integer);
                    }
                });
    }

输出:
在这里插入图片描述

public void distinctUntilChanged() {
        //连续重复多余的3和4会被过滤掉
        Observable.just(1,2,3,1,2,3,3,4,4 )
                .distinctUntilChanged()
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept( Integer integer) throws Exception {
                        Log.d(TAG,"不连续重复的整型事件元素是: "+ integer);
                    }
                });
    }

输出:
在这里插入图片描述

take() & takeLast()操做符

做用:take()指定被观察者从前面过滤的事件数量,takeLast()指定被观察者从后面过滤的事件数量
示例代码:

public void take() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onNext(4);
                emitter.onNext(5);
            }
        }).take(2)// 指定被观察者只会发送前2个事件,也就是观察者只能接收2个事件
        .subscribe(new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "开始采用subscribe链接");
            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "过滤后获得的事件是:"+ value  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件做出响应");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件做出响应");
            }
        });
    }

输出:
在这里插入图片描述

public void takeLast() {
        Observable.just(1, 2, 3, 4, 5)
                .takeLast(3) //指定被观察者只发送后面3个事件,也就是观察者只能接受最后3个事件
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "开始采用subscribe链接");
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "过滤后获得的事件是:"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件做出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件做出响应");
                    }
                });
    }

输出:
在这里插入图片描述

throttleFirst() / throttleLast()操做符

做用:throttleFirst() 在某段时间内,只发送该段事件第一次事件, throttleLast() 在某段时间内,只发送该段事件最后一次事件
示例代码:

public void throttleFirst() {
        Observable.interval(200, TimeUnit.MILLISECONDS) //每隔0.2秒发送一个事件
                .throttleFirst(1, TimeUnit.SECONDS) //只接收每秒内发送的第一个数据
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.d(TAG + "throttleFirst", String.valueOf(aLong));
                    }
                });
    }

输出:
在这里插入图片描述

public void throttleLast() {
        Observable.interval(300, TimeUnit.MILLISECONDS) //每隔0.3秒发送一个事件
                .throttleLast(1, TimeUnit.SECONDS) //只接收每秒内发送的最后一个数据
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.d(TAG + "throttleFirst", String.valueOf(aLong));
                    }
                });
    }

输出:
在这里插入图片描述

sample()操做符

做用:在某段时间内,只发送该段时间内最新(最后)1次事件, 与throttleLast相似
示例代码:

public void sample() {
        Observable.interval(300, TimeUnit.MILLISECONDS) //每一个0.3秒发送一个事件
                .sample(1, TimeUnit.SECONDS)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.d(TAG + "sample", String.valueOf(aLong));
                    }
                });
    }

输出:
在这里插入图片描述

throttleWithTimeout()操做符

做用:发送数据事件时,若2次发送事件的间隔<指定时间,就会丢弃前一次的数据,直到指定时间内都没有新数据发射时才会发送后一次的数据
示例代码:

public void throttleWithTimeout() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                // 隔段事件发送时间
                e.onNext(1);
                Thread.sleep(500);
                e.onNext(2); // 1和2之间的间隔小于指定时间1s,因此前1次数据(1)会被抛弃,2会被保留
                Thread.sleep(1500);  // 由于2和3之间的间隔大于指定时间1s,因此以前被保留的2事件将发出
                e.onNext(3);
                Thread.sleep(1500);  // 由于3和4之间的间隔大于指定时间1s,因此3事件将发出
                e.onNext(4);
                Thread.sleep(500); // 由于4和5之间的间隔小于指定时间1s,因此前1次数据(4)会被抛弃,5会被保留
                e.onNext(5);
                Thread.sleep(500); // 由于5和6之间的间隔小于指定时间1s,因此前1次数据(5)会被抛弃,6会被保留
                e.onNext(6);
                Thread.sleep(1500); // 由于6和Complete实践之间的间隔大于指定时间1s,因此以前被保留的6事件将发出

                e.onComplete();
            }
        }).throttleWithTimeout(1, TimeUnit.SECONDS)//每1秒钟采用数据
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "接收到了事件"+ value  );
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件做出响应");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件做出响应");
            }
        });
    }

输出:
在这里插入图片描述

firstElement() / lastElement()操做符

做用:firstElement()仅选取第1个元素 ,lastElement()仅选取最后1个元素
示例代码:

public void firstElement() {
        Observable.just(1, 2, 3)
                .firstElement()
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG + "firstElement", String.valueOf(integer));
                    }
                });
    }

    public void lastElement() {
        Observable.just(1, 2, 3)
                .lastElement()
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG + "firstElement", String.valueOf(integer));
                    }
                });
    }

输出:
在这里插入图片描述
在这里插入图片描述

elementAt()操做符

做用:只发送第N个事件
n为负数时,报IndexOUtOfBoundExection, n为正数但超过发射数据长度时不会报异常会使用设置的默认值代替(若是有)
示例代码:

public void elementAt() {
        Observable.range(1, 5)//发1到5
                .elementAt(3)//只取index为3的事件,注:索引从0开始
                //.elementAt(6, 10)//获取的位置索引 > 发送事件序列长度时,设置默认参数
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG + "elementAt", String.valueOf(integer));
                    }
                });
    }

输出:
在这里插入图片描述
在这里插入图片描述

elementAtOrError()操做符

做用:在elementAt()的基础上,当出现越界状况(即获取的位置索引 > 发送事件序列长度)时抛出异常
示例代码:

public void elementAtOrError() {
        Observable.range(1, 5)
                .elementAtOrError(6)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG + "elementAtOrErr", String.valueOf(integer));
                    }
                });
    }

输出:
在这里插入图片描述

ignoreElements() 操做符

做用:无论发射的数据.只但愿在它完成时和遇到错误时收到通知
示例代码:

public void ignoreElements() {
        Observable.range(0, 10)
                .ignoreElements()
                .subscribe(new CompletableObserver() {

                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "开始采用subscribe链接");
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件做出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件做出响应");
                    }
                });
    }

输出:
在这里插入图片描述

RxJava条件/布尔操做符

all()操做符

做用:判断发送的每项数据是否都知足设置的函数条件, 若知足返回 true;不然返回 false
示例代码:

public void all() {
        Observable.just(1, 2, 3, 4, 5, 6)
                .all(new Predicate<Integer>() {
                    @Override
                    public boolean test(Integer integer) throws Exception {
                        // 该函数用于判断Observable发送的10个数据是否都知足integer<=10
                        return (integer <= 10);
                    }
                }).subscribe(new Consumer<Boolean>() {
                    @Override
                    public void accept(Boolean aBoolean) throws Exception {
                        // 输出返回结果, 若知足返回 true;不然返回 false
                        Log.d(TAG, "result is " + aBoolean);
                    }
                });
    }

输出:
在这里插入图片描述

takeWhile()操做符

做用:判断发送的每项数据是否知足设置函数条件,若发送的数据知足该条件,则发送该项数据;不然不发送
示例代码:

public void takeWhile() {
        //每1s发送1个数据, 从0开始,递增1,即0、一、二、3
        Observable.interval(1, TimeUnit.SECONDS)
                .takeWhile(new Predicate<Long>(){
                    @Override
                    public boolean test(Long integer) throws Exception {
                        // 当发送的数据知足<3时,才发送Observable的数据
                        return (integer<3);
                    }
                }).subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                    }

                    @Override
                    public void onNext(Long value) {
                        Log.d(TAG,"收到事件 "+ value);
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onComplete() {
                    }
                });
    }

输出:
在这里插入图片描述

skipWhile()操做符

做用:判断发送的每项数据是否知足设置函数条件,直到该判断条件 = false时,才开始发送Observable的数据
示例代码:

public void skipWhile() {
        Observable.range(1, 8)
                .skipWhile(new Predicate<Integer>(){
                    @Override
                    public boolean test( Integer integer) throws Exception {
                        // 直到发射的数据≥5,才开始发送数据
                        return (integer<5);
                    }
                }).subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG,"发送了事件 "+ value);
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onComplete() {
                    }
                });
    }

输出:
在这里插入图片描述

takeUntil()操做符

做用:执行到某个条件时,中止发送事件
示例代码:

public void takeUntil() {
        Observable.range(1, 8)
                .takeUntil(new Predicate<Integer>(){
                    @Override
                    public boolean test(Integer integer) throws Exception {
                        // 当发射数据>5时中止发射数据
                        return (integer>5);
                    }
                }).subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG,"发送了事件 "+ value);
                    }

                    @Override
                    public void onError(Throwable e) {
                    }

                    @Override
                    public void onComplete() {
                    }
                });
    }

输出:
在这里插入图片描述
该判断条件也能够是Observable,即 等到 takeUntil()传入的Observable开始发送数据,(原始)第1个Observable的数据中止发送数据:

public void takeUntil2() {
        Observable.interval(1, TimeUnit.SECONDS)
                // 第2个Observable:延迟5s后开始发送1个Long型数据, 而后第1个Observable会中止发送
                .takeUntil(Observable.timer(5, TimeUnit.SECONDS))
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "开始采用subscribe链接");
                    }

                    @Override
                    public void onNext(Long value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件做出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件做出响应");
                    }
                });
    }

输出:
在这里插入图片描述

skipUntil()操做符

做用:等到 skipUntil() 传入的Observable开始发送数据时,才能收到(原始)第1个Observable的数据
示例代码:

public void skipUntil() {
        // (原始)第1个Observable:每隔1s发送1个数据 = 从0开始,每次递增1
        Observable.interval(1, TimeUnit.SECONDS)
                // 第2个Observable:延迟5s后开始发送1个Long型数据, 而后才能收到第1个Observable发送的数据
                .skipUntil(Observable.timer(5, TimeUnit.SECONDS))
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "开始采用subscribe链接");
                    }

                    @Override
                    public void onNext(Long value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件做出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件做出响应");
                    }

                });
    }

输出:
在这里插入图片描述

sequenceEqual()操做符

做用:断定两个Observables须要发送的数据是否相同, 相同返回 true, 不然返回 false
示例代码:

public void sequenceEqual() {
        Observable.sequenceEqual(Observable.just(4,5,6), Observable.just(4,5,6))
                .subscribe(new Consumer<Boolean>() {
                    @Override
                    public void accept( Boolean aBoolean) throws Exception {
                    	// 输出返回结果
                        Log.d(TAG,"2个Observable是否相同:"+ aBoolean);
                    }
                });
    }

输出:
在这里插入图片描述

contains()操做符

做用:判断发送的数据中是否包含指定数据,包含返回true, 不然返回false
示例代码:

public void contains() {
        Observable.just(1,2,3,4,5,6)
                .contains(4)
                .subscribe(new Consumer<Boolean>() {
                    @Override
                    public void accept(Boolean aBoolean) throws Exception {
                        // 输出返回结果
                        Log.d(TAG,"result is "+ aBoolean);
                    }

                });
    }

输出:
在这里插入图片描述

isEmpty()操做符

做用:判断发送的数据是否为空,为空返回 true,不然返回 false
示例代码:

public void isEmpty() {
        Observable.just(1,2,3,4,5,6)
                .isEmpty()
                .subscribe(new Consumer<Boolean>() {
                    @Override
                    public void accept(Boolean aBoolean) throws Exception {
                        // 输出返回结果
                        Log.d(TAG,"result is "+ aBoolean);
                    }
                });
    }

输出:
在这里插入图片描述

amb()操做符

做用:当须要发送多个Observable时,只发送先发送数据的Observable的数据,而其他 Observable则被丢弃
示例代码:

public void amb() {
        // 设置2个须要发送的Observable & 放入到集合中
        List<ObservableSource<Integer>> list= new ArrayList<>();
        // 第1个Observable延迟1秒发射数据
        list.add(Observable.just(1,2,3).delay(1,TimeUnit.SECONDS));
        // 第2个Observable正常发送数据
        list.add(Observable.just(4,5,6));

        // 一共须要发送2个Observable的数据
        // 但因为使用了amb(),因此仅发送先发送数据的Observable, 即第2个(由于第1个延时了)
        Observable.amb(list).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "接收到了事件 "+integer);
            }
        });
    }

输出:
在这里插入图片描述

defaultIfEmpty()操做符

做用:在不发送任何有效事件( Next事件)、仅发送了 Complete 事件的前提下,发送一个默认值
示例代码:

public void defaultEmpty() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                        // 不发送任何有效事件
                        // e.onNext(1);
                        // e.onNext(2);

                        // 仅发送Complete事件
                        e.onComplete();
                    }
                }).defaultIfEmpty(10) // 若仅发送了Complete事件,默认发送值 = 10
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "开始采用subscribe链接");
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件做出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件做出响应");
                    }
                });
    }

输出:
在这里插入图片描述
若是发送next事件,则不会收到默认值:
在这里插入图片描述