需求了解:java
对于数据的观察以及处理过程当中每每有须要过滤一些不须要的数据的需求,好比防抖(防止快速操做),获取第一项、指定序列项或者最后一项的须要,获取指定时间内的有效数据等。Rx中提供了丰富的数据过滤处理的操做方法。react
可用于过滤和选择Observable发射的数据序列的方法:缓存
仅在过了一段指定的时间还没发射数据时才发射一个数据。Debounce
操做符会过滤掉发射速率过快的数据项。app
提示: 操做默认在 computation 调度器上执行,可是你能够指定其它的调度器。ide
指定每一个数据发射后在 timeout
时间内,原始数据序列中没有下一个数据发射时,发射此项数据,不然丢弃这项数据。此操做与 throttleWithTimeout
方法相同。函数
注意: 这个操做符会在原始数据的 onCompleted
时候直接发射发射数据,不会由于限流而丢弃数据。线程
实例代码:3d
// 1. debounce(long timeout, TimeUnit unit) // 发送一个数据,若是在包含timeout时间内,没有第二个数据发射,那么就会发射此数据,不然丢弃此数据 Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); // 下一个数据到此数据发射, 30 < timeout --> skip Thread.sleep(30); emitter.onNext(2); // 下一个数据到此数据发射, 100 > timeout --> deliver Thread.sleep(100); emitter.onNext(3); // 下一个数据到此数据发射, 50 = timeout --> skip: Thread.sleep(50); emitter.onNext(4); // 下一个数据到此数据发射, onCompleted --> deliver emitter.onComplete(); } }).debounce(50, TimeUnit.MILLISECONDS) // 指定防抖丢弃时间段为50毫秒 // .debounce(50, TimeUnit.MILLISECONDS, Schedulers.trampoline()) // 指定调度为当前线程排队 .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept debounce(1-1): " + t); } });
输出:code
--> accept debounce(1-1): 2 --> accept debounce(1-1): 4
Javadoc: debounce(timeout, unit)
Javadoc: debounce(timeout, unit, scheduler)
原始数据发射每个序列都经过绑定监听debounceSelector
的数据通知,在debounceSelector
数据发送前,若是有下一个数据,则丢弃当前项数据,继续监视下一个数据。
注意: 这个操做符会在原始数据的 onCompleted
时候直接发射发射数据,不会由于限流而丢弃数据。
实例代码:
// 2. debounce(debounceSelector) // 原始数据发射每个序列的经过监听debounceSelector的数据通知, // 在debounceSelector数据发送前,若是有下一个数据,则丢弃当前项数据,继续监视下一个数据 Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); // skip --> debounceSelector is no emitter(<2s) Thread.sleep(1000); emitter.onNext(2); // skip --> debounceSelector is no emitter(<2s) Thread.sleep(200); emitter.onNext(3); // deliver --> debounceSelector is emitter(>2s) Thread.sleep(2500); emitter.onNext(4); // skip --> debounceSelector is no emitter(=2s) Thread.sleep(2000); emitter.onNext(5); // deliver --> onComplete Thread.sleep(500); emitter.onComplete(); } }).debounce(new Function<Integer, ObservableSource<Long>>() { @Override public ObservableSource<Long> apply(Integer t) throws Exception { System.out.println("--> apply(1-2): " + t); // 设置过滤延迟时间为2秒,此时返回的Observable从订阅到发送数据时间段即为timeout return Observable.timer(2, TimeUnit.SECONDS) .doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable t) throws Exception { // 开始订阅,监听数据的发送来过滤数据 System.out.println("--> debounceSelector(1-2) is onSubscribe!"); } }).doOnDispose(new Action() { @Override public void run() throws Exception { // 发射数据后,丢弃当前的数据,解除当前绑定 System.out.println("--> debounceSelector(1-2) is unSubscribe!"); } }); } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("----------> accept(1-2): " + t); } });
输出:
--> apply(1-2): 1 --> debounceSelector(1-2) is onSubscribe! --> debounceSelector(1-2) is unSubscribe! --> apply(1-2): 2 --> debounceSelector(1-2) is onSubscribe! --> debounceSelector(1-2) is unSubscribe! --> apply(1-2): 3 --> debounceSelector(1-2) is onSubscribe! --> debounceSelector(1-2) is unSubscribe! ----------> accept(1-2): 3 --> apply(1-2): 4 --> debounceSelector(1-2) is onSubscribe! --> debounceSelector(1-2) is unSubscribe! --> apply(1-2): 5 --> debounceSelector(1-2) is onSubscribe! ----------> accept(1-2): 5 --> debounceSelector(1-2) is unSubscribe!
Javadoc: debounce(debounceSelector)
主要应用于数据序列的节流操做,在指定的采样周期内获取指定的数据。Throttling
也用于稀疏序列。当生产者发出的值超出咱们想要的值时,咱们不须要每一个序列值,咱们能够经过限制它来稀释序列。
注意: 时间的划分不必定是统一的。例如,发射数据的时间间隔与划分数据的时间间隔一致时,在原始数据发送的一个时间点(此时数据尚未实际发送),此时可能因为划分时间已到,划分的数据片直接关闭了,因此有的时间片数据会有时间间隙差别。
提示: 操做默认在 computation 调度器上执行,可是你能够指定其它的调度器。
获取每一个 windowDuration
时间段内的原始数据序列中的第一项数据,直到原始数据所有发送完毕。
解析: 实际在每一个采样周期内,先发送第一项接收到的数据,而后丢弃后续周期内的数据项。
实例代码:
// 1. throttleFirst(long windowDuration, TimeUnit unit) // 指定每一个指定时间内取第一项数据, 直到原始数据序列所有发送结束 Observable.intervalRange(1, 10, 0, 1, TimeUnit.SECONDS) .doOnNext(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> DataSource doOnNext : " + t); } }).throttleFirst(2, TimeUnit.SECONDS) // 获取每隔2秒以内收集的第一项数据 // .throttleFirst(2, TimeUnit.SECONDS, Schedulers.newThread()) // 指定调度线程为newThread() .subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> throttleFirst onSubscribe"); } @Override public void onNext(Long t) { System.out.println("-------------> throttleFirst onNext: " + t); } @Override public void onError(Throwable e) { System.out.println("--> throttleFirst onError: " + e); } @Override public void onComplete() { System.out.println("--> throttleFirst onComplete"); } });
输出:
--> throttleFirst onSubscribe --> DataSource doOnNext : 1 -------------> throttleFirst onNext: 1 --> DataSource doOnNext : 2 --> DataSource doOnNext : 3 --> DataSource doOnNext : 4 -------------> throttleFirst onNext: 4 --> DataSource doOnNext : 5 --> DataSource doOnNext : 6 --> DataSource doOnNext : 7 -------------> throttleFirst onNext: 7 --> DataSource doOnNext : 8 --> DataSource doOnNext : 9 -------------> throttleFirst onNext: 9 --> DataSource doOnNext : 10 --> throttleFirst onComplete
Javadoc: throttleFirst(windowDuration, unit)
Javadoc: throttleFirst(windowDuration, unit, scheduler)
获取每一个 windowDuration
时间段内的原始数据序列中的最近的一项数据,直到原始数据所有发送完毕。throttleLast
运算符以固定间隔而不是相对于最后一项来划分时间。它会在每一个窗口中发出最后一个值,而不是它后面的第一个值。
解析: 实际在每一个采样周期内,先缓存收集的数据,等周期结束发送最后一项数据,丢弃最后数据项前面的数据。
实例代码:
// 2. throttleLast(long intervalDuration, TimeUnit unit) // 指定间隔时间内取最后一项数据,直到原始数据序列所有发送结束 Observable.intervalRange(1, 10, 0, 1050, TimeUnit.MILLISECONDS) .doOnNext(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> DataSource doOnNext : " + t); } }).throttleLast(2, TimeUnit.SECONDS) // 获取每隔2秒以内收集的最后一项数据 // .throttleLast(2, TimeUnit.SECONDS, Schedulers.newThread()) // 指定调度线程为newThread() .subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> throttleLast onSubscribe"); } @Override public void onNext(Long t) { System.out.println("-------------> throttleLast onNext: " + t); } @Override public void onError(Throwable e) { System.out.println("--> throttleLast onError: " + e); } @Override public void onComplete() { System.out.println("--> throttleLast onComplete"); } });
输出:
--> throttleLast onSubscribe --> DataSource doOnNext : 1 --> DataSource doOnNext : 2 -------------> throttleLast onNext: 2 --> DataSource doOnNext : 3 --> DataSource doOnNext : 4 -------------> throttleLast onNext: 4 --> DataSource doOnNext : 5 --> DataSource doOnNext : 6 -------------> throttleLast onNext: 6 --> DataSource doOnNext : 7 --> DataSource doOnNext : 8 -------------> throttleLast onNext: 8 --> DataSource doOnNext : 9 --> DataSource doOnNext : 10 --> throttleLast onComplete
Javadoc: throttleLast(intervalDuration, unit)
Javadoc: throttleLast(intervalDuration, unit, scheduler)
指定每一个数据发射后在 timeout
时间内,原始数据序列中没有下一个数据发射时,发射此项数据,不然丢弃这项数据。此操做与 debounce
方法相同。
注意: 这个操做符会在原始数据的 onCompleted
时候直接发射发射数据,不会由于限流而丢弃数据。
实例代码:
// 3. throttleWithTimeout(long timeout, TimeUnit unit) // 发送一个数据,若是在包含timeout时间内,没有第二个数据发射,那么就会发射此数据,不然丢弃此数据 Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); // 下一个数据到此数据发射, --> skip: 30 < timeout Thread.sleep(30); emitter.onNext(2); // 下一个数据到此数据发射, --> skip: 50 = timeout Thread.sleep(50); emitter.onNext(3); // 下一个数据到此数据发射, --> deliver: 60 > timeout Thread.sleep(60); emitter.onNext(4); // onComplete --> deliver: onComplete emitter.onComplete(); } }).throttleWithTimeout(50, TimeUnit.MILLISECONDS) // 指定防抖丢弃时间段为50毫秒 // .throttleWithTimeout(50, TimeUnit.MILLISECONDS, Schedulers.newThread()) // 指定调度线程为newThread() .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { // TODO Auto-generated method stub System.out.println("--> accept throttleWithTimeout(3): " + t); } });
输出:
--> accept throttleWithTimeout(3): 3 --> accept throttleWithTimeout(3): 4
Javadoc: throttleWithTimeout(timeout, unit)
Javadoc: throttleWithTimeout(timeout, unit, scheduler)
sample
容许您经过将序列划分为时间片断,并从每片中取出一个值来稀疏序列。当每片结束时,将发出其中的最后一个值(若是有的话)。
注意: 时间的划分不必定是统一的。例如,发射数据的时间间隔与划分数据的时间间隔一致时,在原始数据发送的一个时间点(此时数据尚未实际发送),此时可能因为划分时间已到,划分的数据片直接关闭了,因此有的时间片数据会有时间间隙差别。
获取每一个 period
时间片断内手机收据序列的最后一项,忽略此时间片内收集的其余数据项。
实例代码:
// 1. sample(long period, TimeUnit unit)/sample(long period, TimeUnit unit) // 将序列分为 period 的时间片断,从每片重取出最近的一个数据 // 等同于throttleLast Observable.intervalRange(1, 5, 0, 1100, TimeUnit.MILLISECONDS) .doOnNext(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> DataSource onNext: " + t); } }).sample(2, TimeUnit.SECONDS) // 每3秒时间段数据中取最近一个值 // .sample(2, TimeUnit.SECONDS, true) // 参数emitLast,设置是否忽略未采样的最后一个数据 // .sample(2, TimeUnit.SECONDS, Schedulers.newThread()) // 指定调度器为newThread() .subscribe(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> accept(1): " + t); } });
输出:
--> DataSource onNext: 1 --> DataSource onNext: 2 --> accept(1): 2 --> DataSource onNext: 3 --> DataSource onNext: 4 --> accept(1): 4 --> DataSource onNext: 5
Javadoc: sample(long period, TimeUnit unit)
Javadoc: sample(long period, TimeUnit unit, emitLast)
Javadoc: sample(long period, TimeUnit unit, scheduler)
Javadoc: sample(long period, TimeUnit unit, scheduler, emitLast)
sample
的这个方法每当第二个 sampler
发射一个数据(或者当它终止)时就对原始 Observable 进行采样。第二个Observable经过参数传递给 sample
。
实例代码:
// 2. sample(ObservableSource sampler) // 每当第二个 sampler 发射一个数据(或者当它终止)时就对原始 Observable进行采样 Observable.intervalRange(1, 5, 0, 1020, TimeUnit.MILLISECONDS) .doOnNext(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> DataSource onNext: " + t); } }).sample(Observable.interval(2, TimeUnit.SECONDS)) // 每隔2秒进行一次采样 .subscribe(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> accept(2): " + t); } });
输出:
--> DataSource onNext: 1 --> DataSource onNext: 2 --> accept(2): 2 --> DataSource onNext: 3 --> DataSource onNext: 4 --> accept(2): 4 --> DataSource onNext: 5
Javadoc: sample(sampler)
Javadoc: sample(sampler, emitLast)
抑制(过滤掉)重复的数据项。Distinct 的过滤规则是:只容许尚未发射过的数据项经过。
在某些实现中,有一些方法中容许你调整断定两个数据不一样( distinct )的标准。还有一些实现只比较一项数据和它的直接前驱,所以只会从序列中过滤掉连续重复的数据。
只容许尚未发射过的数据项经过,过滤数据序列中的全部重复的数据项,保证处理后的数据序列没有重复。
示例代码:
// 1. distinct() // 去除所有数据中重复的数据 Observable.just(1, 2, 3, 3, 3, 4, 4, 5, 6, 6) .distinct() .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept distinct(1): " + t); } });
输出:
--> accept distinct(1): 1 --> accept distinct(1): 2 --> accept distinct(1): 3 --> accept distinct(1): 4 --> accept distinct(1): 5 --> accept distinct(1): 6
Javadoc: distinct()
这个操做符接受一个函数。这个函数根据原始Observable发射的数据项产生一个 Key
,而后,比较这些Key而不是数据自己,来断定两个数据是不是不一样的。
实例代码:
// 数根据原始Observable发射的数据项产生一个 Key,而后比较这些Key而不是数据自己,来断定两个数据是不是不一样的(去除所有数据中重复的数据) Observable.just(1, 2, 3, 3, 4, 5, 6, 6) .distinct(new Function<Integer, String>() { @Override public String apply(Integer t) throws Exception { // 根据奇数或偶数来判断数据序列的重复的key return t % 2 == 0 ? "even" : "odd"; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept distinct(2): " + t); } });
输出:
--> accept distinct(2): 1 --> accept distinct(2): 2
Javadoc: distinct(keySelector)
distinctUntilChanged
操做符,去除数据序列中的连续重复项。它只断定一个数据和它的直接前驱是不是不一样的。
实例代码:
// 3. distinctUntilChanged() // 去除连续重复的数据 Observable.just(1, 2, 3, 3, 4, 5, 6, 6, 3, 2) .distinctUntilChanged() .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept distinctUntilChanged(3): " + t); } });
输出:
--> accept distinctUntilChanged(3): 1 --> accept distinctUntilChanged(3): 2 --> accept distinctUntilChanged(3): 3 --> accept distinctUntilChanged(3): 4 --> accept distinctUntilChanged(3): 5 --> accept distinctUntilChanged(3): 6 --> accept distinctUntilChanged(3): 3 --> accept distinctUntilChanged(3): 2
Javadoc: distinctUntilChanged()
distinctUntilChanged(keySelector)
操做符,根据一个函数产生的 Key
断定两个相邻的数据项是否是相同的,去除连续重复的数据。
实例代码:
// 4. distinctUntilChanged(Function<T,K>) // 数根据原始Observable发射的数据项产生的 Key,去除连续重复的数据 Observable.just(8, 2, 3, 5, 9, 5, 6, 6) .distinctUntilChanged(new Function<Integer, String>() { @Override public String apply(Integer t) throws Exception { // 根据原始数据处理后添加key,依据这个key来判断是否重复(去除连续重复的数据) return t % 2 == 0 ? "even" : "odd"; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept distinctUntilChanged(4): " + t); } });
输出:
--> accept distinctUntilChanged(4): 8 --> accept distinctUntilChanged(4): 3 --> accept distinctUntilChanged(4): 6
Javadoc: distinctUntilChanged(keySelector)
主要用于忽略Observable发射的指定的 N 项数据,如跳过数据序列的前面或后面 N 项数据,指定时间段内的数据项。
Skip
操做符的还有一些变体的操做方法以下:
忽略 Observable
发射的前 N
项数据,只保留以后的数据。
实例代码:
// 1. skip(long count) // 跳过前count项数据,保留后面的数据 Observable.range(1, 10) .skip(5) // 过滤数据序列前5项数据 .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept skip(1): " + t); } });
输出:
--> accept skip(1): 6 --> accept skip(1): 7 --> accept skip(1): 8 --> accept skip(1): 9 --> accept skip(1): 10
Javadoc: skip(count)
skip
的这个变体接受一个时长参数,它会丢弃原始Observable开始的那段时间段发射的数据,时长和时间单位经过参数指定。
实例代码:
// 2. skip(long time, TimeUnit unit) // 跳过开始的time时间段内的数据,保留后面的数据 Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS) .skip(2, TimeUnit.SECONDS) // 跳过前2秒的数据 .subscribe(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> accept skip(2): " + t); } });
输出:
--> accept skip(2): 4 --> accept skip(2): 5
Javadoc: skip(time, unit)
Javadoc: skip(time, unit, scheduler)
使用 SkipLast
操做符修改原始Observable,你能够忽略Observable发射的后 N
项数据,只保留前面的数据。
实例代码:
// 3. skipLast(int count) // 跳过数据后面的count个数据 Observable.range(1, 10) .skipLast(5) // 跳过数据序列的后5项数据 .subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept skipLast(3): " + t); } });
输出:
--> accept skipLast(3): 1 --> accept skipLast(3): 2 --> accept skipLast(3): 3 --> accept skipLast(3): 4 --> accept skipLast(3): 5
Javadoc: skipLast(count)
还有一个 skipLast
变体接受一个时间段参数,它会丢弃在原始 Observable 的生命周期内最后一段时间内发射的数据。时长和时间单位经过参数指定。
注意: 这个机制是这样实现的:延迟原始 Observable 发射的任何数据项,直到自原始数据发射以后过了给定的时长以后,才开始发送数据。
实例代码:
// 4. skipLast(long time, TimeUnit unit, [boolean delayError]) // 丢弃在原始Observable的生命周 期内最后time时间内发射的数据 // 可选参数delayError:延迟异常通知 Observable.intervalRange(1, 10, 0, 1, TimeUnit.SECONDS) .doOnNext(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> DataSource: " + t); } }).skipLast(2, TimeUnit.SECONDS) // .skipLast(2, TimeUnit.SECONDS, Schedulers.trampoline()) // 经过scheduler指定工做线程 // .skipLast(2, TimeUnit.SECONDS, true) // 延迟Error的通知,多用于组合Observable的场景 .subscribe(new Consumer<Long>() { @Override public void accept(Long t) throws Exception { System.out.println("--> accept skipLast(4): " + t); } });
输出:
--> DataSource: 1 --> DataSource: 2 --> DataSource: 3 --> accept skipLast(4): 1 --> DataSource: 4 --> accept skipLast(4): 2 --> DataSource: 5 --> accept skipLast(4): 3 --> DataSource: 6 --> accept skipLast(4): 4 --> DataSource: 7 --> accept skipLast(4): 5 --> DataSource: 8 --> accept skipLast(4): 6 --> DataSource: 9 --> accept skipLast(4): 7 --> DataSource: 10 --> accept skipLast(4): 8
注意: skipLast 的这个操做默认在 computation 调度器上执行,可是你可使用Scheduler参数指定其 它的调度器。
Javadoc: skipLast(time, unit)
Javadoc: skipLast(time, unit, delayError)
Javadoc: skipLast(time, unit, scheduler)
Javadoc: skipLast(time, unit, scheduler, delayError)
Javadoc: skipLast(time, unit, scheduler, delayError, bufferSize)
后续的Rx相关数据过滤部分请参考: Rxjava2 Observable的数据过滤详解及实例(二)
Rx介绍与讲解及完整目录参考:Rxjava2 介绍与详解实例