目录html
需求了解:java
在使用 Rxjava 开发中,常常有一些各类条件的操做 ,如比较两个 Observable 谁先发射了数据、跳过指定条件的 Observable 等一系列的条件操做需求,那么很幸运, Rxjava 中已经有了不少条件操做符,一块儿来了解一下吧。react
下面列出了一些Rxjava
的用于条件操做符:git
Amb
:给定两个或多个Observables,它只发射首先发射数据或通知的那个Observable的全部数据。DefaultIfEmpty
:发射来自原始Observable的值,若是原始 Observable 没有发射任何数据项,就发射一个默认值。SwitchIfEmpty
:若是原始Observable没有发射数据时,发射切换一个指定的Observable继续发射数据。SkipUntil
:丢弃原始 Observable 发射的数据,直到第二个 Observable 发射了一个数据,而后发射原始 Observable 的剩余数据。SkipWhile
:丢弃原始 Observable 发射的数据,直到一个特定的条件为假,而后发射原始 Observable 剩余的数据。TakeUntil
:发射来自原始 Observable 的数据,直到第二个 Observable 发射了一个数据或一个通知。给定两个或多个Observables,它只发射首先发射数据或通知的那个Observable的全部数据。github
解析: 对多个Observable进行监听,首先发射通知(包括数据)的Observable将会被观察者观察,发射这个Observable的全部数据。数组
示例代码:ide
// 建立Observable Observable<Integer> delayObservable = Observable.range(1, 5) .delay(100, TimeUnit.MILLISECONDS); // 延迟100毫秒发射数据 Observable<Integer> rangeObservable = Observable.range(6, 5); // 建立Observable的集合 ArrayList<Observable<Integer>> list = new ArrayList<>(); list.add(delayObservable); list.add(rangeObservable); // 建立Observable的数组 Observable<Integer>[] array = new Observable[2]; array[0] = delayObservable; array[1] = rangeObservable; /** * 1. ambWith(ObservableSource<? extends T> other) * 与另一个Observable比较,只发射首先发射通知的Observable的数据 */ rangeObservable.ambWith(delayObservable) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println("--> accept(1): " + integer); } }); System.in.read(); System.out.println("------------------------------------------------"); /** * 2. amb(Iterable<? extends ObservableSource<? extends T>> sources) * 接受一个Observable类型的集合, 只发射集合中首先发射通知的Observable的数据 */ Observable.amb(list) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println("--> accept(2): " + integer); } }); System.in.read(); System.out.println("------------------------------------------------"); /** * 3. ambArray(ObservableSource<? extends T>... sources) * 接受一个Observable类型的数组, 只发射数组中首先发射通知的Observable的数据 */ Observable.ambArray(array) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println("--> accept(3): " + integer); } }); System.in.read();
输出:函数
--> accept(1): 6 --> accept(1): 7 --> accept(1): 8 --> accept(1): 9 --> accept(1): 10 ------------------------------------------------ --> accept(2): 6 --> accept(2): 7 --> accept(2): 8 --> accept(2): 9 --> accept(2): 10 ------------------------------------------------ --> accept(3): 6 --> accept(3): 7 --> accept(3): 8 --> accept(3): 9 --> accept(3): 10
Javadoc: ambWith(ObservableSource other)
Javadoc: amb(Iterable sources)
Javadoc: ambArray(ObservableSource... sources).net
发射来自原始Observable的值,若是原始 Observable 没有发射数据项,就发射一个默认值。
3d
解析: DefaultIfEmpty
简单的精确地发射原始Observable的值,若是原始Observable没有发射任何数据正常终止(以 onCompleted 的形式), DefaultIfEmpty
返回的Observable就发射一个你提供的默认值。若是你须要发射更多的数据,或者切换备用的Observable,你能够考虑使用 switchIfEmpty
操做符 。
示例代码:
/** * defaultIfEmpty(@NotNull T defaultItem) * 若是原始Observable没有发射任何数据正常终止(以 onCompleted 的形式), * DefaultIfEmpty 返回的Observable就发射一个你提供的默认值defaultItem。 */ Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { emitter.onComplete(); // 不发射任何数据,直接发射完成通知 } }).defaultIfEmpty("No Data emitter!!!") .subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe"); } @Override public void onNext(String s) { System.out.println("--> onNext: " + s); } @Override public void onError(Throwable e) { System.out.println("--> onError: " + e); } @Override public void onComplete() { System.out.println("--> onComplete"); } });
输出:
--> onSubscribe --> onNext: No Data emitter!!! --> onComplete
Javadoc: defaultIfEmpty(T defaultItem)
若是原始Observable没有发射数据时,发射切换一个指定的Observable继续发射数据。
解析: 若是原始 Observable 没有发射数据时,发射切换指定的 other
继续发射数据。
示例代码:
/** * switchIfEmpty(ObservableSource other) * 若是原始Observable没有发射数据时,发射切换指定的other继续发射数据 */ Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onComplete(); // 不发射任何数据,直接发射完成通知 } }).switchIfEmpty(Observable.just(888)) // 若是原始Observable没有发射数据项,默认发射备用的Observable,发射数据项888 .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe"); } @Override public void onNext(Integer integer) { System.out.println("--> onNext: " + integer); } @Override public void onError(Throwable e) { System.out.println("--> onError: " + e); } @Override public void onComplete() { System.out.println("--> onComplete"); } });
输出:
--> onSubscribe --> onNext: 888 --> onComplete
丢弃原始 Observable 发射的数据,直到第二个 Observable 发射了一个数据,而后发射原始 Observable 的剩余数据。
示例代码:
/** * skipUntil(ObservableSource other) * 丢弃原始Observable发射的数据,直到other发射了一个数据,而后发射原始Observable的剩余数据。 */ Observable.intervalRange(1, 10, 0, 500, TimeUnit.MILLISECONDS) // 丢弃2000毫秒的原始Observable发射的数据,接受后面的剩余部分数据 .skipUntil(Observable.timer(2000, TimeUnit.MILLISECONDS)) .subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe"); } @Override public void onNext(Long aLong) { System.out.println("--> onNext: " + aLong); } @Override public void onError(Throwable e) { System.out.println("--> onError: " + e); } @Override public void onComplete() { System.out.println("--> onComplete"); } }); System.in.read();
输出:
--> onSubscribe --> onNext: 5 --> onNext: 6 --> onNext: 7 --> onNext: 8 --> onNext: 9 --> onNext: 10 --> onComplete
Javadoc: skipUntil(ObservableSource other)
丢弃原始 Observable 发射的数据,直到一个特定的条件为假,而后发射原始 Observable 剩余的数据。
示例代码:
/** * skipWhile(Predicate<? super T> predicate) * 丢弃原始 Observable 发射的数据,直到函数predicate的条件为假,而后发射原始Observable剩余的数据。 */ Observable.intervalRange(1, 10, 0, 500, TimeUnit.MILLISECONDS) .skipWhile(new Predicate<Long>() { @Override public boolean test(Long aLong) throws Exception { if (aLong > 5) { return false; // 当原始数据大于5时,发射后面的剩余部分数据 } return true; // 丢弃原始数据项 } }).subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe"); } @Override public void onNext(Long aLong) { System.out.println("--> onNext: " + aLong); } @Override public void onError(Throwable e) { System.out.println("--> onError: " + e); } @Override public void onComplete() { System.out.println("--> onComplete"); } }); System.in.read();
输出:
--> onSubscribe --> onNext: 6 --> onNext: 7 --> onNext: 8 --> onNext: 9 --> onNext: 10 --> onComplete
Javadoc: skipWhile(Predicate predicate)
发射来自原始 Observable 的数据,直到第二个 Observable 发射了一个数据或一个通知。
TakeUntil
订阅并开始发射原始 Observable,它还监视你提供的第二个 Observable。若是第二个 Observable 发射了一项数据或者发射了一个终止通知,TakeUntil
返回的 Observable 会中止发射原始 Observable 并终止。
解析: 第二个Observable发射一项数据或一个 onError
通知或一个 onCompleted
通知都会致使 takeUntil
中止发射数据。
示例代码:
// 建立Observable,发送数字1~10,每间隔200毫秒发射一个数据 Observable<Long> observable = Observable.intervalRange(1, 10, 0, 200, TimeUnit.MILLISECONDS); /** * 1. takeUntil(ObservableSource other) * 发射来自原始Observable的数据,直到other发射了一个数据或一个通知后中止发射原始Observable并终止。 */ observable.takeUntil(Observable.timer(1000, TimeUnit.MILLISECONDS)) // 1000毫秒后中止发射原始数据 .subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe(1)"); } @Override public void onNext(Long aLong) { System.out.println("--> onNext(1): " + aLong); } @Override public void onError(Throwable e) { System.out.println("--> onError(1): " + e); } @Override public void onComplete() { System.out.println("--> onComplete(1)"); } }); System.in.read();
输出:
--> onSubscribe(1) --> onNext(1): 1 --> onNext(1): 2 --> onNext(1): 3 --> onNext(1): 4 --> onNext(1): 5 --> onComplete(1)
Javadoc: takeUntil(ObservableSource other)
每次发射数据后,经过一个谓词函数来断定是否须要终止发射数据。
解析: 每次发射数据后,经过一个谓词函数 stopPredicate
来断定是否须要终止发射数据,若是 stopPredicate
返回 true
怎表示中止发射原始Observable后面的数据,不然继续发射后面的数据。
示例代码:
/** * 2. takeUntil(Predicate<? super T> stopPredicate) * 每次发射数据后,经过一个谓词函数stopPredicate来断定是否须要终止发射数据 * 若是stopPredicate返回true怎表示中止发射后面的数据,不然继续发射后面的数据 */ observable.takeUntil(new Predicate<Long>() { @Override public boolean test(Long aLong) throws Exception { // 函数返回false则为继续发射原始数据,true则中止发射原始数据 if(aLong > 5){ return true; // 知足条件后,中止发射数据 } return false; // 继续发射数据 } }).subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe(2)"); } @Override public void onNext(Long aLong) { System.out.println("--> onNext(2): " + aLong); } @Override public void onError(Throwable e) { System.out.println("--> onError(2): " + e); } @Override public void onComplete() { System.out.println("--> onComplete(2)"); } }); System.in.read();
输出:
--> onSubscribe(2) --> onNext(2): 1 --> onNext(2): 2 --> onNext(2): 3 --> onNext(2): 4 --> onNext(2): 5 --> onNext(2): 6 --> onComplete(2)
Javadoc: takeUntil(Predicate stopPredicate)
发射原始Observable的数据,直到一个特定的条件,而后跳过剩余的数据。
解析: 发射原始 Observable 的数据,直到 predicate
的条件为 false
,而后跳过剩余的数据。
示例代码:
// 建立Observable,发送数字1~10,每间隔200毫秒发射一个数据 Observable<Long> observable = Observable.intervalRange(1, 10, 0, 200, TimeUnit.MILLISECONDS); /** * takeWhile(Predicate predicate) * 发射原始Observable的数据,直到predicate的条件为false,而后跳过剩余的数据 */ observable.takeWhile(new Predicate<Long>() { @Override public boolean test(Long aLong) throws Exception { // 函数返回值决定是否继续发射后续的数据 if(aLong > 5){ return false; // 知足条件后跳事后面的数据 } return true; // 继续发射数据 } }).subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe"); } @Override public void onNext(Long aLong) { System.out.println("--> onNext: " + aLong); } @Override public void onError(Throwable e) { System.out.println("--> onError: " + e); } @Override public void onComplete() { System.out.println("--> onComplete"); } }); System.in.read();
输出:
--> onSubscribe(1) --> onNext(1): 1 --> onNext(1): 2 --> onNext(1): 3 --> onNext(1): 4 --> onNext(1): 5 --> onComplete(1)
Javadoc: takeWhile(Predicate predicate)
本节主要介绍了Rxjava
条件操做符能够根据不一样的条件进行数据的发射,变换等相关行为。
提示:以上使用的Rxjava2版本: 2.2.12
Rx介绍与讲解及完整目录参考:Rxjava2 介绍与详解实例
实例代码: