原文首发于微信公众号:躬行之(jzman-blog),欢迎关注交流!java
buffer 操做符重载方法比较多,这里选取典型的几个来讲明 buffer 操做符的使用,buffer 操做符的使用能够分为以下三类,具体以下:缓存
//第一类
public final Observable<List<T>> buffer(int count)
public final Observable<List<T>> buffer(int count, int skip)
//第二类
public final Observable<List<T>> buffer(long timespan, TimeUnit unit)
public final Observable<List<T>> buffer(long timespan, long timeskip, TimeUnit unit)
//第三类
public final <B> Observable<List<T>> buffer(ObservableSource<B> boundary)
public final <TOpening, TClosing> Observable<List<T>> buffer(
ObservableSource<? extends TOpening> openingIndicator,
Function<? super TOpening, ? extends ObservableSource<? extends TClosing>> closingIndicator)
复制代码
buffer 操做符将一个 Observable 转换为一个 Observable,这个 Observable 用于收集原来发送的数据,而后发送这些缓存的数据集合,buffer 将发送的单个事件转换成元素集合,下面是针对此种状况的官方示意图:bash
以下面的事件的发送过程,若是不设置 buffer 则须要发送四次,若是使用以下 buffer 进行转换,则只需发送两次,测试代码以下:微信
count = 0;
Observable.just("Event1", "Event2", "Event3", "Event4")
.buffer(2)
.subscribe(new Consumer<List<String>>() {
@Override
public void accept(List<String> strings) throws Exception {
count++;
Log.i(TAG, "第" + count + "次接收...");
Log.i(TAG, "accept--->" + strings.size());
Log.i(TAG, "接收的数据...");
for (String str : strings) {
Log.i(TAG, "accept--->" + strings.size() + "---" + str);
}
}
});
复制代码
上述代码的执行结果以下:并发
第1次接收...
accept--->2
接收的数据...
accept--->2---Event1
accept--->2---Event2
第2次接收...
accept--->2
接收的数据...
accept--->2---Event3
accept--->2---Event4
复制代码
相较 buffer(int count), skip 能够指定下一次由源 Observable 转换的 Observable 收集事件的位置,若是 count 等于 skip,则 buffer(int count,int skip) 等价于 buffer(int count),官方示意图以下:app
以下面的事件发送过程,至关于每 3 个事件一组进行发送,但每次收集数据的位置参数 skip 为 2,则每次收集的数据中会有数据重复,测试代码以下:ide
count = 0;
Observable.just("Event1", "Event2", "Event3", "Event4", "Event5")
.buffer(3, 2)
.subscribe(new Consumer<List<String>>() {
@Override
public void accept(List<String> strings) throws Exception {
count++;
Log.i(TAG, "第" + count + "次接收...");
Log.i(TAG, "accept--->" + strings.size());
Log.i(TAG, "接收的数据...");
for (String str : strings) {
Log.i(TAG, "accept--->" + strings.size() + "---" + str);
}
}
});
复制代码
上述代码的执行结果以下:学习
第1次接收...
accept--->3
接收的数据...
accept--->3---Event1
accept--->3---Event2
accept--->3---Event3
第2次接收...
accept--->3
接收的数据...
accept--->3---Event3
accept--->3---Event4
accept--->3---Event5
第3次接收...
accept--->1
接收的数据...
accept--->1---Event5
复制代码
buffer 操做符会将一个 Observable 转换为一个新的 Observable,timespan 决定新的的 Observsable 在发出缓存的数据的时间间隔,官方示意图以下:测试
以下面的事件发送过程,源 Observable 每隔 2 秒发送事件,而 buffer 新生成的 Obsrevable 则以每隔 1 秒的间隔发送缓存的事件集合,固然,这样会在间隔的时间段收集不到数据致使丢失数据,测试代码以下:this
Observable.intervalRange(1,8,0,2, TimeUnit.SECONDS)
.buffer(1,TimeUnit.SECONDS)
.subscribe(new Consumer<List<Long>>() {
@Override
public void accept(List<Long> longs) throws Exception {
Log.i(TAG, "accept--->" + String.valueOf(longs));
}
});
复制代码
上述代码的执行结果以下:
accept--->[1]
accept--->[]
accept--->[2]
accept--->[]
accept--->[3]
accept--->[]
accept--->[4]
accept--->[]
accept--->[5]
复制代码
buffer 操做符会将一个 Observable 转换为一个 Observable,timeskip 决定让新生成的 Observable 按期启动一个新的缓冲区,而后新的 Observable 会发出在 timespan 时间间隔内收集的事件集合,官方示意图以下:
以下面的事件发送过程,源 Observable 会每隔 1 秒发送 1 到 12 的整数,buffer 新生成的 Observable 会每隔 5 秒接收源 Observable 发送的事件,测试代码以下:
Observable.intervalRange(1,12,0,1, TimeUnit.SECONDS)
.buffer(1,5, TimeUnit.SECONDS)
.subscribe(new Consumer<List<Long>>() {
@Override
public void accept(List<Long> longs) throws Exception {
Log.i(TAG, "accept--->" + String.valueOf(longs));
}
});
复制代码
上述代码的执行结果以下:
accept--->[1]
accept--->[6]
accept--->[11]
复制代码
buffer(boundary) 会监视一个名叫 boundary 的 Observable,每当这个 Observable 发射了一个事件,它就建立一个新的 List 开始收集来自原始 Observable 的发送的事件并发送收集到的数据,官方示意图以下:
以下面事件的发送过程,收集到的原事件会由于时间间隔的不一样最终发送的收集到的事件的个数也不一样,测试代码以下:
Observable.intervalRange(1,10,0,2, TimeUnit.SECONDS)
.buffer(Observable.interval(3, TimeUnit.SECONDS))
.subscribe(new Consumer<List<Long>>() {
@Override
public void accept(List<Long> longs) throws Exception {
Log.i(TAG, "accept--->" + String.valueOf(longs));
}
});
复制代码
上述代码的执行结果以下:
accept--->[1, 2]
accept--->[3]
accept--->[4, 5]
accept--->[6]
accept--->[7, 8]
accept--->[9]
accept--->[10]
复制代码
buffer(openingIndicator, closingIndicator)会监视一个名叫 openingIndicator 的 Observable,这个 Observable 每发射一个事件,它就建立一个 List 收集原始 Observable 发送的数据,并将收集的数据给 closingIndicator,closingIndicator 会返回一个 Observable,这个 buffer 会监视 closingIndicator 返回的Observable,检测到这个 Observable 的数据时,就会关闭这个 List 发射刚才从 openingIndicator 得到数据,也就是名为 openingIndicator 的 Observable 收集的数据,下面是针对此种状况的官方示意图:
以下面时间发送过程,原始的 Observable 以每一个 1 秒的间隔发送 1 到 12 之间的整数,名为 openingIndicator 的 Observable 会每隔 3 秒建立一个 List 手机发送的事件,而后将收集的数据给 closingIndicator,closingIndicator 会延时 1 秒发送从名为 openingIndicator 的 Observable 拿到的数据,下面是测试代码:
Observable openingIndicator = Observable.interval(3, TimeUnit.SECONDS);
Observable closingIndicator = Observable.timer(1,TimeUnit.SECONDS);
Observable.intervalRange(1,12,0,1, TimeUnit.SECONDS)
.buffer(openingIndicator, new Function<Object, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Object o) throws Exception {
return closingIndicator;
}
})
.subscribe(new Consumer<List<Long>>() {
@Override
public void accept(List<Long> longs) throws Exception {
Log.i(TAG, "accept--->" + String.valueOf(longs));
}
});
复制代码
上述代码的执行结果以下:
accept--->[4, 5]
accept--->[7]
accept--->[10]
复制代码
这里就以 window(long count) 为例来介绍 window 操做符的使用,window 操做符的使用和 buffer 使用相似,不一样之处是经 buffer 转换成的 Observable 发送的时源 Observable 发送事件的事件集合,而经 window 操做符转换成的 Observable 会依次发送 count 个源 Observable 发送的事件,该操做符官方示意图以下:
测试代码以下:
Observable.just("Event1", "Event2", "Event3", "Event4")
.window(2)
.subscribe(new Consumer<Observable<String>>() {
@Override
public void accept(Observable<String> stringObservable) throws Exception {
Log.i(TAG, "accept--Observable->");
stringObservable.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i(TAG, "accept--->" + s);
}
});
}
});
复制代码
上述代码的执行结果以下:
accept--Observable->
accept--->Event1
accept--->Event2
accept--Observable->
accept--->Event3
accept--->Event4
复制代码
map 操做符可对发送的数据进行相应的类型转化,map 操做的官方示意图以下:
以下面的事件发送过程,通过 map 操做符转换,可对源 Observable 发送的事件进行进一步的加工和转换,测试代码以下:
Observable.just("Event1", "Event2", "Event3", "Event4")
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
return "this is " + s;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i(TAG, "accept--->" + s);
}
});
复制代码
上述代码的执行结果以下:
accept--->this is Event1
accept--->this is Event2
accept--->this is Event3
accept--->this is Event4
复制代码
flatMap 操做符使用时,当源 Observable 发出事件会相应的转换为能够发送多个事件的 Observable,这些 Observable 最终汇入同一个 Observable,而后这个 Observable 将这些事件统一发送出去,这里决定再也不想上文中同样,每一个重载方法都进行说明,这里已经常使用的 flatMap(mapper) 为例,其官方示意图以下:
以下面的事件发送过程,使用了 flatMap 操做符以后,源 Observable 发送事件时,相应的生成对应的 Observable,最终发送的事件都汇入同一个 Observable,而后将事件结果回调给观察者,测试代码以下:
final Observable observable = Observable.just("Event5", "Event6");
Observable.just("Event1", "Event2", "Event3", "Event4")
.flatMap(new Function<String, Observable<String>>() {
@Override
public Observable<String> apply(String s) throws Exception {
return observable;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i(TAG, "accept--->" + s);
}
});
复制代码
上述代码的执行结果以下:
accept--->Event5
accept--->Event6
accept--->Event5
accept--->Event6
accept--->Event5
accept--->Event6
accept--->Event5
accept--->Event6
复制代码
concatMap 的使用与 flatMap 的使用大体相似,相较flatMap可以保证事件接收的顺序,而 flatMap 不能保证事件接收的顺序,concatMap 操做符的官方示意图以下:
以下面的事件发送过程,咱们在源 Observable 发送整数 1 时延时 3 秒,而后继续发送其余事件,下面是测试代码:
Observable.intervalRange(1, 2, 0, 1, TimeUnit.SECONDS)
.concatMap(new Function<Long, ObservableSource<Long>>() {
@Override
public ObservableSource<Long> apply(Long aLong) throws Exception {
int delay = 0;
if (aLong == 1) {
delay = 3;
}
return Observable.intervalRange(4, 4, delay, 1, TimeUnit.SECONDS);
}
}).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.i(TAG, "accept--->" + aLong);
}
});
复制代码
使用 concatMap 操做符上述代码的执行结果以下:
accept--->4
accept--->5
accept--->6
accept--->7
accept--->4
accept--->5
accept--->6
accept--->7
复制代码
使用 flatMap 操做符上述代码的执行结果以下:
accept--->4
accept--->5
accept--->6
accept--->4
accept--->7
accept--->5
accept--->6
accept--->7
复制代码
可见,concatMap 相较 flatMap 可以保证事件接收的顺序。
当源 Observable 发送事件时会相应的转换为能够发送多个事件的 Observable,switchMap 操做符只关心当前这个 Observable,也就是说,源 Observable 每当发送一个新的事件时,就会丢弃前面一个发送多个事件的 Observable,官方示意图以下:
以下面的事件发送过程,源 Observable 每一个 2 秒发送 1 和 2,转换成的能够发送多个事件的 Observable 每一个 1 秒发送从 4 开始的整数,使用 switchMap 操做符时,源 Observable 发送一个整数 1 时,这个新的能够发送多个事件的 Observable 只发送两个整数,也就是 4 和 5 以后就中止发送了,由于此时源 Observable 又开始发送事件了,此时会丢弃前一个可发送多个时间的 Observable,开始下一次源 Observable 发送事件的监听,测试代码以下:
Observable.intervalRange(1, 2, 0, 2, TimeUnit.SECONDS)
.switchMap(new Function<Long, ObservableSource<Long>>() {
@Override
public ObservableSource<Long> apply(Long aLong) throws Exception {
Log.i(TAG, "accept-aLong-->" + aLong);
return Observable.intervalRange(4, 4, 0, 1, TimeUnit.SECONDS);
}
}).subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.i(TAG, "accept--->" + aLong);
}
});
复制代码
上述代码执行结果以下:
accept-aLong-->1
accept--->4
accept--->5
accept-aLong-->2
accept--->4
accept--->5
accept--->6
accept--->7
复制代码
此外,还有与之相关的操做符:concatMapDelayError、concatMapEager、concatMapEagerDelayError concatMapIterable、flatMapIterable 、switchMapDelayError,都是上述操做符的扩展,这里就不在介绍了。
groupBy 操做符会对接收的数据按照指定的规则进行分类,而后再被 GroupedObservable 等订阅输出,官方示意图以下:
以下面的事件发送过程,咱们会按照成绩进行分组输出,具体以下:
List<DataBean> beanList = new ArrayList<>();
beanList.add(new DataBean("成绩是95分", 95));
beanList.add(new DataBean("成绩是70分", 70));
beanList.add(new DataBean("成绩是56分", 56));
beanList.add(new DataBean("成绩是69分", 69));
beanList.add(new DataBean("成绩是90分", 90));
beanList.add(new DataBean("成绩是46分", 46));
beanList.add(new DataBean("成绩是85分", 85));
Observable.fromIterable(beanList)
.groupBy(new Function<DataBean, String>() {
@Override
public String apply(DataBean dataBean) throws Exception {
int score = dataBean.getScore();
if (score >= 80) {
return "A";
}
if (score >= 60 && score < 80) {
return "B";
}
if (score < 60) {
return "C";
}
return null;
}
})
.subscribe(new Consumer<GroupedObservable<String, DataBean>>() {
@Override
public void accept(final GroupedObservable<String, DataBean> groupedObservable) throws Exception {
groupedObservable.subscribe(new Consumer<DataBean>() {
@Override
public void accept(DataBean dataBean) throws Exception {
Log.i(TAG, "accept--->"+ groupedObservable.getKey() + "组--->"+dataBean.getDesc());
}
});
}
});
复制代码
上述代码的执行结果以下:
accept--->A组--->成绩是95分
accept--->B组--->成绩是70分
accept--->C组--->成绩是56分
accept--->B组--->成绩是69分
accept--->A组--->成绩是90分
accept--->C组--->成绩是46分
accept--->A组--->成绩是85分
复制代码
cast 操做符用于类型转化,cast 操做符官方示意图以下:
测试代码以下:
Observable.just(1,2,3,4,5)
.cast(String.class)
.subscribe(new Consumer<String>() {
@Override
public void accept(String String) throws Exception {
Log.i(TAG, "accept--->" + String);
}
});
复制代码
测试会出现以下异常:
java.lang.ClassCastException: Cannot cast java.lang.Integer to java.lang.String
复制代码
从结果可知,发现不一样类型之间转化会出现类型转化异常,cast 操做符并不能进行不一样类型之间的转化,可是可使用 cast 操做来校验发送的事件数据类型是否是指定的类型。
scan 操做符会依次扫描每两个元素,第一个元素没有上一个元素,则第一个元素的上一个元素会被忽略,当扫描第二个元素时,会获取到第一个元素,以后 apply 方法的返回值会做为上一个元素的值参与计算,最终返回转化后的结果,scan 官方示意图以下:
看一下下面的事件发送过程,第一次扫描时,第一个元素是 1,这里至关于 last,第二个元素是 2 ,这里至关于 item,此时 apply 方法返回的结果是 2,这个 2 会做为 last 的值参与下一次扫描计算,则下一次返回的值确定是 2 * 3,也就是 6,测试代码以下:
Observable.just(1, 2, 3, 4, 5)
.scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer last, Integer item) throws Exception {
Log.i(TAG, "accept--last->" + last);
Log.i(TAG, "accept--item->" + item);
return last * item;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.i(TAG, "accept--->" + integer);
}
});
复制代码
上述代码的执行结果以下:
accept--->1
accept--last->1
accept--item->2
accept--->2
accept--last->2
accept--item->3
accept--->6
accept--last->6
accept--item->4
accept--->24
accept--last->24
accept--item->5
accept--->120
复制代码
toList 操做符会将发送的一系列数据转换成 List,而后一次性发送出去,toList 的官方示意图以下:
测试代码以下:
Observable.just(1, 2, 3, 4)
.toList()
.subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
Log.i(TAG, "accept--->" + integers);
}
});
复制代码
上述代码的执行结果以下:
accept--->[1, 2, 3, 4]
复制代码
toMap操做符会将要发送的事件按照指定的规则转化为 Map 形式,而后一次性发送出去,toMap 操做符官方示意图以下:
测试代码以下:
Observable.just(1, 2, 3, 4)
.toMap(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "key"+integer;
}
})
.subscribe(new Consumer<Map<String, Integer>>() {
@Override
public void accept(Map<String, Integer> map) throws Exception {
Log.i(TAG, "accept--->" + map);
}
});
复制代码
上述代码的执行结果以下:
accept--->{key2=2, key4=4, key1=1, key3=3}
复制代码
我的微信公众号:jzman-blog 能够一块儿交流学习!