目录html
接续上篇: Rxjava2 Observable的数据过滤详解及实例(一)java
只发射经过了函数过滤的数据项。react
实例代码:git
// filter(Predicate<? super Integer> predicate) // 验证数据,决定是否发射数据 Observable.range(1, 10) .filter(new Predicate<Integer>() { @Override public boolean test(Integer t) throws Exception { // 进行测试验证是否须要发射数据 return t > 5 ? true : false; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept filter: " + t); } });
输出:github
--> accept filter: 6 --> accept filter: 7 --> accept filter: 8 --> accept filter: 9 --> accept filter: 10
Javadoc: filter(predicate)数据库
只发射第一项或者知足某个条件的第一项数据。若是你只对Observable发射的第一项数据,或者知足某个条件的第一项数据感兴趣,你可使用 First
操做符。缓存
Frist
操做符有如下几种操做:网络
只发射第一个数据,当数据存在的状况。并发
实例代码:ide
// 1. firstElement() // 只发射第一个数据 Observable.range(1, 10) .firstElement() .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept firstElement(1): " + t); } });
输出:
--> accept firstElement(1): 1
Javadoc: firstElement()
first(defaultItem)
与 firstElement()
相似,可是在Observagle没有发射任何数据时发射一个你在参数中指定的 defaultItem
默认值。
实例代码:
// 2. first(Integer defaultItem) // 发射第一个数据项,若是没有数据项,发送默认的defaultItem Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onComplete(); } }).first(999) // 没有数据发送时,发送默认值999 .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept first(2): " + t); } });
输出:
--> accept first(2): 999
Javadoc: first(defaultItem)
发射第一个数据项,若是没有数据项,会发送 NoSuchElementException
通知。
实例代码:
// 3. first(Integer defaultItem) // 发射第一个数据项,若是没有数据项,会有Error: NoSuchElementException Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onComplete(); } }).firstOrError() // 没有数据发送时,将会发送NoSuchElementException通知 .subscribe(new SingleObserver<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe: "); } @Override public void onSuccess(Integer t) { System.out.println("--> accept onSuccess(3): " + t); } @Override public void onError(Throwable e) { System.out.println("--> acctpt onError(3): " + e); } });
输出:
--> onSubscribe: --> acctpt onError(3): java.util.NoSuchElementException
Javadoc: firstOrError()
single
与 first
相似,可是若是原始Observable在完成以前不是正好发射一次数据,它会抛出一个NoSuchElementException
的通知。
Single 有如下几种操做:
发射单例数据,超过一个就会发送 NoSuchElementException
通知。
实例代码:
// 1.singleElement() // 发射单例数据,超过一个就会NoSuchElementException Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onComplete(); } }).singleElement() // 发送单个数据,大于1项数据就会有Error通知 .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept singleElement(1): " + t); } },new Consumer<Throwable>() { @Override public void accept(Throwable t) throws Exception { System.out.println("--> OnError(1): " + t); } });
输出:
--> OnError(1): java.lang.IllegalArgumentException: Sequence contains more than one element!
Javadoc: singleElement()
发射单例数据,没有接收到数据项则发送指定默认 defaultItem
数据。
实例代码:
// 2. single(Integer defaultItem) // 发射单例数据,没有数据项发送指定默认defaultItem Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onComplete(); } }).single(999) // 没有接受到数据则发送默认数据999 .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept single(2): " + t); } });
输出:
--> accept single(2): 999
Javadoc: single(defaultItem)
发射一个单例的数据,若是数据源没有数据项,则发射一个 NoSuchElementException
通知。
实例代码:
// 3.singleOrError() // 发射一个单例的数据,若是数据源 没有数据项,则发送一个NoSuchElementException异常通知 Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onComplete(); } }).singleOrError() // 若是没有数据项发送,则发送一个NoSuchElementException异常通知 .subscribe(new SingleObserver<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe(3): "); } @Override public void onSuccess(Integer t) { System.out.println("--> onSuccess(3): " + t); } @Override public void onError(Throwable e) { System.out.println("--> onError(3): " + e); } });
输出:
--> onSubscribe(3): --> onError(3): java.util.NoSuchElementException
Javadoc: singleOrError()
ElementAt
操做符获取原始Observable发射的数据序列指定索引位置的数据项,而后当作本身的惟一数据发射。
ElementAt 操做符有如下几种操做:
发射索引位置第 index
项数据(从0开始计数),若是数据不存在,会 IndexOutOfBoundsException
异常。
实例代码:
// 1. elementAt(long index) // 指定发射第N项数据(从0开始计数),若是数据不存在,会IndexOutOfBoundsException异常 Observable.range(1, 10) .elementAt(5) // 发射数据序列中索引为5的数据项,索引从0开始 .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept ElementAt(1): " + t); } });
输出:
--> accept ElementAt(1): 6
Javadoc: elementAt(index)
发射索引位置第 index
项数据(从0开始计数),若是数据不存在,发送默认 defaultItem
数据。
实例代码:
// 2. elementAt(long index, Integer defaultItem) // 指定发射第N项数据(从0开始计数),若是数据不存在,发送默认defaultItem Observable.range(1, 10) .elementAt(20, 0) // 发射索引第20项数据,不存在此项数据时,发送默认数据0 .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept elementAt(2): " + t); } });
输出:
--> accept elementAt(2): 0
Javadoc: elementAt(index, defaultItem)
发射索引位置第 index
项数据(从0开始计数),若是指定发射的数据不存在,会发射NoSuchElementException
异常通知。
实例代码:
// 3. elementAtOrError(long index) // 若是指定发射的数据不存在,会抛出NoSuchElementException Observable.range(1, 10) .elementAtOrError(50) // 发射索引为50的数据,不存在则发送NoSuchElementException异常通知 .subscribe(new SingleObserver<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe(3): "); } @Override public void onSuccess(Integer t) { System.out.println("--> onSuccess(3): " + t); } @Override public void onError(Throwable e) { System.out.println("--> onError(3): " + e); } });
输出:
--> onSubscribe(3): --> onError(3): java.util.NoSuchElementException
Javadoc: elementAtOrError(index)
不发射任何数据,只发射Observable的终止通知。
IgnoreElements
操做符抑制原始Observable发射的全部数据,只容许它的终止通知 (onError 或 onCompleted )经过。
解析: 若是你不关心一个Observable发射的数据,可是但愿在它完成时或遇到错误终止时收到通知,你能够对Observable使用 ignoreElements 操做符,它会确保永远不会调用观察者的 onNext() 方法。
实例代码:
// ignoreElements() // 只接受onError或onCompleted通知,拦截onNext事件(不关心发射的数据,只但愿在成功或者失败的时候收到通知) Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); // int i = 1/0; emitter.onComplete(); } }).ignoreElements() .subscribe(new CompletableObserver() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe"); } @Override public void onError(Throwable e) { System.out.println("--> onError: " + e); } @Override public void onComplete() { System.out.println("--> onComplete"); } });
输出:
--> onSubscribe --> onComplete
Javadoc: ignoreElements()
只发射最后一项(或者知足某个条件的最后一项)数据。
若是你只对Observable发射的最后一项数据,或者知足某个条件的最后一项数据感兴趣,你可使用 Last
操做符。
Last
有如下几种操做:
只发射最后一项数据,使用没有参数的 last
操做符,若是Observable中没有数据发送,则一样没有数据发送。
实例代码:
// 1. lastElement() // 接受最后一项数据 Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); } }).lastElement() // 存在数据发送的话,即发射最后一项数据,不然没有数据发射 .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept lastElement(1): " + t); } });
输出:
--> accept lastElement(1): 3
Javadoc: lastElement()
只发射最后一项数据,若是Observable中没有数据发送,则发送指定的默认值 defaultItem
。
实例代码:
// 2. last(Integer defaultItem) // 接受最后一项数据,若是没有数据发送,发送默认数据:defaultItem Observable.range(0, 0) .last(999) // 接受最后一项数据,没有数据则发送默认数据999 .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept: last(2): " + t); } });
输出:
--> accept: last(2): 999
Javadoc: last(defaultItem)
接受最后一项数据,若是没有数据发送,抛出 NoSuchElementException
异常通知。
实例代码:
// 3. lastOrError() // 接受最后一项数据,若是没有数据发送,抛出onError: NoSuchElementException Observable.range(0, 0) .lastOrError() // 接受最后一项数据,若是没有数据,则反射NoSuchElementException异常通知 .subscribe(new SingleObserver<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe: "); } @Override public void onSuccess(Integer t) { System.out.println("--> onSuccess(3)"); } @Override public void onError(Throwable e) { System.out.println("--> onError(3): " + e); } });
输出:
--> onSubscribe: --> onError(3): java.util.NoSuchElementException
Javadoc: lastOrError()
使用Take
操做符让你能够修改Observable的行为,只返回前面的N项数据,而后发射完成通知,忽略剩余的数据。
Take 操做符有如下几种操做:
若是你对一个Observable使用 take(n)
操做符,而那个Observable发射的数据少于N项,那么 take
操做生成的Observable不会抛异常或发射 onError
通知,在完成前它只会发射相同的少许数据。
实例代码:
// 1. take(long count) // 返回前count项数据 Observable.range(1, 100) .take(5) // 返回前5项数据 .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept take(1): " + t); } });
输出:
--> accept take(1): 1 --> accept take(1): 2 --> accept take(1): 3 --> accept take(1): 4 --> accept take(1): 5
Javadoc: take(count)
取必定时间间隔内的数据,有可选参数 scheduler
指定线程调度器。
实例代码:
// 2. take(long time, TimeUnit unit,[Scheduler] scheduler) // 取必定时间间隔内的数据,可选参数scheduler指定线程调度器 Observable.intervalRange(1, 10, 1, 1, TimeUnit.SECONDS) .take(5, TimeUnit.SECONDS) // 返回前5秒的数据项 .subscribe(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> accept take(2): " + t); } });
输出:
--> accept take(2): 1 --> accept take(2): 2 --> accept take(2): 3 --> accept take(2): 4 --> accept take(2): 5
Javadoc: take(timeout, TimeUnit)
Javadoc: take(timeout, TimeUnit, Scheduler)
使用 TakeLast
操做符修改原始Observable,你能够只发射Observable发射的后N项数据,忽略前面的数据。
takeLast
的这个变体默认在 computation
调度器上执行,可是你可使用第三个参数指定其它的调度器。
TakeLast 通常有下面几种操做:
使用 takeLast(count)
操做符,你能够只发射原始Observable发射的后 count
项数据(或者原始Observable发射onCompleted()
前的 count
项数据),忽略以前的数据。 注意:这会延迟原始Observable发射的任何数据项,直到它所有完成。
实例代码:
// 1. takeLast(int count) // 接受Observable数据发射完成前的Count项数据, 忽略前面的数据 Observable.range(1, 10) .doOnNext(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept(1): " + t); } }) .doOnComplete(new Action() { @Override public void run() throws Exception { System.out.println("--> onCompleted(1): "); } }) .takeLast(5) // 发送数据发射完成前的5项数据 .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept takeLast(1): " + t); } });
输出:
--> accept(1): 1 --> accept(1): 2 --> accept(1): 3 --> accept(1): 4 --> accept(1): 5 --> accept(1): 6 --> accept(1): 7 --> accept(1): 8 --> accept(1): 9 --> accept(1): 10 --> onCompleted(1): --> accept takeLast(1): 6 --> accept takeLast(1): 7 --> accept takeLast(1): 8 --> accept takeLast(1): 9 --> accept takeLast(1): 10
Javadoc: takeLast(count)
还有一个 takeLast
变体接受一个时长而不是数量参数。它会发射在原始Observable的生命周期内最后一段时间内发射的数据。时长和时间单位经过参数指定。
注意: 这会延迟原始Observable发射的任何数据项,直到它所有完成。
实例代码:
// 2. takeLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize) // 可选参数 scheduler:指定工做调度器 delayError:延迟Error通知 bufferSize:指定缓存大小 // 接受Observable数据发射完成前指定时间间隔发射的数据项 Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS) .doOnNext(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> accept(2): " + t); } }) .doOnComplete(new Action() { @Override public void run() throws Exception { System.out.println("--> onCompleted(2): "); } }) .takeLast(3, TimeUnit.SECONDS) // 发送数据发射完成前3秒时间段内的数据 .subscribe(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> accept takeLast(2): " + t); } });
输出:
--> accept(2): 1 --> accept(2): 2 --> accept(2): 3 --> accept(2): 4 --> accept(2): 5 --> onCompleted(2): --> accept takeLast(2): 3 --> accept takeLast(2): 4 --> accept takeLast(2): 5
Javadoc: takeLast(long time, TimeUnit unit)
Javadoc: takeLast(long time, TimeUnit unit, boolean delayError)
Javadoc: takeLast(long time, TimeUnit unit, Scheduler scheduler)
Javadoc: takeLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError)
Javadoc: takeLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize)
接受 Observable 发射完成前 time
时间段内收集 count
项数据并发射。
示例代码:
// 3. takeLast(long count, long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize) // 可选参数 scheduler:指定工做调度器 delayError:延迟Error通知 bufferSize:指定缓存大小 // 接受Observable数据发射完成前time时间段内收集count项数据并发射 Observable.intervalRange(1, 10, 1, 100, TimeUnit.MILLISECONDS) .doOnNext(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> accept(3): " + t); } }) .doOnComplete(new Action() { @Override public void run() throws Exception { System.out.println("--> onCompleted(3): "); } }) .takeLast(2, 500, TimeUnit.MILLISECONDS) // 在原数据发射完成前500毫秒内接受2项数据 .subscribe(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> accept takeLast(3): " + t); } });
输出:
--> accept(3): 1 --> accept(3): 2 --> accept(3): 3 --> accept(3): 4 --> accept(3): 5 --> accept(3): 6 --> accept(3): 7 --> accept(3): 8 --> accept(3): 9 --> accept(3): 10 --> onCompleted(3): --> accept takeLast(3): 9 --> accept takeLast(3): 10
Javadoc: takeLast(long count, long time, TimeUnit unit)
Javadoc: takeLast(long count, long time, TimeUnit unit, Scheduler scheduler)
Javadoc: takeLast(long count, long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize)
ofType
是 filter 操做符的一个特殊形式。它过滤一个Observable只返回指定类型的数据。
示例代码:
Object[] dataObjects = {1, "Hello", 2.1f, 8.88, "1", new Integer(5)}; // ofType(Class clazz) // 过滤数据,只返回特定类型的数据 Observable.fromArray(dataObjects) .ofType(Integer.class) // 过滤Integer类型的数据 .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept ofType: " + t); } });
输出:
--> accept ofType: 1 --> accept ofType: 5
Javadoc: ofType(Class clazz)
数据过滤的操做符主要是过滤被观察者(Observable)发射的数据序列,按照指定的规则过滤数据项,忽略并丢弃其余的数据。实际开发场景如网络数据的过滤,数据库数据的过滤等,是开发中重要且常见的操做之一。
Rx介绍与讲解及完整目录参考:Rxjava2 介绍与详解实例
实例代码: