博客主页java
RxJava 的过滤操做符主要包括如下几种:segmentfault
只发射第 一 项(或者知足某个条件的第一项)数据ide
若是只对 Observable 发射的第一项数据,或者知足某个条件的第一项数据感兴趣,那么就可使用 first 操做符。函数
在 RxJava 2.x 中,使用 first() 须要一个默认的 Item ,对于 Observable 而言,使用了 first()会返回 Single 类型。测试
public final Single<T> first(T defaultItem) { return elementAt(0L, defaultItem); } Observable.just(3, 4, 5) .first(8) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next-> " + integer); } }); // 执行结果 Next-> 3
若是 Observable 不发射任何数据,那么 first 操做符的默认值就起了做用。ui
Observable.<Integer>empty() .first(8) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next-> " + integer); } }); // 执行结果 Next-> 8
在 R.Java 2.x 中,还有 firstElement 操做符表示只取第一个数据,没有默认值。 firstOrError 操做符表示要么能取到第一个数据,要么执行 onError 方法,它们分别返回 Maybe 类型和 Single 类型。this
只发射最后一项(或者知足某个条件的最后一项)数据spa
若是只对 Observable 发射的最后一项数据, 或者知足某个条件的最后一项数据感兴趣,那么就可使用 last 操做符。code
last 操做符跟 first 操做符相似,须要一个默认的 Item ,也是返回 Single 类型。blog
public final Single<T> last(T defaultItem) { ObjectHelper.requireNonNull(defaultItem, "defaultItem is null"); return RxJavaPlugins.onAssembly(new ObservableLastSingle<T>(this, defaultItem)); } Observable.just(3, 4, 5) .last(8) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next-> " + integer); } }); // 执行结果 Next-> 5
在 RxJava 2.x 中,有 lastElement 操做符和 lastOrError 操做符。
只发射前面的 N 项数据
使用 take 操做符能够只修改 Observable 的行为,返回前面的 N 项数据,发射完成通知,忽略剩余的数据
Observable.just(1, 2, 3, 4, 5) .take(3) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next-> " + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 执行结果 Next-> 1 Next-> 2 Next-> 3 Complete.
若是对一个 Observable 使用 take 操做符,而那个 Observabl 发射的数据少于 N 项,那么 take 操做符生成的 Observable 就不会抛出异常或者发射 Error 通知,而是仍然会发射那些数据
Observable.just(1, 2, 3, 4, 5) .take(6) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next-> " + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 执行结果 Next-> 1 Next-> 2 Next-> 3 Next-> 4 Next-> 5 Complete.
take 有一个重载方法可以接受一个时长而不是数量参数。它会丢掉发射 Observable 开始的那段时间发射的数据,时长和时间单位经过参数指定。
Observable.intervalRange(0, 10, 1, 1, TimeUnit.SECONDS) .take(3, TimeUnit.SECONDS) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.d(TAG, "Next-> " + aLong); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 执行结果 Next-> 0 Next-> 1 Next-> 2 Complete.
上述代码使用了 intervalRange 操做符表示每隔 ls 会发射一个数据,它们从 0 开始到 9 结束,发射 10 个数据。因为在这里使用了 take 操做符,最后只打印前 3 个数据.
take 的这个重载方法默认在 computation 调度器上执行,也可使用参数来指定其余调度器。
发射 Observable 发射的最后 N 项数据
使用 takeLast 操做符修改原始 Observable,咱们能够只发射 Observable 发射的最后 N 项数据,忽略前面的数据。
Observable.just(1, 2, 3, 4, 5) .takeLast(3) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next-> " + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 执行结果 Next-> 3 Next-> 4 Next-> 5 Complete.
一样,若是对一个 Observable 使用 takeLast(n) 操做符,而那个 Observable 发射的数据少于 N 项,那么 takeLast 操做符生成的 Observable 不会抛出异常或者发射 onError 通知,而是仍然发射那些数据。
takeLast 也有一个重载方法可以接受一个时长而不是数量参数。它会发射在原始 Observable 生命周期内最后一段时间发射的数据,时长和时间单位经过参数指定。
Observable.intervalRange(0, 10, 1, 1, TimeUnit.SECONDS) .takeLast(3, TimeUnit.SECONDS) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.d(TAG, "Next-> " + aLong); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 执行结果 Next-> 7 Next-> 8 Next-> 9 Complete.
抑制 Observable 发射的前 N 项数据
使用 skip 操做符,能够忽略 Observable 发射的前 N 项数据,只保留以后的数据
Observable.just(1, 2, 3, 4, 5) .skip(3) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next-> " + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 执行结果 Next-> 4 Next-> 5 Complete.
skip 有一个重载方法可以接受一个时长而不是数量参数。它会丢弃原始 Observable 开始那段时间发射的数据,时长和时间单位经过参数指定。
Observable.interval(1, TimeUnit.SECONDS) .skip(3, TimeUnit.SECONDS) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.d(TAG, "Next-> " + aLong); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 执行结果 Next-> 3 Next-> 4 Next-> 5 Next-> 6 Next-> 7 Next-> 8 Next-> 9 ......
抑制 Observable 发射的后 N 项数据
使用 skipLast 操做符修改原始 Observable,能够忽略 Observable 发射后 N 项数据,只保留前面的数据。
Observable.just(1, 2, 3, 4, 5) .skipLast(3) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next-> " + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 执行结果 Next-> 1 Next-> 2 Complete.
一样, skipLast 也有一个重载方法接受一个时长而不是数量参数。它会丢弃在原始 Observable 生命周期最后一段时间内发射的数据,时长和时间单位经过参数指定。
Observable.interval(1, TimeUnit.SECONDS) .skipLast(3, TimeUnit.SECONDS) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.d(TAG, "Next-> " + aLong); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 执行结果 Next-> 0 Next-> 1 Next-> 2 Next-> 3 Next-> 4 Next-> 5 Next-> 6 ......
只发射第 N 项数据
elementAt 操做符获取原始 Observable 发射的数据序列指定索引位置的数据项,而后看成本身的惟一数据发射
它传递一个基于 0 的索引值,发射原始 Observable 数据序列对应索引位置的值,若是传递给 elementAt 的值为 5,那么它会发射第 6 项数据。若是传递的是一个负数,则将会抛出 IndexOutOfBoundsException 异常。
Observable.just(1, 2, 3, 4, 5) .elementAt(2) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next-> " + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 执行结果 Next-> 3
elementAt(index)返回一个 Maybe 类型。
public final Maybe<T> elementAt(long index) { if (index < 0) { throw new IndexOutOfBoundsException("index >= 0 required but it was " + index); } return RxJavaPlugins.onAssembly(new ObservableElementAtMaybe<T>(this, index)); }
若是原始 Observable 的数据项数小于 index+1 ,那么会调用 onComplete 方法(在 RxJava l.x 中也会抛出一个 IndexOutOfBoundsException 异常)。因此 elementAt 还提供了一个带默认值的方法,它返回一个 Single 类型。
Observable.just(1, 2, 3, 4, 5) .elementAt(8, 10) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Success: " + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }); // 执行结果 Success: 10
若是 index 超出了索引范围,那么取默认值
不发射任何数据,只发射 Observable 终止通知
ignoreElements 操做符抑制原始 Observable 发射的全部数据,只容许它的终止通知( onError 或 onComplete )经过。它返回 Completable 类型
若是咱们不关心一个 Observable 发射的数据,可是但愿在它完成时或遇到错误终止时收到通知,那么就能够对 Observable 使用 gnoreElements 操做符,它将确保永远不会调用观察者的 onNext 方法。
Observable.just(1, 2, 3, 4, 5) .ignoreElements() .subscribe(new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error->" + throwable); } }); // 执行结果 Complete.
过滤掉重复的数据项
distinct 的过滤规则是: 只容许尚未发射过的数据项经过
Observable.just(1, 2, 2, 3, 4, 4, 4, 5) .distinct() .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next->" + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error->" + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 执行结果 Next->1 Next->2 Next->3 Next->4 Next->5 Complete.
distinct 还能接受 Function 做为参数,这个函数根据原始 Observable 发射的数据项产生一个 Key ,而后比较这些 Key 而不是数据自己,来断定两个数据是否不一样。
与 distinct 相似的是 distinctUntilChanged 操做符,该操做符与 distinct 的区别是:它只断定一个数据和它的直接前驱是否不一样
Observable.just(1, 2, 1, 2, 3, 4, 4, 4, 5) .distinctUntilChanged() .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next->" + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error->" + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 执行结果 Next->1 Next->2 Next->1 Next->2 Next->3 Next->4 Next->5 Complete.
只发射经过谓词测试的数据项
filter 操做符使用你指定的一个谓词函数测试数据项,只有经过测试的数据才会被发射。
Observable.just(2, 30, 22, 5, 60, 1) .filter(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer > 10; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next->" + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error->" + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 执行结果 Next->30 Next->22 Next->60 Complete.
仅在过了一段指定的时间还没发射数据的才发射一个数据
debounce 操做符会过滤掉发射速率过快的数据项
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { if (emitter.isDisposed()) return; try { for (int i = 0; i < 10; i++) { emitter.onNext(i); Thread.sleep(i * 100); } emitter.onComplete(); } catch (Exception e) { emitter.onError(e); } } }).debounce(500, TimeUnit.MILLISECONDS) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next->" + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error->" + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 执行结果 Next->6 Next->7 Next->8 Next->9 Complete.
debounce 还有另一种形式,使用一个 Function 函数来限制发送的数据。
跟 debounce 相似的是由throttleWithTimeout 操做符,它与只使用时间参数来限流的 debounce 的功能相同。
若是个人文章对您有帮助,不妨点个赞鼓励一下(^_^)