博客主页java
RxJava 的条件操做符主要包括如下几个:segmentfault
RxJava 的布尔操做符主要包括:ide
断定 Observable 发射的全部数据是否都知足某个条件函数
传递一个谓词函数给 all 操做符,这个函数接受原始 Observable 发射的数据,根据计算返回一个布尔值。 all 返回一个只发射单个布尔值的 Observable,若是原始 Observable 正常终止而且每一项数据都知足条件,就返回 true。若是原始 Observabl 的任意一项数据不知足条件,就返回falseui
Observable.just(1, 2, 3, 4, 5) .all(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer < 10; } }).subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { Log.d(TAG, "Success: " + aBoolean); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }); // 执行结果 Success: true
判断 Observable 发射的全部数据是否都大于 3spa
Observable.just(1, 2, 3, 4, 5) .all(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer > 3; } }).subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { Log.d(TAG, "Success: " + aBoolean); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }); // 执行结果 Success: false
断定一个 Observable 是否发射了一个特定的值code
给 contains 传一个指定的值,若是原始 Observable 发射了那个值,那么返回的 Observable 将发射 true,不然发射 false 。与它相关的一个操做符是 isEmpty ,用于断定原始 Observable 是否未发射任何数据。对象
Observable.just(2, 30, 22, 5, 60, 1) .contains(22) .subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { Log.d(TAG, "Success: " + aBoolean); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }); // 执行结果 Success: true
给定两个或多个 Observable ,它只发射首先发射数据或通知的那个 Observable 的全部数据blog
当传递多个 Observable 给 amb 时,它只发射其中一个 Observable 数据和通知: 首先发送通知给 amb 的那个 Observable ,无论发射的是一项数据 ,仍是一个 onError 或 onCompleted 通知。 amb 忽略和丢弃其余全部 Observables 的发射物。ip
在 RxJava 中, amb 还有一个相似的操做符 ambWith。 例如, Observable.amb(ol, o2 )和
ol.ambWith(o2)是等价的
在 RxJava 2.x 中, amb 须要传递 Iterable 对象,或者使用 ambArray 来传递可变参数。
Observable.ambArray( Observable.just(1, 2, 3), Observable.just(4, 5, 6) ).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next: " + integer); } }); // 执行结果 Next: 1 Next: 2 Next: 3
修改一下代码,第一个 Observable 延迟 ls 后再发射数据
Observable.ambArray( Observable.just(1, 2, 3).delay(1, TimeUnit.SECONDS), Observable.just(4, 5, 6) ).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next: " + integer); } }); // 执行结果 Next: 4 Next: 5 Next: 6
因为第一个 Observable 延迟发射,所以咱们只消费了第二个 Observable 的数据,第一个 Observable 发射的数据就再也不处理了。
发射来自原始 Observable 值,若是原始 Observable 没有发射任何值,就发射一个默认值
defaultIfEmpty 简单精确地发射原始 Observable 的值,若是原始 Observable 没有发射任何数据,就正常终止(以 onComplete 形式了),那么 defaultlfEmpty 返回的 Observable 就发射一个咱们提供的默认值。
Observable.empty() .defaultIfEmpty(8) .subscribe(new Consumer<Object>() { @Override public void accept(Object o) throws Exception { Log.d(TAG, "Next: " + o); } }); // 执行结果 Next: 8
在 defaultIfEmpty 方法内部,其实调用的是 switchIfEmpty 操做符,源码以下:
public final Observable<T> defaultIfEmpty(T defaultItem) { ObjectHelper.requireNonNull(defaultItem, "defaultItem is null"); return switchIfEmpty(just(defaultItem)); }
defaultIfEmpty 和 switchIfEmpty 的区别是, defaultIfEmpty 操做符只能在被观察者不发送数据时发送一个默认的数据 ,若是想要发送更多数据,则可使用 switchIfEmpty 操做符,发送自定义的被观察者做为替代。
Observable.empty() .switchIfEmpty(Observable.just(1, 2, 3)) .subscribe(new Consumer<Object>() { @Override public void accept(Object o) throws Exception { Log.d(TAG, "Next: " + o); } }); // 执行结果 Next: 1 Next: 2 Next: 3
断定两个 Observable 是否发射相同的数据序列
传递两个 Observable 给 sequenceEqual 操做符时,它会比较两个 Observable 发射物,若是两个序列相同(相同的数据,相同的顺序,相同的终止状态〉 ,则发射 true 不然发射 false
Observable.sequenceEqual( Observable.just(1, 2, 3, 4, 5), Observable.just(1, 2, 3, 4, 5) ).subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { Log.d(TAG, "Success: " + aBoolean); } }); // 执行结果 Success: true
将两个 Observable 改为不一致
Observable.sequenceEqual( Observable.just(1, 2, 3, 4, 5), Observable.just(1, 2, 3, 4, 5, 6) ).subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { Log.d(TAG, "Success: " + aBoolean); } }); // 执行结果 Success: false
sequenceEqual 还有一个版本接受第三个参数,能够传递一个函数用于比较两个数据项是否相同。对于复杂对象的比较,用三个参数的版本更为合适。
Observable.sequenceEqual( Observable.just(1, 2, 3, 4, 5), Observable.just(1, 2, 3, 4, 5), new BiPredicate<Integer, Integer>() { @Override public boolean test(Integer integer, Integer integer2) throws Exception { return integer == integer2; } } ).subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { Log.d(TAG, "Success: " + aBoolean); } }); // 执行结果 Success: true
丢弃原始 Observable 发射的数据,直到第二个 Observable 发射了一项数据
skipUntil 订阅原始的 Observable,可是忽略它的发射物,直到第二个 Observable 发射一项数据那一刻,它才开始发射原始的 Observabl。 skipUntil 默认不在任何特定的调度器上执行。
Observable.intervalRange(1, 9, 0, 1, TimeUnit.MILLISECONDS) .skipUntil(Observable.timer(4, TimeUnit.MILLISECONDS)) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.d(TAG, "Next: " + aLong); } }); // 执行结果 Next: 4 Next: 5 Next: 6 Next: 7 Next: 8 Next: 9
上述代码,原始的 Observable 发射 1 到 9 这 9 个数 ,初始延迟时间是 0,每间隔 lms。因为使用 skipUntil,所以它会发射原始 Observable 在 3ms 以后的数据。
丢弃 Observable 发射的数据,直到一个指定的条件不成立。
skipWhile 订阅原始的 Observable ,可是忽略它的发射物,直到指定的某个条件变为 false。它才开始发射原始的 Observable。skipWhile 默认不在任何特定的调度器上执行
Observable.just(1, 2, 3, 4, 5) .skipWhile(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer <= 2; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next: " + integer); } }); // 执行结果 Next: 3 Next: 4 Next: 5
当第二个 Observable 发射了一项数据或者终止时,丢弃原始 Observable 发射的任何数据
takeUntil 订阅并开始发射原始 Observable ,它还监视你提供的第二个 Observable。若是第二个 Observable 发射了一项数据或者发射了一个终止通知,则 takeUntil 返回的 Observable 会中止发射原始 Observable 并终止。
Observable.just(1, 2, 3, 4, 5, 6) .takeUntil(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer == 4; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next: " + integer); } }); // 执行结果 Next: 1 Next: 2 Next: 3 Next: 4
发射原始 Observable 发射的数据,直到一个指定的条件不成立
takeWhile 发射原始的 Observable 直到某个指定的条件不成立,它会当即中止发射原始 Observable,并终止本身的 Observable。
RxJava 中的 takeWhile 操做符返回一个原始 Observable 行为的 Observable,直到某项数据,指定的函数返回 false ,这个新的 Observable 就会发射 onComplete 终止通知
Observable.just(1, 2, 3, 4, 5, 6) .takeWhile(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer <= 2; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next: " + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 执行结果 Next: 1 Next: 2 Complete.
若是个人文章对您有帮助,不妨点个赞鼓励一下(^_^)