目录html
需求了解:java
对于 Observable 发射的数据有的时候可能不知足咱们的要求,或者须要转化为其余类型的数据,好比:缓存,数据类型转化,数据拦截等。此时可使用 Rx 中的一些对于数据操做的操做进行数据的变换,方便咱们的开发。react
执行变换的操做方法:api
Cast:在发射以前强制将Observable发射的全部数据转换为指定类型缓存
按期收集Observable的数据放进一个数据包裹(缓存),而后发射这些数据包裹,而不是一次发射一个值。网络
Buffer 操做符将一个Observable变换为另外一个,原来的Observable正常发射数据,变换产生 的Observable发射这些数据的缓存集合。 Buffer 操做符在不少语言特定的实现中有不少种变 体,它们在如何缓存这个问题上存在区别。数据结构
Window 操做符与 Buffer 相似,可是它在发射以前把收集到的数据放进单独的Observable, 而不是放进一个数据结构。并发
注意: 若是原来的Observable发射了一个 onError 通知, Buffer 会当即传递这个通知,而不是首先发射缓存的数据,即便在这以前缓存中包含了原始Observable发射的数据。app
在RxJava中的一些 Buffer 的操做以下:
ide
以列表(List)的形式发射非重叠的缓存,每个缓存至多包含来自原始 Observable 的 count
项数据(最后发射的列表数据可能少于count项)。
实例代码:
// 1. buffer(count) // 以列表(List)的形式发射非重叠的缓存, // 每个缓存至多包含来自原始 Observable的count项数据(最后发射的列表数据可能少于count项) Observable.range(1, 10) .buffer(3) .subscribe(new Consumer<List<Integer>>() { @Override public void accept(List<Integer> t) throws Exception { System.out.println("--> bufferr(1) accept: " + t); } });
输出:
--> bufferr(1) accept: [1, 2, 3] --> bufferr(1) accept: [4, 5, 6] --> bufferr(1) accept: [7, 8, 9] --> bufferr(1) accept: [10]
Javadoc: buffer(count)
开始建立一个List
收集原始 Observable 数据,监视一个名叫 boundary
的Observable,每当这个Observable发射了一个值,它就建立一个新的 List
开始收集来自原始Observable的数据并发射原来已经收集数据的 List
, 当 boundary
Observable 发送了完成通知,会将此时还未发送的 List 发送。
注意: 全部发送的 List 可能没有收集到数据,此时数据的收集可能并不会完整收集全部原始 Observable 数据。
实例代码:
// 2. buffer(boundary) 监视一个名叫boundary的Observable, // 开始建立一个List收集原始 Observable 数据,监视一个名叫boundary的Observable, // 每当这个Observable发射了一个值,它就建立一个新的List开始收集来自原始Observable的数据并发射原来已经收集数据的List, // 当boundary发送了完成通知,会将此时还未发送的 List 发送。 // 全部发送的 List 可能没有收集到数据,此时数据的收集可能并不会完整收集全部原始Observable数据。 Observable.range(1, 10000) .buffer(Observable.timer(1, TimeUnit.MILLISECONDS)) // 1毫秒后开始接受原始数据 .subscribe(new Consumer<List<Integer>>() { @Override public void accept(List<Integer> t) throws Exception { System.out.println("--> accept(2): " + t.size()); // 每次收集的数据序列个数 } });
输出:
--> accept(2): 2858 --> accept(2): 5471
Javadoc: buffer(boundary)
从原始Observable的第一项数据开始建立新的缓存,此后每当收 到 skip
项数据,用 count
项数据填充缓存:开头的一项和后续的 count-1 项,它以列表 (List)的形式发射缓存,取决于 count 和 skip 的值,这些缓存可能会有重叠部分(好比skip < count时),也可能会有间隙(好比skip > count时)。
解析: 在指定的数据序列中移动指针
来获取缓存数据:指针每次移动 skip
个数据长度,每次缓存指针位置及后面count
个数据,指针初始位置在原始数据的第一个(存在的状况下)。
实例代码:
// 3. buffer(int count, int skip) // 在指定的数据中移动指针来获取缓存数据:指针每次移动1个数据长度,每次缓存3个数据 Observable.range(1, 5) .buffer(3, 1) .subscribe(new Consumer<List<Integer>>() { @Override public void accept(List<Integer> t) throws Exception { System.out.println("--> bufferr(3) accept: " + t); } });
输出:
--> bufferr(3) accept: [1, 2, 3] --> bufferr(3) accept: [2, 3, 4] --> bufferr(3) accept: [3, 4, 5] --> bufferr(3) accept: [4, 5] --> bufferr(3) accept: [5]
Javadoc: buffer(count, skip)
按期以 List 的形式发射新的数据,在每一个时间段,收集来自原始 Observable 的数据(从前面一个数据包裹以后,或者若是是第一个数据包裹,从有观察者订阅原来的 Observale 以后开始)。还有另外一个版本的 buffer 接受一个 Scheduler 参数。
解析: 每隔 timespan
时间段以 List
的形式收集原始Observable的数据
实例代码:
// 4. buffer(long timespan, TimeUnit unit) // 每隔timespan时间段以list的形式收集数据 Observable.range(1, 50000) .buffer(1, TimeUnit.MILLISECONDS) // 每隔1毫秒收集一次原始序列数据 .subscribe(new Consumer<List<Integer>>() { @Override public void accept(List<Integer> t) throws Exception { System.out.println("--> bufferr(4) accept: " + t.size()); // 每次收集的数据序列个数 } });
输出:
--> bufferr(4) accept: 2571 --> bufferr(4) accept: 5457 --> bufferr(4) accept: 13248 --> bufferr(4) accept: 12755 --> bufferr(4) accept: 9543 --> bufferr(4) accept: 6426
注意:
buffer(timespan,TimeUnit)
默认状况下会使用 computation 调度器
Javadoc: buffer(timespan,TimeUnit)
Javadoc: buffer(timespan,TimeUnit,Scheduler)
每当收到来自原始 Observable
的 count
项数据,或者每过了一段指定 timespan
时间后, 就以 List 的形式发射这期间的数据,即便数据项少于 count 项。还有另外一个版本的 buffer 接受一个 Scheduler
参数。
实例代码:
// 5. buffer(long timespan, TimeUnit unit, int count) // 每隔1毫秒缓存50个数据 Observable.range(1, 1000) .buffer(1, TimeUnit.MILLISECONDS, 50) // 每隔1毫秒收集50个数据序列 .subscribe(new Consumer<List<Integer>>() { @Override public void accept(List<Integer> t) throws Exception { System.out.println("--> bufferr(5) accept: " + t.size()); // 每次收集的数据序列个数 } });
输出:
--> bufferr(5) accept: 50 --> bufferr(5) accept: 50 --> bufferr(5) accept: 50 --> bufferr(5) accept: 50 --> bufferr(5) accept: 50 --> bufferr(5) accept: 50 --> bufferr(5) accept: 50 --> bufferr(5) accept: 50 --> bufferr(5) accept: 20 --> bufferr(5) accept: 50 --> bufferr(5) accept: 50 --> bufferr(5) accept: 50 --> bufferr(5) accept: 4 --> bufferr(5) accept: 50 --> bufferr(5) accept: 50 --> bufferr(5) accept: 50 --> bufferr(5) accept: 50 --> bufferr(5) accept: 50 --> bufferr(5) accept: 50 --> bufferr(5) accept: 50 --> bufferr(5) accept: 50 --> bufferr(5) accept: 26
注意:
buffer(timespan, TimeUnit, count)
默认状况下会使用 computation 调度器
Javadoc: buffer(timespan, TimeUnit, count)
Javadoc: buffer(timespan, TimeUnit, scheduler, count)
在每个 timeskip
时期内都建立一个新的 List
,而后用原始 Observable 发射的每一项数据填充这个列表(在把这个 List 当作本身的数据发射前,从建立时开始,直到过了 timespan 这么长的时间)。若是 timespan
长于 timeskip
,它发射的数据包将会重叠,所以可能包含重复的数据项。
解析: 在每隔 timeskip
时间段都建立一个新的 List ,每一个 List 都独立收集 timespan
时间段原始Observable发射的数据。 所以在 timespan 长于 timeskip 时,它发射的数据包将会重叠,所以不一样 List 中可能包含重复的数据项。 还有另外一个版本的 buffer 接受一个 Scheduler 参数。
实例代码:
// 6. buffer(long timespan, long timeskip, TimeUnit unit) // 在每个timeskip时期内都建立一个新的 List, // 每一个List都独立收集timespan时间段原始Observable发射的数据, // 若是 timespan 长于 timeskip,它发射的数据包将会重叠,所以不一样List中可能包含重复的数据项 Observable.range(1, 50000) .buffer(1, 1, TimeUnit.MILLISECONDS, Schedulers.newThread()) .subscribe(new Consumer<List<Integer>>() { @Override public void accept(List<Integer> t) throws Exception { System.out.println("--> accept(6): " + t.size()); // 每次收集的数据序列个数 } });
输出:
--> accept(6): 1412 --> accept(6): 733 --> accept(6): 10431 --> accept(6): 694 --> accept(6): 18944 --> accept(6): 10710 --> accept(6): 944 --> accept(6): 6132
注意:
buffer(imespan, timeskip, TimeUnit)
默认状况下会使用 computation 调度器。
Javadoc: buffer(imespan, timeskip, TimeUnit)
Javadoc: buffer(imespan, timeskip, TimeUnit, schedule)
当它订阅原来的Observable时,开始将数据收集到一个List
,而后它调用 bufferClosingSelector
生成第二个Observable
,当第二个Observable 发射一个TClosing
时,buffer 发射当前的 List
,而后重复
这个过程:开始组装一个新的List,而后调用bufferClosingSelector建立一个新的Observable并监视它。
注意: 它会一直这样作直到原来的Observable执行完成,能够收集完整
的原始 Observable 的数据
实例代码:
// 7. buffer(Callable<ObservableSource<T>> boundarySupplier) // 当它订阅原来的Observable时,开始将数据收集到一个List,而后它调用 bufferClosingSelector 生成第二个Observable, // 当第二个Observable 发射一个 TClosing 时,buffer 发射当前的 List , // 而后重复这个过程:开始组装一个新的List,而后调用bufferClosingSelector建立一个新的Observable并监视它。 // 它会一直这样作直到原来的Observable执行完成。会收集完整的原始 Observable 的数据 Observable.range(1, 50000) .buffer(new Callable<Observable<Long>>() { @Override public Observable<Long> call() throws Exception { return Observable.timer(1, TimeUnit.MILLISECONDS); } }).subscribe(new Consumer<List<Integer>>() { @Override public void accept(List<Integer> t) throws Exception { System.out.println("--> accept(7): " + t.size()); // 每次收集的数据序列个数 } });
输出:
--> accept(7): 14650 --> accept(7): 9708 --> accept(7): 25642
Javadoc: buffer(bufferClosingSelector)
对Observable发射的每一项数据应用一个函数,执行变换操做。
实例代码:
// map(Function<T,R)) // 接受原始Observable的数据,发送处理后的数据 Observable.range(1, 5) .map(new Function<Integer, Integer>() { @Override public Integer apply(Integer t) throws Exception { System.out.println("--> apply: " + t); return t*t; // 计算原始数据的平方 } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept Map: " + t); } });
输出:
--> apply: 1 --> accept Map: 1 --> apply: 2 --> accept Map: 4 --> apply: 3 --> accept Map: 9 --> apply: 4 --> accept Map: 16 --> apply: 5 --> accept Map: 25
Javadoc: map(mapper)
主要对原始数据进行转换操做后发送至订阅者。
RxJava2 中的一些 FlatMap 操做方法以下:
FlatMap 将一个发射数据的 Observable 变换为 多个 Observables,而后将它们发射的数据合并后放进一个单独的 Observable。
FlatMap 操做符使用一个指定的函数对原始Observable发射的每一项数据执行变换操做,这个函数返回一个自己也发射数据的Observable,而后 FlatMap
合并这些Observables发射的数据,最后将合并后的结果当作它本身的数据序列发射。
这个方法是颇有用的,例如,当你有一个这样的Observable:它发射一个数据序列,这些数据自己包含Observable成员或者能够变换为Observable,所以你能够建立一个新的 Observable发射这些次级Observable发射的数据的完整集合。
注意: FlatMap 对这些Observables发射的数据作的是合并(merge)操做,所以它们多是交错的。
在许多语言特定的实现中,还有一个操做符不会让变换后的Observables发射的数据交错,它按照严格的顺序发射这些数据,这个操做符一般被叫做ConcatMap
或者相似的名字。
实例代码:
// 1. flatMap(Function) // 对原始Observable发射的每一项数据执行变换操做,这个函数返回一个自己也发射数据的Observable, // 而后FlatMap合并这些Observables发射的数据,最后将合并后的结果当作它本身的数据序列发射 Observable.range(1, 5) .flatMap(new Function<Integer, ObservableSource<? extends Integer>>() { @Override public ObservableSource<? extends Integer> apply(Integer t) throws Exception { System.out.println("--> apply(1): " + t); // 原始数据 return Observable.range(1, t).subscribeOn(Schedulers.newThread()); // 处理后数据 } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept flatMap(1): " + t); // 接受的全部数据 } });
输出:
--> apply(1): 1 --> apply(1): 2 --> apply(1): 3 --> apply(1): 4 --> accept flatMap(1): 1 --> accept flatMap(1): 2 --> apply(1): 5 --> accept flatMap(1): 1 --> accept flatMap(1): 1 --> accept flatMap(1): 2 --> accept flatMap(1): 3 --> accept flatMap(1): 4 --> accept flatMap(1): 1 --> accept flatMap(1): 2 --> accept flatMap(1): 3 --> accept flatMap(1): 4 --> accept flatMap(1): 5 --> accept flatMap(1): 1 --> accept flatMap(1): 2 --> accept flatMap(1): 3
Javadoc: flatMap(mapper)
maxConcurrency
这个参数设置 flatMap 从原来的Observable映射Observables的最大同时订阅数。当达到这个限制时,它会等待其中一个终止而后再订阅另外一个。
实例代码:
// 2. flatMap(Function, maxConcurrency) // maxConcurrency 这个参数设置 flatMap 从原来的Observable映射Observables的最大同时订阅数。 // 当达到这个限制时,它会等待其中一个终止而后再订阅另外一个 Observable.range(1, 5) .flatMap(new Function<Integer, ObservableSource<? extends Integer>>() { @Override public ObservableSource<? extends Integer> apply(Integer t) throws Exception { System.out.println("--> apply(2): " + t); return Observable.range(1, t).subscribeOn(Schedulers.newThread()); } // 指定最大订阅数为1,此时等待上一个订阅的Observable结束,在进行下一个Observable订阅 }, 1).subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept flatMap(2): "+ t); } });
输出:
--> apply(2): 1 --> apply(2): 2 --> apply(2): 3 --> apply(2): 4 --> apply(2): 5 --> accept flatMap(2): 1 --> accept flatMap(2): 1 --> accept flatMap(2): 2 --> accept flatMap(2): 1 --> accept flatMap(2): 2 --> accept flatMap(2): 3 --> accept flatMap(2): 1 --> accept flatMap(2): 2 --> accept flatMap(2): 3 --> accept flatMap(2): 4 --> accept flatMap(2): 1 --> accept flatMap(2): 2 --> accept flatMap(2): 3 --> accept flatMap(2): 4 --> accept flatMap(2): 5
Javadoc: flatMap(mapper, maxConcurrency)
delayError
这个参数指定是否延迟发生 Error
的Observable通知。还有一个能够指定最大订阅数参数 maxConcurrency
的变体。
解析: 当值为 true
时延迟发生Error
的这个订阅的Observable通知,不中断当前的订阅操做,继续下一个Observable的订阅,在全部订阅的Observable所有结束后发送 Error 这个Observable的通知,当值为 false
时则中断全部订阅的操做,并发送 Error
的通知。
实例代码:
// 3. flatMap(Function, delayErrors) // delayErrors 这个参数指定是否延迟发生Error的Observable通知 // 当true 时延迟发生Error的这个订阅的Observable通知,不中断当前的订阅操做, // 继续下一个Observable的订阅,在全部订阅的Observable所有结束后发送Error这个Observable的通知 Observable.range(1, 5) .flatMap(new Function<Integer, ObservableSource<? extends Integer>>() { @Override public ObservableSource<? extends Integer> apply(Integer t) throws Exception { System.out.println("--> apply(3): " + t); return Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { if( t == 3) { throw new NullPointerException("delayErrors test!"); // 测试 Error } for (int i = 1; i <= t; i++) { emitter.onNext(i); } emitter.onComplete(); } }); } // 设置延迟 Error 通知到最后 }, true).subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept flatMap(3): "+ t); } },new Consumer<Throwable>() { @Override public void accept(Throwable t) throws Exception { System.out.println("--> acceot Error(3): " + t); } });
输出:
--> apply(3): 1 --> accept flatMap(3): 1 --> apply(3): 2 --> accept flatMap(3): 1 --> accept flatMap(3): 2 --> apply(3): 3 --> apply(3): 4 --> accept flatMap(3): 1 --> accept flatMap(3): 2 --> accept flatMap(3): 3 --> accept flatMap(3): 4 --> apply(3): 5 --> accept flatMap(3): 1 --> accept flatMap(3): 2 --> accept flatMap(3): 3 --> accept flatMap(3): 4 --> accept flatMap(3): 5 --> acceot Error(3): java.lang.NullPointerException: delayErrors test!
Javadoc: flatMap(Function, delayErrors)
Javadoc: flatMap(Function, delayErrors, maxConcurrency)
flatMapIterable 这个变体成对的打包数据,而后生成 Iterable
而不是原始数据和生成的 Observables,可是处理方式是相同的。
解析: 对数据进行处理转换成 Iterable
来发射数据。
实例代码:
// 4. flatMapIterable(Function(T,R)) // 对数据进行处理转换成Iterable来发射数据 Observable.range(1, 5) .flatMapIterable(new Function<Integer, Iterable<? extends Integer>>() { @Override public Iterable<? extends Integer> apply(Integer t) throws Exception { System.out.println("--> apply: " + t); ArrayList<Integer> list = new ArrayList<Integer>(); list.add(888); list.add(999); return list; // 将原始数据转换为两个数字发送 } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept flatMapIterable(4): " + t); } });
输出:
--> apply: 1 --> accept flatMapIterable(4): 888 --> accept flatMapIterable(4): 999 --> apply: 2 --> accept flatMapIterable(4): 888 --> accept flatMapIterable(4): 999 --> apply: 3 --> accept flatMapIterable(4): 888 --> accept flatMapIterable(4): 999 --> apply: 4 --> accept flatMapIterable(4): 888 --> accept flatMapIterable(4): 999 --> apply: 5 --> accept flatMapIterable(4): 888 --> accept flatMapIterable(4): 999
Javadoc: flatMapIterable(mapper)
参数 mapper
接收原始数据,resultSelector
同时接收原始数据和 mapper
处理的数据,进行二次数据转换。
实例代码:
// 5. flatMapIterable(Function(T,R),Function(T,T,R)) // 第一个func接受原始数据,转换数据,第二个func同时接受原始和处理的数据,进行二次转换处理 Observable.range(1, 3) .flatMapIterable(new Function<Integer, Iterable<? extends Integer>>() { @Override public Iterable<? extends Integer> apply(Integer t) throws Exception { ArrayList<Integer> list = new ArrayList<Integer>(); list.add(888); list.add(999); return list; // 将原始数据转换为两个数字发送 } }, new BiFunction<Integer, Integer, Integer>() { @Override public Integer apply(Integer t1, Integer t2) throws Exception { System.out.println("--> apply(5): t1 = " + t1 + ", t2 = " + t2); return t1 + t2; // 将原始数据和处理过的数据组合进行二次处理发送 } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept flatMapIterable(5): " + t); } });
输出:
--> apply(5): t1 = 1, t2 = 888 --> accept flatMapIterable(5): 889 --> apply(5): t1 = 1, t2 = 999 --> accept flatMapIterable(5): 1000 --> apply(5): t1 = 2, t2 = 888 --> accept flatMapIterable(5): 890 --> apply(5): t1 = 2, t2 = 999 --> accept flatMapIterable(5): 1001 --> apply(5): t1 = 3, t2 = 888 --> accept flatMapIterable(5): 891 --> apply(5): t1 = 3, t2 = 999 --> accept flatMapIterable(5): 1002
concatMap
操做符的功能和 flatMap
是很是类似的,只是有一点,concatMap 最终输出的数据序列和原数据序列是一致,它是按顺序连接Observables,而不是合并(flatMap用的是合并)。
经过 mapper
处理原数据后,转换成 Observables ,按照顺序进行链接 Observables 发送数据。
解析: concatMap
和flatMap
的功能是同样的, 将一个发射数据的Observable变换为多个Observables,而后将它们发射的数据放进一个单独的Observable。只不过最后合并ObservablesflatMap采用的merge
,而concatMap采用的是链接(concat
)。区别:concatMap是有序的,flatMap是无序的,concatMap最终输出的顺序与原序列保持一致,而flatMap则不必定,有可能出现交错。
实例代码:
// 1. concatMap(Function(T,R)) // 按照顺序依次处理原始数据和处理的数据 Observable.range(1, 3) .concatMap(new Function<Integer, ObservableSource<? extends Integer>>() { @Override public ObservableSource<? extends Integer> apply(Integer t) throws Exception { System.out.println("--> apply(1): " + t); return Observable.range(1, t).doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable t) throws Exception { System.out.println("--> accept(1): Observable on Subscribe"); // 当前的Observable被订阅 } }); } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept concatMap(1): " + t); } }); System.out.println("--------------------------------------------"); // 2. concatMap(mapper, prefetch) // prefetch 参数是在处理后的Observables发射的数据流中预读数据个数,不影响原数据的发射和接收顺序 Observable.range(1, 3) .concatMap(new Function<Integer, ObservableSource<? extends Integer>>() { @Override public ObservableSource<? extends Integer> apply(Integer t) throws Exception { System.out.println("--> apply(2): " + t); return Observable.range(1, 3).doOnSubscribe(new Consumer<Disposable>() { @Override public void accept(Disposable t) throws Exception { System.out.println("--> accept(2): Observable on Subscribe"); // 当前的Observable被订阅 } }); } }, 2).subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept concatMap(2): " + t); } });
输出:
--> apply(1): 1 --> accept(1): Observable on Subscribe --> accept concatMap(1): 1 --> apply(1): 2 --> accept(1): Observable on Subscribe --> accept concatMap(1): 1 --> accept concatMap(1): 2 --> apply(1): 3 --> accept(1): Observable on Subscribe --> accept concatMap(1): 1 --> accept concatMap(1): 2 --> accept concatMap(1): 3 -------------------------------------------- --> apply(2): 1 --> accept(2): Observable on Subscribe --> accept concatMap(2): 1 --> accept concatMap(2): 2 --> accept concatMap(2): 3 --> apply(2): 2 --> accept(2): Observable on Subscribe --> accept concatMap(2): 1 --> accept concatMap(2): 2 --> accept concatMap(2): 3 --> apply(2): 3 --> accept(2): Observable on Subscribe --> accept concatMap(2): 1 --> accept concatMap(2): 2 --> accept concatMap(2): 3
Javadoc: concatMap(mapper)
Javadoc: concatMap(mapper, refetch)
有选择的订阅 Observable
,当原始 Observable 发射一个数据,经过 witchMap
返回一个 Observable,
当原始Observable发射一个新的数据时,它将取消订阅并中止监视产生执以前的Observable,开始监视当前新的Observable。
解析: 若是上一个任务还没有完成时,就开始下一个任务的话,上一个任务就会被取消掉。若是全部任务都是在同一个线程里执行的话,此时这个操做符与 ContactMap 一致,都是依次顺序执行。只有在不一样的线程里执行的时候,即线程方案为newThread的时候,才会出现这种状况,经常使用于网络请求
中。
实例代码:
// 1. witchMap(Function(T,R)) // 同一个线程执行 Observable.range(1, 3) .switchMap(new Function<Integer, ObservableSource<? extends Integer>>() { @Override public ObservableSource<? extends Integer> apply(Integer t) throws Exception { System.out.println("--> apply(1): " + t); return Observable.range(1, 3); // 每一个任务指定在同一个线程执行 } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept switchMap(1): " + t); } }); System.out.println("---------------------------------------"); // 2. witchMap(Function(T,R)) // 不一样线程执行 Observable.range(1, 3) .switchMap(new Function<Integer, ObservableSource<? extends Integer>>() { @Override public ObservableSource<? extends Integer> apply(Integer t) throws Exception { System.out.println("--> apply(2): " + t); return Observable.range(1, 3) .subscribeOn(Schedulers.newThread()); // 每一个任务指定在子线程执行 } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept switchMap(2): " + t); } }); System.out.println("---------------------------------------"); // 3. switchMap(mapper, bufferSize) // bufferSize 参数是从当前活动的Observable中预读数据的大小 Observable.range(1, 3) .switchMap(new Function<Integer, ObservableSource<? extends Integer>>() { @Override public ObservableSource<? extends Integer> apply(Integer t) throws Exception { System.out.println("--> apply(3): " + t); return Observable.range(1, 5).subscribeOn(Schedulers.newThread()); } }, 3).subscribe(new Consumer<Integer>() { // 指定缓存大小为3 @Override public void accept(Integer t) throws Exception { System.out.println("--> accept switchMap(3): " + t); } });
输出:
--> apply(1): 1 --> accept switchMap(1): 1 --> accept switchMap(1): 2 --> accept switchMap(1): 3 --> apply(1): 2 --> accept switchMap(1): 1 --> accept switchMap(1): 2 --> accept switchMap(1): 3 --> apply(1): 3 --> accept switchMap(1): 1 --> accept switchMap(1): 2 --> accept switchMap(1): 3 --------------------------------------- --> apply(2): 1 --> apply(2): 2 --> apply(2): 3 --> accept switchMap(2): 1 --> accept switchMap(2): 2 --> accept switchMap(2): 3 --------------------------------------- --> apply(3): 1 --> apply(3): 2 --> apply(3): 3 --> accept switchMap(3): 1 --> accept switchMap(3): 2 --> accept switchMap(3): 3 --> accept switchMap(3): 4 --> accept switchMap(3): 5
Javadoc: switchMap(mapper)
Javadoc: switchMap(mapper, bufferSize)
后续的Rx相关数据变换部分请参考: Rxjava2 Observable的数据变换详解及实例(二)
Rx介绍与讲解及完整目录参考:Rxjava2 介绍与详解实例