版权声明:本文为openXu原创文章【openXu的博客】,未经博主容许不得以任何形式转载java
目录:git
按期收集Observable的数据放进一个数据包裹,而后发射这些数据包裹,而不是一次发射一个值。github
在RxJava中有许多Buffer的变体(下面列举两个示例):web
示例代码:缓存
//一组缓存3个数据 Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) .buffer(3) .subscribe(i -> Log.d(TAG, "1buffer-count:" + i)); //每隔三个数据缓存2个数据 Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) .buffer(2, 3) .subscribe(i -> Log.d(TAG, "1buffer-count&skip:" + i)); Observable.interval(1, TimeUnit.SECONDS). buffer(3, TimeUnit.SECONDS) .subscribe(i -> Log.d(TAG, "2buffer-count:" + i)); Observable.interval(1, TimeUnit.SECONDS). buffer(2, 3, TimeUnit.SECONDS) .subscribe(i -> Log.d(TAG, "2buffer-count&skip:" + i));
输出:数据结构
1buffer-count:[1, 2, 3]
1buffer-count:[4, 5, 6]
1buffer-count:[7, 8, 9]
1buffer-count:[10]ide
1buffer-count&skip:[1, 2]
1buffer-count&skip:[4, 5]
1buffer-count&skip:[7, 8]
1buffer-count&skip:[10]svg
2buffer-count&skip:[0]
2buffer-count:[0, 1]
2buffer-count&skip:[3, 4]
2buffer-count:[2, 3, 4]
2buffer-count&skip:[5, 6]
2buffer-count:[5, 6, 7]
2buffer-count&skip:[8, 9]
2buffer-count:[8, 9, 10]
2buffer-count&skip:[11, 12]
2buffer-count:[11, 12, 13]
…函数
vFlatMap将一个发射数据的Observable变换为多个Observables,而后将它们发射的数据合并后放进一个单独的Observable.net
FlatMap操做符使用一个指定的函数对原始Observable发射的每一项数据执行变换操做,这个函数返回一个自己也发射数据的Observable,而后FlatMap合并这些Observables发射的数据,最后将合并后的结果当作它本身的数据序列发射
这个方法是颇有用的,例如,当你有一个这样的Observable:它发射一个数据序列,这些数据自己包含Observable成员或者能够变换为Observable,所以你能够建立一个新的Observable发射这些次级Observable发射的数据的完整集合
注意:FlatMap对这些Observables发射的数据作的是合并(merge)操做,所以它们多是交错的。
在许多语言特定的实现中,还有一个操做符不会让变换后的Observables发射的数据交错,它按照严格的顺序发射这些数据,这个操做符一般被叫做ConcatMap或者相似的名字
注意:若是任何一个经过这个flatMap操做产生的单独的Observable调用onError异常终止了,这个Observable自身会当即调用onError并终止。
这个操做符有一个接受额外的int参数的一个变体。这个参数设置flatMap从原来的Observable映射Observables的最大同时订阅数。当达到这个限制时,它会等待其中一个终止而后再订阅另外一个。
Javadoc: flatMap(Func1))
Javadoc: flatMap(Func1,int))
##flatMapIterable:
这个变体成对的打包数据,而后生成Iterable而不是原始数据和生成的Observables,可是处理方式是相同的
##concatMap
它相似于最简单版本的flatMap,可是它按次序链接而不是合并那些生成的Observables,而后产生本身的数据序列。
##switchMap:
它和flatMap很像,除了一点:当原始Observable发射一个新的数据(Observable)时,它将取消订阅并中止监视产生执以前那个数据的Observable,只监视当前这一个
示例代码:
//将发射的数据都加上flat map的前缀 Observable.just(1, 2, 3, 4, 5) .flatMap(integer -> Observable.just("flat map:" + integer)) .subscribe(i -> Log.d(TAG, i)); //会输出n个n数字 Observable.just(1, 2, 3, 4) .flatMapIterable( integer -> { ArrayList<Integer> s = new ArrayList<>(); for (int i = 0; i < integer; i++) { s.add(integer); } return s; } ) .subscribe(i -> Log.d(TAG, "flatMapIterable:" + i));
输出:
flat map:1
flat map:2
flat map:3
flat map:4
flat map:5
flatMapIterable:1
flatMapIterable:2
flatMapIterable:2
flatMapIterable:3
flatMapIterable:3
flatMapIterable:3
flatMapIterable:4
flatMapIterable:4
flatMapIterable:4
flatMapIterable:4
GroupBy操做符将原始Observable分拆为一些Observables集合,它们中的每个发射原始Observable数据序列的一个子序列。哪一个数据项由哪个Observable发射是由函数getKey 断定的,这个函数给每一项指定一个Key,Key相同的数据会被同一个Observable发射。
groupBy操做符返回Observable的一个特殊子类GroupedObservable,实现了GroupedObservable接口的对象有一个额外的方法getKey,这个Key用于将数据分组到指定的Observable。
注意:groupBy将原始Observable分解为一个发射多个GroupedObservable的Observable,一旦有订阅,每一个GroupedObservable就开始缓存数据。所以,若是你忽略这些GroupedObservable中的任何一个,这个缓存可能造成一个潜在的内存泄露。所以,若是你不想观察,也不要忽略GroupedObservable。你应该使用像take(0)这样会丢弃本身的缓存的操做符。
若是你取消订阅一个GroupedObservable,那个Observable将会终止。若是以后原始的Observable又发射了一个与这个Observable的Key匹配的数据,groupBy将会为这个Key建立一个新的GroupedObservable。
groupBy默认不在任何特定的调度器上执行。
示例代码:
private void op_GroupBy(TextView textView){ // groupBy(Func1):Func1是对数据分组(肯定key) Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9) .groupBy(new Func1<Integer, String>() { @Override public String call(Integer integer) { //按照奇数和偶数分组 return integer % 2 == 0 ? "偶数" : "奇数"; } }).subscribe(new Action1<GroupedObservable<String, Integer>>() { @Override public void call(GroupedObservable<String, Integer> groupedObservable) { // groupedObservable.count() // .subscribe(integer -> Log.v(TAG, "key" + groupedObservable.getKey() + " contains:" + integer + " numbers")); groupedObservable.subscribe(value->Log.v(TAG, "key" + groupedObservable.getKey() + " value:"+value)); } }); // groupBy(Func1,Func1):Func1是对数据分组(肯定key),Func2发射每一个数据,在这里面能够对原始数据作处理 Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9) .groupBy(new Func1<Integer, String>() { @Override public String call(Integer integer) { //按照奇数和偶数分组 return integer % 2 == 0 ? "偶数" : "奇数"; } }, new Func1<Integer, String>() { @Override public String call(Integer integer) { //在数字前面加上说明,若是不加这个参数,最后发射的数据就是原始整数 return (integer % 2 == 0 ? "偶数" : "奇数")+integer; } }).subscribe(new Action1<GroupedObservable<String, String>>() { @Override public void call(GroupedObservable<String, String> groupedObservable) { // groupedObservable.count() // .subscribe(integer -> Log.v(TAG, "key" + groupedObservable.getKey() + " contains:" + integer + " numbers")); groupedObservable.subscribe(value->Log.v(TAG, "key" + groupedObservable.getKey() + " value:"+value)); } }); }
输出:
key奇数 value:1
key偶数 value:2
key奇数 value:3
key偶数 value:4
key奇数 value:5
key偶数 value:6
key奇数 value:7
key偶数 value:8
key奇数 value:9
key奇数 value:奇数1
key偶数 value:偶数2
key奇数 value:奇数3
key偶数 value:偶数4
key奇数 value:奇数5
key偶数 value:偶数6
key奇数 value:奇数7
key偶数 value:偶数8
key奇数 value:奇数9
示例代码:
String[] names = {"张三", "李四", "王二", "麻子"}; //map Observable.from(names).map(new Func1<String, String>() { @Override public String call(String s) { //将原始Observable发射的每一项数据前面加上 “姓名:” return "姓名:"+s; } }).subscribe(new Action1<String>() { @Override public void call(String s) { Log.v(TAG, s); } }); //cast: 多态中能够将父类引用强转为子类对象 //cast的强转只适用于多态,而不适用于String强转为Integer Animal animal = new Dog(); //多态 Observable.just(animal) .cast(Dog.class) .subscribe(new Action1<Dog>() { @Override public void call(Dog dog) { Log.v(TAG, "Cast ->" + dog); } });
输出:
姓名:张三
姓名:李四
姓名:王二
姓名:麻子
Cast ->com.openxu.rxjava.operators.TransformOperators$Dog@52729440
示例代码:
//会把原始数据的第一项当作新的第一项发射 Observable.just(1, 2, 3, 4, 5) .scan(new Func2<Integer, Integer, Integer>() { @Override public Integer call(Integer sum, Integer item) { Log.v(TAG, ">应用函数:" + sum+" ,"+item); return sum + item; } }).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.v(TAG, "Next:" + integer); } }); //scan将发射种子值3做为本身的第一项数据 Observable.just(1, 2, 3, 4, 5) .scan(2, new Func2<Integer, Integer, Integer>() { @Override public Integer call(Integer sum, Integer item) { Log.d(TAG, ">应用函数:" + sum+" ,"+item); return sum + item; } }).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d(TAG, "Next:" + integer); } });
输出:
Next:1
应用函数:1 ,2
Next:3
应用函数:3 ,3
Next:6
应用函数:6 ,4
Next:10
应用函数:10 ,5
Next:15
Next:2
应用函数:2 ,1
Next:3
应用函数:3 ,2
Next:5
应用函数:5 ,3
Next:8
应用函数:8 ,4
Next:12
应用函数:12 ,5
Next:17
Window和Buffer相似,但不是发射来自原始Observable的数据包,它发射的是Observables,这些Observables中的每个都发射原始Observable数据的一个子集,最后发射一个onCompleted通知
和Buffer同样,Window有不少变体,每一种都以本身的方式将原始Observable分解为多个做为结果的Observable,每个都包含一个映射原始数据的window。用Window操做符的术语描述就是,当一个窗口打开(when a window “opens”)意味着一个新的Observable已经发射(产生)了,并且这个Observable开始发射来自原始Observable的数据;当一个窗口关闭(when a window “closes”)意味着发射(产生)的Observable中止发射原始Observable的数据,而且发射终止通知onCompleted给它的观察者们
示例代码:
Observable.just(1, 2, 3, 4, 5, 6, 7) .window(3) //每次发射出一个包含三个整数的子Observable .subscribe(new Action1<Observable<Integer>>() { @Override public void call(Observable<Integer> integerObservable) { //每次发射一个子Observable Log.d(TAG,integerObservable+""); //订阅子Observable integerObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d(TAG,"window:" + integer); } }); } }); Observable.just(1, 2, 3, 4, 5, 6, 7) .window(3, 2) //每次发射出一个包含三个整数的子Observable .subscribe(new Action1<Observable<Integer>>() { @Override public void call(Observable<Integer> integerObservable) { Log.d(TAG,integerObservable+""); //订阅子Observable integerObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.d(TAG,"windowSkip:" + integer); } }); } });
输出:
rx.internal.operators.UnicastSubject@52715e4c
window:1
window:2
window:3
rx.internal.operators.UnicastSubject@527164fc
window:4
window:5
window:6
rx.internal.operators.UnicastSubject@527167f0
window:7
rx.internal.operators.UnicastSubject@52717474
windowSkip:1
windowSkip:2
rx.internal.operators.UnicastSubject@52717980
windowSkip:3
windowSkip:3
windowSkip:4
rx.internal.operators.UnicastSubject@52717cac
windowSkip:5
windowSkip:5
windowSkip:6
rx.internal.operators.UnicastSubject@52717fd8
windowSkip:7
windowSkip:7
以上就是RxJava实现的变换相关的操做符,对于不能理解的童鞋,建议将源码运行后对照分析,这样有助于理解。
有问题请留言,有帮助请点赞(__)