本篇文章已受权微信公众号 YYGeeker
独家发布转载请标明出处java
CSDN学院课程地址react
- RxJava2从入门到精通-初级篇:edu.csdn.net/course/deta…
- RxJava2从入门到精通-中级篇:edu.csdn.net/course/deta…
- RxJava2从入门到精通-进阶篇:edu.csdn.net/course/deta…
- RxJava2从入门到精通-源码分析篇:edu.csdn.net/course/deta…
RxJava操做符也是其精髓之一,能够经过一个简单的操做符,实现复杂的业务逻辑,甚至还能够将操做符组合起来(即RxJava的组合过程),完成更为复杂的业务需求。好比咱们前面用到的.create()
,.subscribeOn()
,.observeOn()
,.subscribe()
都是RxJava的操做符之一,下面咱们将对RxJava的操做符进行分析c++
掌握RxJava操做符前,首先要学会看得懂RxJava的图片,图片是RxJava主导的精髓,下面咱们经过例子说明c#
这张图片咱们先要分清楚概念上的东西,上下两行横向的直线区域表明着事件流,上面一行(上游)是咱们的被观察者Observable
,下面一行(下游)是咱们的观察者Observer
,事件流就是从上游的被观察者发送给下游的观察者的。而中间一行的flatMap区域则是咱们的操做符部分,它能够对咱们的数据进行变换操做。最后,数据流则是图片上的圆形、方形、菱形等区域,也是从上游流向下游的,不一样的形状表明着不一样的数据类型缓存
这张图片并非表示没有被观察者Observable
,而是Create方法自己就是建立了被观察者,因此能够将被观察者的上游省略。在进行事件的onNext()
分发后,执行onComplete()
事件,这样就表示事件流已经结束,后续若是上游继续发事件,则下游表示不接收。当事件流的onCompleted()
或者onError()
正好被调用过一次后,此后就不能再调用观察者的任何其它回调方法bash
在理解RxJava操做符以前,须要将这几个概念弄明白,整个操做符的章节都是围绕这几个概念进行的微信
Observable
,事件流开始的地方和数据流发射的地方Observer
,事件流结束的地方和数据流接收的地方一、create网络
Observable
最原始的建立方式,建立出一个最简单的事件流,可使用发射器发射特定的数据类型数据结构
public static void main(String[] args) {
Observable
.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
for (int i = 1; i < 5; i++) {
e.onNext(i);
}
e.onComplete();
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("onComplete");
}
});
}
复制代码
输出并发
onNext=1
onNext=2
onNext=3
onNext=4
onComplete
复制代码
二、from
建立一个事件流并发出特定类型的数据流,其发射的数据流类型有以下几个操做符
public static void main(String[] args) {
Observable.fromArray(new Integer[]{1, 2, 3, 4, 5})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
});
}
复制代码
输出
onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
复制代码
三、just
just操做符和from操做符很像,只是方法的参数有所差异,它能够接受多个参数
public static void main(String[] args) {
Observable.just(1, 2, 3, 4, 5)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
});
}
复制代码
输出
onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
复制代码
四、defer
defer与just的区别是,just是直接将发射当前的数据流,而defer会等到订阅的时候,才会去执行它的call()回调,再去发射当前的数据流。复杂点的理解就是:defer操做符是将一组数据流在原有的事件流基础上缓存一个新的事件流,直到有人订阅的时候,才会建立它缓存的事件流
public static void main(String[] args) {
i = 10;
Observable<Integer> just = Observable.just(i, i);
Observable<Object> defer = Observable.defer(new Callable<ObservableSource<?>>() {
@Override
public ObservableSource<?> call() throws Exception {
//缓存新的事件流
return Observable.just(i, i);
}
});
i = 15;
just.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
});
defer.subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
System.out.println("onNext=" + (int) o);
}
});
i = 20;
defer.subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
System.out.println("onNext=" + (int) o);
}
});
}
复制代码
输出
onNext=10
onNext=10
onNext=15
onNext=15
onNext=20
onNext=20
复制代码
五、interval
interval操做符是按固定的时间间隔发射一个无限递增的整数数据流,因为这段代码的的延时操做都是非阻塞型的,因此在Java上运行会致使JVM的立马中止,只能把这段代码放在Android来运行,interval默认在computation调度器上执行
public void interval() {
Observable.interval(1, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println("onNext=" + aLong);
}
});
}
复制代码
输出
onNext=1
onNext=2
onNext=3
onNext=4
......
复制代码
六、range
range操做符发射一个范围内的有序整数数据流,你能够指定范围的起始和长度
public static void main(String[] args) {
Observable.range(1, 5)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
});
}
复制代码
输出
onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
复制代码
七、repeat
repeat操做符能够重复发送指定次数的某个事件流,repeat操做符默认在trampoline调度器上执行
public static void main(String[] args) {
Observable.just(1).repeat(5)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
});
}
复制代码
输出
onNext=1
onNext=1
onNext=1
onNext=1
onNext=1
复制代码
八、timer
timer操做符能够建立一个延时的事件流,因为这段代码的的延时操做都是非阻塞型的,因此在Java上运行会致使JVM的立马中止,只能把这段代码放在Android来运行,默认在computation调度器上执行
public void timer() {
Observable.timer(5, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println("onNext=" + aLong);
}
});
}
复制代码
输出
onNext=0
复制代码
九、小结
补充:interval()、timer()、delay()的区别
一、map
map操做符能够将数据流进行类型转换
public static void main(String[] args) {
Observable.just(1).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "发送过来的数据会被变成字符串" + integer;
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("onNext=" + s);
}
});
}
复制代码
输出
onNext=发送过来的数据会被变成字符串1
复制代码
二、flatMap
flatMap操做符将数据流进行类型转换,而后将新的数据流传递给新的事件流进行分发,这里经过模拟请求登陆的延时操做进行说明,因为这段代码的的延时操做都是非阻塞型的,因此在Java上运行会致使JVM的立马中止,只能把这段代码放在Android来运行
public void flatMap() {
Observable.just(new UserParams("hensen", "123456")).flatMap(new Function<UserParams, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(UserParams userParams) throws Exception {
return Observable.just(userParams.username + "登陆成功").delay(2, TimeUnit.SECONDS);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
}
public static class UserParams {
public UserParams(String username, String password) {
this.username = username;
this.password = password;
}
public String username;
public String password;
}
复制代码
输出
hensen登陆成功
复制代码
补充:
三、groupBy
groupBy操做符能够将发射出来的数据项进行分组,并将分组后的数据项保存在具备key-value映射的事件流中。groupBy具体的分组规则由groupBy操做符传递进来的函数参数Function
所决定的,它能够将key和value按照Function
的返回值进行分组,返回一个具备分组规则的事件流GroupedObservable
,注意这里分组出来的事件流是按照原始事件流的顺序输出的,咱们能够经过sorted()
对数据项进行排序,而后输出有序的数据流。
public static void main(String[] args) {
Observable.just("java", "c++", "c", "c#", "javaScript", "Android")
.groupBy(new Function<String, Character>() {
@Override
public Character apply(String s) throws Exception {
return s.charAt(0);//按首字母分组
}
})
.subscribe(new Consumer<GroupedObservable<Character, String>>() {
@Override
public void accept(final GroupedObservable<Character, String> characterStringGroupedObservable) throws Exception {
//排序后,直接订阅输出key和value
characterStringGroupedObservable.sorted().subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("onNext= key:" + characterStringGroupedObservable.getKey() + " value:" + s);
}
});
}
});
}
复制代码
输出
onNext= key:A value:Android
onNext= key:c value:c
onNext= key:c value:c#
onNext= key:c value:c++
onNext= key:j value:java
onNext= key:j value:javaScript
复制代码
四、scan
scan操做符会对发射的数据和上一轮发射的数据进行函数处理,并返回的数据供下一轮使用,持续这个过程来产生剩余的数据流。其应用场景有简单的累加计算,判断全部数据的最小值等
public static void main(String[] args) {
Observable.just(8, 2, 13, 1, 15).scan(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer < integer2 ? integer : integer2;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer item) throws Exception {
System.out.println("onNext=" + item);
}
});
}
复制代码
输出
onNext=8
onNext=2
onNext=2
onNext=1
onNext=1
复制代码
五、buffer
buffer操做符能够将发射出来的数据流,在给定的缓存池中进行缓存,当缓存池中的数据项溢满时,则将缓存池的数据项进行输出,重复上述过程,直到将发射出来的数据所有发射出去。若是发射出来的数据不够缓存池的大小,则按照当前发射出来的数量进行输出。若是对buffer操做符设置了skip
参数,则buffer每次缓存池溢满时,会跳过指定的skip
数据项,而后再进行缓存和输出。
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
.buffer(5).subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
System.out.println("onNext=" + integers.toString());
}
});
复制代码
输出
onNext=[1, 2, 3, 4, 5]
onNext=[6, 7, 8, 9]
复制代码
六、window
window操做符和buffer操做符在功能上实现的效果是同样的,但window操做符最大区别在于一样是缓存必定数量的数据项,window操做符最终发射出来的是新的事件流integerObservable
,而buffer操做符发射出来的是新的数据流,也就是说,window操做符发射出来新的事件流中的数据项,还能够通过Rxjava其余操做符进行处理。
public static void main(String[] args) {
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
.window(2, 1).subscribe(new Consumer<Observable<Integer>>() {
@Override
public void accept(Observable<Integer> integerObservable) throws Exception {
integerObservable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
});
}
});
}
复制代码
输出
onNext=1
onNext=2
onNext=2
onNext=3
onNext=3
onNext=4
onNext=4
onNext=5
onNext=5
onNext=6
onNext=6
onNext=7
onNext=7
onNext=8
onNext=8
onNext=9
onNext=9
复制代码
七、小结
一、debounce
debounce操做符会去过滤掉发射速率过快的数据项,下面的例子onNext
事件能够想象成按钮的点击事件,若是在2秒种内频繁的点击,则其点击事件会被忽略,当i为3的除数的时候,发射的事件的时间会超过规定忽略事件的时间,那么则容许触发点击事件。这就有点像咱们频繁点击按钮,但始终只会触发一次点击事件,这样就不会致使重复去响应点击事件
public static void main(String[] args) {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 100; i++) {
if (i % 3 == 0) {
Thread.sleep(3000);
} else {
Thread.sleep(1000);
}
emitter.onNext(i);
}
}
}).debounce(2, TimeUnit.SECONDS)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
});
}
复制代码
输出
onNext=2
onNext=5
onNext=8
onNext=11
onNext=14
......
复制代码
二、distinct
distinct操做符会过滤重复发送的数据项
public static void main(String[] args) {
Observable.just(1, 2, 3, 4, 1, 2, 3).distinct()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
});
}
复制代码
输出
onNext=1
onNext=2
onNext=3
onNext=4
复制代码
三、elementAt
elementAt操做符只取指定的角标的事件
public static void main(String[] args) {
Observable.just(1, 2, 3, 4, 1, 2, 3).elementAt(0)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
});
}
复制代码
输出
onNext=1
复制代码
四、filter
filter操做符能够过滤指定函数的数据项
public static void main(String[] args) {
Observable.just(1, 2, 3, 4, 1, 2, 3)
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer > 2;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
});
}
复制代码
输出
onNext=3
onNext=4
onNext=3
复制代码
五、first
first操做符只发射第一项数据项
public static void main(String[] args) {
Observable.just(1, 2, 3, 4, 1, 2, 3)
.first(7)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
});
}
复制代码
输出
onNext=1
复制代码
六、ignoreElements
ignoreElements操做符不发射任何数据,只发射事件流的终止通知
public static void main(String[] args) {
Observable.just(1, 2, 3, 4, 1, 2, 3)
.ignoreElements()
.subscribe(new Action() {
@Override
public void run() throws Exception {
System.out.println("onComplete");
}
});
}
复制代码
输出
onComplete
复制代码
七、last
last操做符只发射最后一项数据
public static void main(String[] args) {
Observable.just(1, 2, 3, 4, 1, 2, 3)
.last(7)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
});
}
复制代码
输出
onNext=3
复制代码
八、sample
sample操做符会在指定的事件内从数据项中采集所须要的数据,因为这段代码的的延时操做都是非阻塞型的,因此在Java上运行会致使JVM的立马中止,只能把这段代码放在Android来运行
public void sample() {
Observable.interval(1, TimeUnit.SECONDS)
.sample(2, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println("onNext=" + aLong);
}
});
}
复制代码
输出
onNext=2
onNext=4
onNext=6
onNext=8
复制代码
九、skip
skip操做符能够忽略事件流发射的前N项数据项,只保留以后的数据
public static void main(String[] args) {
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
.skip(3)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer i) throws Exception {
System.out.println("onNext=" + i);
}
});
}
复制代码
输出
onNext=4
onNext=5
onNext=6
onNext=7
onNext=8
复制代码
十、skipLast
skipLast操做符能够抑制事件流发射的后N项数据
public static void main(String[] args) {
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
.skipLast(3)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer i) throws Exception {
System.out.println("onNext=" + i);
}
});
}
复制代码
输出
onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
复制代码
十一、take
take操做符能够在事件流中只发射前面的N项数据
public static void main(String[] args) {
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
.take(3)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer i) throws Exception {
System.out.println("onNext=" + i);
}
});
}
复制代码
输出
onNext=1
onNext=2
onNext=3
复制代码
十二、takeLast
takeLast操做符事件流只发射数据流的后N项数据项,忽略前面的数据项
public static void main(String[] args) {
Observable.just(1, 2, 3, 4, 5, 6, 7, 8)
.takeLast(3)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer i) throws Exception {
System.out.println("onNext=" + i);
}
});
}
复制代码
输出
onNext=6
onNext=7
onNext=8
复制代码
还有一个操做符叫takeLastBuffer,它和takeLast相似,,惟一的不一样是它把全部的数据项收集到一个List再发射,而不是依次发射一个
1三、小结
一、merge/concat
merge操做符能够合并两个事件流,若是在merge操做符上增长延时发送的操做,那么就会致使其发射的数据项是无序的,会跟着发射的时间点进行合并。虽然是将两个事件流合并成一个事件流进行发射,但在最终的一个事件流中,发射出来的倒是两次数据流。因为concat操做符和merge操做符的效果是同样的,这里只举一例
merge和concat的区别
public static void main(String[] args) {
Observable<String> just1 = Observable.just("A", "B", "C", "D", "E");
Observable<String> just2 = Observable.just("1", "2", "3", "4", "5");
Observable.merge(just1, just2).subscribe(new Consumer<Serializable>() {
@Override
public void accept(Serializable serializable) throws Exception {
System.out.println("onNext=" + serializable.toString());
}
});
}
复制代码
输出
onNext=A
onNext=B
onNext=C
onNext=D
onNext=E
onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
复制代码
二、zip
zip操做符是将两个数据流进行指定的函数规则合并
public static void main(String[] args) {
Observable<String> just1 = Observable.just("A", "B", "C", "D", "E");
Observable<String> just2 = Observable.just("1", "2", "3", "4", "5");
Observable.zip(just1, just2, new BiFunction<String, String, String>() {
@Override
public String apply(String s, String s2) throws Exception {
return s + s2;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("onNext=" + s);
}
});
}
复制代码
输出
onNext=A1
onNext=B2
onNext=C3
onNext=D4
onNext=E5
复制代码
三、startWith
startWith操做符是将另外一个数据流合并到原数据流的开头
public static void main(String[] args) {
Observable<String> just1 = Observable.just("A", "B", "C", "D", "E");
Observable<String> just2 = Observable.just("1", "2", "3", "4", "5");
just1.startWith(just2).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("onNext=" + s);
}
});
}
复制代码
输出
onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
onNext=A
onNext=B
onNext=C
onNext=D
onNext=E
复制代码
四、join
join操做符是有时间期限的合并操做符,因为这段代码的的延时操做都是非阻塞型的,因此在Java上运行会致使JVM的立马中止,只能把这段代码放在Android来运行
public void join() {
Observable<String> just1 = Observable.just("A", "B", "C", "D", "E");
Observable<Long> just2 = Observable.interval(1, TimeUnit.SECONDS);
just1.join(just2, new Function<String, ObservableSource<Long>>() {
@Override
public ObservableSource<Long> apply(String s) throws Exception {
return Observable.timer(3, TimeUnit.SECONDS);
}
}, new Function<Long, ObservableSource<Long>>() {
@Override
public ObservableSource<Long> apply(Long l) throws Exception {
return Observable.timer(8, TimeUnit.SECONDS);
}
}, new BiFunction<String, Long, String>() {
@Override
public String apply(String s, Long l) throws Exception {
return s + l;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("onNext=" + s);
}
});
}
复制代码
join操做符有三个函数须要设置
因为just2的期限只有3秒的时间,而just2延时1秒发送一次,因此just2只发射了2次,其输出的结果就只能和just2输出的两次进行合并,其输出格式有点相似咱们的排列组合
onNext=A0
onNext=B0
onNext=C0
onNext=D0
onNext=E0
onNext=A1
onNext=B1
onNext=C1
onNext=D1
onNext=E1
复制代码
五、combineLatest
conbineLatest操做符会寻找其余事件流最近发射的数据流进行合并,因为这段代码的的延时操做都是非阻塞型的,因此在Java上运行会致使JVM的立马中止,只能把这段代码放在Android来运行
public static String[] str = {"A", "B", "C", "D", "E"};
public void combineLatest() {
Observable<String> just1 = Observable.interval(1, TimeUnit.SECONDS).map(new Function<Long, String>() {
@Override
public String apply(Long aLong) throws Exception {
return str[(int) (aLong % 5)];
}
});
Observable<Long> just2 = Observable.interval(1, TimeUnit.SECONDS);
Observable.combineLatest(just1, just2, new BiFunction<String, Long, String>() {
@Override
public String apply(String s, Long l) throws Exception {
return s + l;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("onNext=" + s);
}
});
}
复制代码
输出
onNext=A0
onNext=B0
onNext=B1
onNext=C1
onNext=C2
onNext=D2
onNext=D3
onNext=E3
onNext=E4
onNext=A4
onNext=A5
复制代码
六、小结
一、onErrorReturn
onErrorReturn操做符表示当错误发生时,它会忽略onError的回调且会发射一个新的数据项并回调onCompleted()
public static void main(String[] args) {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
for (int i = 1; i < 5; i++) {
if(i == 4){
e.onError(new Exception("onError crash"));
}
e.onNext(i);
}
}
})
.onErrorReturn(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable throwable) throws Exception {
return -1;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
System.out.println("onError");
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("onComplete");
}
});
}
复制代码
输出
onNext=1
onNext=2
onNext=3
onNext=-1
onComplete
复制代码
二、onErrorResumeNext
onErrorResumeNext操做符表示当错误发生时,它会忽略onError的回调且会发射一个新的事件流并回调onCompleted()
public static void main(String[] args) {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
for (int i = 1; i < 5; i++) {
if(i == 4){
e.onError(new Exception("onError crash"));
}
e.onNext(i);
}
}
})
.onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
@Override
public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {
return Observable.just(-1);
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
System.out.println("onError");
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("onComplete");
}
});
}
复制代码
输出
onNext=1
onNext=2
onNext=3
onNext=-1
onComplete
复制代码
三、onExceptionResumeNext
onExceptionResumeNext操做符表示当错误发生时,若是onError收到的Throwable不是一个Exception,它会回调onError方法,且不会回调备用的事件流,若是onError收到的Throwable是一个Exception,它会回调备用的事件流进行数据的发射
public static void main(String[] args) {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
for (int i = 1; i < 5; i++) {
if(i == 4){
e.onError(new Exception("onException crash"));
//e.onError(new Error("onError crash"));
}
e.onNext(i);
}
}
})
.onExceptionResumeNext(new ObservableSource<Integer>() {
@Override
public void subscribe(Observer<? super Integer> observer) {
//备用事件流
observer.onNext(8);
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
System.out.println("onError");
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("onComplete");
}
});
}
复制代码
输出
onNext=1
onNext=2
onNext=3
onNext=8
复制代码
四、retry
retry操做符表示当错误发生时,发射器会从新发射
public static void main(String[] args) {
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
for (int i = 1; i < 5; i++) {
if (i == 4) {
e.onError(new Exception("onError crash"));
}
e.onNext(i);
}
}
})
.retry(1)
.onErrorReturn(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable throwable) throws Exception {
return -1;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
System.out.println("onError");
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("onComplete");
}
});
}
复制代码
输出
onNext=1
onNext=2
onNext=3
onNext=1
onNext=2
onNext=3
onNext=-1
onComplete
复制代码
五、retryWhen
retryWhen操做符和retry操做符类似,区别在于retryWhen将错误Throwable传递给了函数进行处理并产生新的事件流进行处理,因为这段代码的的延时操做都是非阻塞型的,因此在Java上运行会致使JVM的立马中止,只能把这段代码放在Android来运行
private static int retryCount = 0;
private static int maxRetries = 2;
public void retryWhen(){
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
for (int i = 1; i < 5; i++) {
if (i == 4) {
e.onError(new Exception("onError crash"));
}
e.onNext(i);
}
}
})
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception {
if (++retryCount <= maxRetries) {
// When this Observable calls onNext, the original Observable will be retried (i.e. re-subscribed).
System.out.println("get error, it will try after " + 1 + " seconds, retry count " + retryCount);
return Observable.timer(1, TimeUnit.SECONDS);
}
return Observable.error(throwable);
}
});
}
})
.onErrorReturn(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable throwable) throws Exception {
return -1;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
System.out.println("onError");
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("onComplete");
}
});
}
复制代码
输出
onNext=1
onNext=2
onNext=3
get error, it will try after 1 seconds, retry count 1
onNext=1
onNext=2
onNext=3
get error, it will try after 1 seconds, retry count 2
onNext=1
onNext=2
onNext=3
onNext=-1
onComplete
复制代码
六、小结
一、delay
delay操做符能够延时某次事件发送的数据流,因为这段代码的的延时操做都是非阻塞型的,因此在Java上运行会致使JVM的立马中止,只能把这段代码放在Android来运行
public void deley() {
Observable.just(1, 2, 3, 4, 5).delay(2, TimeUnit.SECONDS)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
});
}
复制代码
输出
onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
复制代码
delay和delaySubscription的效果是同样的,只不过delay是对数据流的延时,而delaySubscription是对事件流的延时
二、do
do操做符能够监听整个事件流的生命周期,do操做符分为多个类型,并且每一个类型的做用都不一样
public static void main(String[] args) {
Observable.just(1, 2, 3)
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("doOnNext");
}
})
.doOnEach(new Consumer<Notification<Integer>>() {
@Override
public void accept(Notification<Integer> integerNotification) throws Exception {
System.out.println("doOnEach");
}
})
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
System.out.println("doOnSubscribe");
}
})
.doOnDispose(new Action() {
@Override
public void run() throws Exception {
System.out.println("doOnDispose");
}
})
.doOnTerminate(new Action() {
@Override
public void run() throws Exception {
System.out.println("doOnTerminate");
}
})
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
System.out.println("doOnError");
}
})
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
System.out.println("doOnComplete");
}
})
.doFinally(new Action() {
@Override
public void run() throws Exception {
System.out.println("doFinally");
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
});
}
复制代码
输出
doOnSubscribe
doOnNext
doOnEach
onNext=1
doOnNext
doOnEach
onNext=2
doOnNext
doOnEach
onNext=3
doOnEach
doOnTerminate
doOnComplete
doFinally
复制代码
三、materialize/dematerialize
materialize操做符将发射出的数据项转换成为一个Notification对象,而dematerialize操做符则是跟materialize操做符相反,这两个操做符有点相似咱们Java对象的装箱和拆箱功能
public static void main(String[] args) {
Observable.just(1, 2, 3, 4, 5).materialize()
.subscribe(new Consumer<Notification<Integer>>() {
@Override
public void accept(Notification<Integer> integerNotification) throws Exception {
System.out.println("onNext=" + integerNotification.getValue());
}
});
Observable.just(1, 2, 3, 4, 5).materialize().dematerialize()
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object object) throws Exception {
System.out.println("onNext=" + object.toString());
}
});
}
复制代码
输出
onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
onNext=null
onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
复制代码
输出的时候,materialize会输出多个null,是由于null的事件为onCompleted事件,而dematerialize把onCompleted事件给去掉了,这个缘由也能够从图片中看出来
四、serialize
serialize操做符能够将异步执行的事件流进行同步操做,直到事件流结束
public static void main(String[] args) {
Observable.just(1, 2, 3, 4, 5).serialize()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
});
}
复制代码
输出
onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
复制代码
五、timeInterval
timeInterval操做符能够将发射的数据项转换为带有时间间隔的数据项,因为这段代码的的延时操做都是非阻塞型的,因此在Java上运行会致使JVM的立马中止,只能把这段代码放在Android来运行
public void timeInterval(){
Observable.interval(2, TimeUnit.SECONDS).timeInterval(TimeUnit.SECONDS)
.subscribe(new Consumer<Timed<Long>>() {
@Override
public void accept(Timed<Long> longTimed) throws Exception {
System.out.println("onNext=" + longTimed.value() + " timeInterval=" + longTimed.time());
}
});
}
复制代码
输出
onNext=0 timeInterval=2
onNext=1 timeInterval=2
onNext=2 timeInterval=2
onNext=3 timeInterval=2
onNext=4 timeInterval=2
复制代码
六、timeout
timeout操做符表示当发射的数据项超过了规定的限制时间,则发射onError事件,这里直接让程序超过规定的限制时间,因为这段代码的的延时操做都是非阻塞型的,因此在Java上运行会致使JVM的立马中止,只能把这段代码放在Android来运行
public void timeOut(){
Observable.interval(2, TimeUnit.SECONDS).timeout(1, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println("onNext=" + aLong);
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
System.out.println("onError");
}
});
}
复制代码
输出
onError
复制代码
七、timestamp
timestamp操做符会给每一个发射的数据项带上时间戳,因为这段代码的的延时操做都是非阻塞型的,因此在Java上运行会致使JVM的立马中止,只能把这段代码放在Android来运行
public void timeStamp() {
Observable.interval(2, TimeUnit.SECONDS).timestamp(TimeUnit.MILLISECONDS)
.subscribe(new Consumer<Timed<Long>>() {
@Override
public void accept(Timed<Long> longTimed) throws Exception {
System.out.println("onNext=" + longTimed.value() + " timeInterval=" + longTimed.time());
}
});
}
复制代码
输出
onNext=0 timeInterval=1525755132132
onNext=1 timeInterval=1525755134168
onNext=2 timeInterval=1525755136132
onNext=3 timeInterval=1525755138132
复制代码
八、using
using操做符可让你的事件流存在一次性的数据项,即用完就将资源释放掉
using操做符接受三个参数:
public static class UserBean {
String name;
int age;
public UserBean(String name, int age) {
this.name = name;
this.age = age;
}
}
public static void main(String[] args) {
Observable.using(new Callable<UserBean>() {
@Override
public UserBean call() throws Exception {
//从网络中获取某个对象
return new UserBean("俊俊俊", 22);
}
}, new Function<UserBean, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(UserBean userBean) throws Exception {
//拿出你想要的资源
return Observable.just(userBean.name);
}
}, new Consumer<UserBean>() {
@Override
public void accept(UserBean userBean) throws Exception {
//释放对象
userBean = null;
}
}).subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
System.out.println("onNext=" + o.toString());
}
});
}
复制代码
输出
onNext=俊俊俊
复制代码
九、to
to操做符能够将数据流中的数据项进行集合的转换,to操做符分为多个类型,并且每一个类型的做用都不一样
public static void main(String[] args) {
Observable.just(1, 2, 3, 4, 5).toList()
.subscribe(new Consumer<List<Integer>>() {
@Override
public void accept(List<Integer> integers) throws Exception {
System.out.println("onNext=" + integers.toString());
}
});
}
复制代码
输出
onNext=[1, 2, 3, 4, 5]
复制代码
十、小结
一、all
all操做符表示对全部数据项进行校验,若是全部都经过则返回true,不然返回false
public static void main(String[] args) {
Observable.just(1, 2, 3, 4, 5)
.all(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer > 0;
}
})
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
System.out.println("onNext=" + aBoolean);
}
});
}
复制代码
输出
onNext=true
复制代码
二、contains
contains操做符表示事件流中发射的数据项当中是否包含有指定的数据项
public static void main(String[] args) {
Observable.just(1, 2, 3, 4, 5)
.contains(2)
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
System.out.println("onNext=" + aBoolean);
}
});
}
复制代码
输出
onNext=true
复制代码
三、amb
amb操做符在多个事件流中只发射最早发出数据的事件流,因为这段代码的的延时操做都是非阻塞型的,因此在Java上运行会致使JVM的立马中止,只能把这段代码放在Android来运行
public void amb(){
List<Observable<Integer>> list = new ArrayList<>();
list.add(Observable.just(1, 2, 3).delay(3, TimeUnit.SECONDS));
list.add(Observable.just(4, 5, 6).delay(2, TimeUnit.SECONDS));
list.add(Observable.just(7, 8, 9).delay(1, TimeUnit.SECONDS));
Observable.amb(list)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
});
}
复制代码
输出
onNext=7
onNext=8
onNext=9
复制代码
四、defaultIfEmpty
defaultIfEmpty操做符会在事件流没有发射任何数据时,发射一个指定的默认值
public static void main(String[] args) {
Observable.empty()
.defaultIfEmpty(-1)
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
System.out.println("onNext=" + o.toString());
}
});
}
复制代码
输出
onNext=-1
复制代码
五、sequenceEqual
sequenceEqual操做符能够判断两个数据流是否彻底相等
public static void main(String[] args) {
Observable<Integer> just1 = Observable.just(1, 2, 3);
Observable<Integer> just2 = Observable.just(1, 2, 3);
Observable.sequenceEqual(just1, just2)
.subscribe(new Consumer<Boolean>() {
@Override
public void accept(Boolean aBoolean) throws Exception {
System.out.println("onNext=" + aBoolean);
}
});
}
复制代码
输出
onNext=true
复制代码
六、skipUntil/skipWhile
skipUtils操做符是在两个事件流发射的时候,第一个事件流会等到第二个事件流开始发射的时候,第一个事件流才开始发射出数据项,它会忽略以前发射过的数据项,因为这段代码的的延时操做都是非阻塞型的,因此在Java上运行会致使JVM的立马中止,只能把这段代码放在Android来运行
public void skipUntil(){
Observable<Long> just1 = Observable.interval(1, TimeUnit.SECONDS);
Observable<Integer> just2 = Observable.just(8).delay(3, TimeUnit.SECONDS);
just1.skipUntil(just2)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println("onNext=" + aLong);
}
});
}
复制代码
输出
onNext=2
onNext=3
onNext=4
onNext=5
......
复制代码
skipWhile操做符是在一个事件流中,从第一项数据项开始判断是否符合某个特定条件,若是判断值返回true,则不发射该数据项,继续从下一个数据项执行一样的判断,直到某个数据项的判断值返回false时,则终止判断,发射剩余的全部数据项。须要注意的是,这里只要一次判断为false则后面的全部数据项都不判断
public static void main(String[] args) {
Observable.just(1, 2, 3, 4, 5)
.skipWhile(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer < 3;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
});
}
复制代码
输出
onNext=3
onNext=4
onNext=5
复制代码
七、takeUntil/takeWhile
takeUntil操做符跟skipUntil相似,skip表示跳过的意思,而take表示取值的意思,takeUntil操做符是在两个事件流发射的时候,第一个事件流会等到第二个事件流开始发射的时候,第一个事件流中止发射数据项,它会忽略以后的数据项,因为这段代码的的延时操做都是非阻塞型的,因此在Java上运行会致使JVM的立马中止,只能把这段代码放在Android来运行
public void takeUntil(){
Observable<Long> just1 = Observable.interval(1, TimeUnit.SECONDS);
Observable<Integer> just2 = Observable.just(8).delay(3, TimeUnit.SECONDS);
just1.takeUntil(just2)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println("onNext=" + aLong);
}
});
}
复制代码
输出
onNext=0
onNext=1
复制代码
takeWhile操做符是在一个事件流中,从第一项数据项开始判断是否符合某个特定条件,若是判断值返回true,则发射该数据项,继续从下一个数据项执行一样的判断,直到某个数据项的判断值返回false时,则终止判断,且剩余的全部数据项不会发射。须要注意的是,这里只要一次判断为false则后面的全部数据项都不判断
public static void main(String[] args) {
Observable.just(1, 2, 3, 4, 0)
.takeWhile(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Exception {
return integer < 3;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
});
}
复制代码
输出
onNext=1
onNext=2
复制代码
八、小结
数学运算操做符比较简单,对于数学运算操做符会放在小结中介绍,下面是对聚合操做符作介绍
一、reduce
reduce操做符跟scan操做符是同样的,会对发射的数据和上一轮发射的数据进行函数处理,并返回的数据供下一轮使用,持续这个过程来产生剩余的数据流。reduce与scan的惟一区别在于reduce只输出最后的结果,而scan会输出每一次的结果,这点从图片中也能看出来
public static void main(String[] args) {
Observable.just(8, 2, 13, 1, 15).reduce(new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) throws Exception {
return integer < integer2 ? integer : integer2;
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer item) throws Exception {
System.out.println("onNext=" + item);
}
});
}
复制代码
输出
onNext=1
复制代码
二、collect
collect操做符跟reduce操做符相似,只不过collect增长了一个可改变数据结构的函数供咱们处理
public static void main(String[] args) {
Observable.just(8, 2, 13, 1, 15).collect(new Callable<String>() {
@Override
public String call() throws Exception {
return "A";
}
}, new BiConsumer<String, Integer>() {
@Override
public void accept(String s, Integer integer) throws Exception {
System.out.println("onNext=" + s + " " + integer);
}
}).subscribe(new BiConsumer<String, Throwable>() {
@Override
public void accept(String s, Throwable throwable) throws Exception {
System.out.println("onNext2=" + s);
}
});
}
复制代码
输出
onNext=A 8
onNext=A 2
onNext=A 13
onNext=A 1
onNext=A 15
onNext2=A
复制代码
三、小结
数学运算操做符的使用须要在gradle中添加rxjava-math的依赖
implementation 'io.reactivex:rxjava-math:1.0.0'
复制代码
一、publish
publish操做符是将普通的事件流转化成可链接的事件流ConnectableObservable
,它与普通的事件流不同,ConnectableObservable
在没有调用connect()进行链接的状况下,事件流是不会发射数据的
public static void main(String[] args) {
ConnectableObservable<Integer> connectableObservable = Observable.just(1, 2, 3, 4, 5).publish();
connectableObservable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
});
}
复制代码
输出
无
复制代码
二、connect
connect操做符是将可链接的事件流进行链接并开始发射数据。这个方法须要注意的是,connect操做符必须在全部事件流被订阅后才开始发射数据。若是放在subscribe
以前的话,则订阅者是没法收到数据的。若是后面还有订阅者将订阅这次事件流,则会丢失已经调用了connect
后,发射出去的数据项
public static void main(String[] args) {
ConnectableObservable<Integer> connectableObservable = Observable.just(1, 2, 3, 4, 5).publish();
connectableObservable.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
});
connectableObservable.connect();
}
复制代码
输出
onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
复制代码
三、refCount
refCount操做符能够将可链接的事件流转换成普通的事件流
public static void main(String[] args) {
ConnectableObservable<Integer> connectableObservable = Observable.just(1, 2, 3, 4, 5).publish();
connectableObservable.refCount().subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("onNext=" + integer);
}
});
}
复制代码
输出
onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
复制代码
四、replay
replay操做符将弥补connect
操做符的缺陷,因为connect会让后面进行订阅的订阅者丢失以前发射出去的数据项,因此使用replay操做符能够将发射出去的数据项进行缓存,这样使得后面的订阅者均可以得到完整的数据项。这里须要注意的是,replay操做符不能和publish操做符同时使用,不然将不会发射数据。例子中,读者能够将replay操做符换成publish操做符,这时候的输出就会丢失前2秒发射的数据项
public void replay(){
ConnectableObservable<Long> connectableObservable = Observable.interval(1, TimeUnit.SECONDS).replay();
connectableObservable.connect();
connectableObservable.delaySubscription(3, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println("onNext=" + aLong);
}
});
}
复制代码
输出
onNext=0
onNext=1
onNext=2
onNext=3
onNext=4
onNext=5
......
复制代码
五、小结