博客主页java
RxJava 的合并操做符主要包括以下几个:segmentfault
RxJava 的链接操做符,主要是 ConnectableObservable 所使用的操做符和 Observable 所使用
的操做符:数组
合并多个 Observable 的发射物
merge 操做符能够将多个 Observable 的输出合井,使得它们就像是单个的 Observable 同样。缓存
Observable.merge( Observable.just(1, 3, 5, 7, 9), Observable.just(2, 4, 6) ).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next: " + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 执行结果 Next: 1 Next: 3 Next: 5 Next: 7 Next: 9 Next: 2 Next: 4 Next: 6 Complete.
merge 是按照时间线并行的。若是传递给 merge 的任何一个 Observable 发射了 onError 通知终止,则 merge 操做符生成的 Observable 也会当即以 onError 通知终止。若是想让它继续发射数据,直到最后才报告错误,则可使用 mergeDelayError 操做符.
merge 操做符最多只能合并 4 个被观察者,若是须要合并更多个被观察者,则可使用 mergeArray 操做符.并发
经过一个函数将多个 Observable 的发射物结合到一块儿,基于这个函数的结果为每一个结合体发射单个数据项app
zip 操做符返回一个 Obversable ,它使用这个函数按顺序结合两个或多个 Observable 发射的数据项,而后发射这个函数返回的结果。它按照严格的顺序应用这个函数,只发射与发射数据项最少的那个 Observable 同样多的数据。ide
zip 的最后一个参数接收每一个 Observable 发射的一项数据,返回被压缩后的数据,它能够接收 1~9 个参数:一个 Observable 序列 或者一些发射 Observable 的 Observable函数
Observable.zip( Observable.just(1, 3, 5, 7), Observable.just(2, 4, 6), new BiFunction<Integer, Integer, Integer>() { @Override public Integer apply(Integer integer, Integer integer2) throws Exception { return integer + integer2; } } ).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next: " + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 执行结果 Next: 3 Next: 7 Next: 11 Complete.
zip 操做符相对于 merge 操做符,除发射数据外,还会进行合并操做,并且 zip 发射的数据与数据项最少的 Observable 有关。spa
这里 BiFunction 至关于一个合并函数,并不必定要返回 Integer 类型,能够根据业务须要返回合适的类型。 BiFunction 的源码以下:线程
public interface BiFunction<T1, T2, R> { /** * Calculate a value based on the input values. * @param t1 the first value * @param t2 the second value * @return the result value * @throws Exception on error */ @NonNull R apply(@NonNull T1 t1, @NonNull T2 t2) throws Exception; }
RxJava 2.x FuncN 遵循 Java 8 的命名规范。相对于 RxJava l.x, Func 更名成 Function, Func2 更名成 BiFunction, Func3~ Func9 更名成 Function3~ Function9,FuncN 由 Function 取代
combineLatest 操做符的行为相似于 zip,可是只有当原始的 Observable 中的每个都发射了一条数据时 zip 才发射数据,而 combineLatest 是当原始的 Observable 中任意一个发射了数据时就发射一条数据。当原始 Observable 的任何一个发射了一条数据时, combineLatest 使用一个函数结合它们最近发射的数据,而后发射这个函数的返回值。
Observable.combineLatest( Observable.just(1, 3, 5), Observable.just(2, 4, 6), new BiFunction<Integer, Integer, Integer>() { @Override public Integer apply(Integer integer, Integer integer2) throws Exception { Log.d(TAG, "integer: " + integer + " ## integer2: " + integer2); return integer + integer2; } } ).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Log.d(TAG, "Next: " + integer); } }, new Consumer<Throwable>() { @Override public void accept(Throwable throwable) throws Exception { Log.d(TAG, "Error: " + throwable); } }, new Action() { @Override public void run() throws Exception { Log.d(TAG, "Complete."); } }); // 执行结果 integer: 5 ## integer2: 2 Next: 7 integer: 5 ## integer2: 4 Next: 9 integer: 5 ## integer2: 6 Next: 11 Complete.
join 操做符结合两个 Observable 发射的数据,基于时间窗口(针对每条数据特定的原则)选择待集合的数据项。将这些时间窗口实现为一些 Observable ,它们的生命周期从任何一条 Observable 发射的每一条数据开始。当这个定义时间窗口的 Observable 发射了一条数据或者完成时,与这条数据关联的窗口也会关闭。只要这条数据的窗口是打开的,它就继续结合其余 Observable 发射的任何数据项。
Observable<Integer> o1 = Observable.just(1, 2, 3); Observable<Integer> o2 = Observable.just(4, 5, 6); o1.join(o2, new Function<Integer, ObservableSource<String>>() { @Override public ObservableSource<String> apply(Integer integer) throws Exception { return Observable.just(String.valueOf(integer)).delay(200, TimeUnit.MILLISECONDS); } }, new Function<Integer, ObservableSource<String>>() { @Override public ObservableSource<String> apply(Integer integer) throws Exception { return Observable.just(String.valueOf(integer)).delay(200, TimeUnit.MILLISECONDS); } }, new BiFunction<Integer, Integer, String>() { @Override public String apply(Integer integer, Integer integer2) throws Exception { return integer + ":" + integer2; } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "Next: " + s); } }); // 执行结果 Next: 1:4 Next: 2:4 Next: 3:4 Next: 1:5 Next: 2:5 Next: 3:5 Next: 1:6 Next: 2:6 Next: 3:6
join(Observable, Function, Function, BiFunction) 有四个参数:
的生命周期决定了源 Observable 发射数据的有效期。
的生命周期决定了目标 bservable 发射数据的有效期。
合后返回。
join 操做符的效果相似于排列组合,把第一个数据源 A 做为基座窗口,它根据本身的节奏不断发射数据元素;第二个数据源 B,每发射一个数据,咱们都把它和第一个数据源 A 中己经发射的数据进行一对一匹配。举例来讲,若是某一时刻 B 发射了一个数据 "B",此时 A 己经发射了 a,b,c,d 共4个数据,那么合并操做就是把 “B” 依次与 a,b,c,d 配对,获得4组数据:[a, B], [b, B], [c, B], [d, B]
Observable<Integer> o1 = Observable.just(1, 2, 3).delay(200, TimeUnit.MILLISECONDS); Observable<Integer> o2 = Observable.just(4, 5, 6); o1.join(o2, new Function<Integer, ObservableSource<String>>() { @Override public ObservableSource<String> apply(Integer integer) throws Exception { return Observable.just(String.valueOf(integer)).delay(200, TimeUnit.MILLISECONDS); } }, new Function<Integer, ObservableSource<String>>() { @Override public ObservableSource<String> apply(Integer integer) throws Exception { return Observable.just(String.valueOf(integer)).delay(200, TimeUnit.MILLISECONDS); } }, new BiFunction<Integer, Integer, String>() { @Override public String apply(Integer integer, Integer integer2) throws Exception { return integer + ":" + integer2; } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "Next: " + s); } }); // 执行结果 Next: 1:4 Next: 1:5 Next: 1:6 Next: 2:4 Next: 2:5 Next: 2:6 Next: 3:4 Next: 3:5 Next: 3:6
在数据序列的开头插入一条指定的项
若是想让 Observable 在发射数据以前先发射一个指定的数据序列,则可使用 startWith 操做符。若是想在一个 Observable 发射数据的末尾追加一个数据序列 ,则可使用 concat 操做符。
Observable.just("hello, java", "hello, kotlin") .startWith("hello, rxjava") .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "Next: " + s); } }); // 执行结果 Next: hello, rxjava Next: hello, java Next: hello, kotlin
startWith 操做符支持传递 Iterable, 同时还有一个 startWithArray 的操做符
Observable.just("hello, java", "hello, kotlin") .startWithArray("hello, rxjava", "hello, flutter") .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "Next: " + s); } }); // 执行结果 Next: hello, rxjava Next: hello, flutter Next: hello, java Next: hello, kotlin
使用了 startWithArray 操做符以后,能够再使用 startWith 操做符。
Observable.just("hello, java", "hello, kotlin") .startWithArray("hello, rxjava", "hello, flutter") .startWith("hello, groovy") .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "Next: " + s); } }); // 执行结果 Next: hello, groovy Next: hello, rxjava Next: hello, flutter Next: hello, java Next: hello, kotlin
startWith 还能够传递一个 Observable ,它会将那个 Observable 的发射物插在原始 Observable 发射的数据序列以前,而后把这个看成本身的发射物集合。
Observable.just("hello, java", "hello, kotlin") .startWithArray("hello, rxjava", "hello, flutter") .startWith(Observable.just("hello, groovy")) .subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { Log.d(TAG, "Next: " + s); } }); // 执行结果 Next: hello, groovy Next: hello, rxjava Next: hello, flutter Next: hello, java Next: hello, kotlin
connect 和 refCount 是 ConnectableObservable 所使用的操做符。
ConnectableObservable 继承自 Observable ,然而它并非在调用 subscribe() 的时候发射数据,而是只有对其使用 connect 操做符时它才会发射数据,因此能够用来更灵活地控制数据发射的时机。另外, ConnectableObservable 是 Hot Observable
push 操做符是将普通的 Observable 转换成 ConnectableObservable
connect 操做符是用来触发 ConnectableObservable 发射数据的 。咱们能够等全部的观察者都订阅了 ConnectableObservable 以后再发射数据。
final SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss"); Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS).take(6); ConnectableObservable<Long> connectableObservable = observable.publish(); connectableObservable.subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Long aLong) { Log.d(TAG, "Next#1: " + aLong + "->time:" + format.format(new Date())); } @Override public void onError(Throwable e) { Log.d(TAG, "Error: " + e); } @Override public void onComplete() { Log.d(TAG, "Complete#1."); } }); connectableObservable.delaySubscription(3, TimeUnit.SECONDS) .subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Long aLong) { Log.d(TAG, "Next#2: " + aLong + "->time:" + format.format(new Date())); } @Override public void onError(Throwable e) { Log.d(TAG, "Error: " + e); } @Override public void onComplete() { Log.d(TAG, "Complete#2."); } }); connectableObservable.connect(); // 执行结果 Next#1: 0->time:09:27:44 Next#1: 1->time:09:27:45 Next#1: 2->time:09:27:46 Next#1: 3->time:09:27:47 Next#2: 3->time:09:27:47 Next#1: 4->time:09:27:48 Next#2: 4->time:09:27:48 Next#1: 5->time:09:27:49 Next#2: 5->time:09:27:49 Complete#1. Complete#2.
refCount 操做符是将 ConnectableObservable 转换成普通的 Observable,同时又保持了 Hot Observable 的特性。当出现第一个订阅者时, refCount 会调用 connect()。 每一个订阅者每次都会接收到一样的数据,可是当全部订阅者都取消订阅(dispose)时, refCount 会自动 dispose 上游 Observable
全部的订阅者都取消订阅后,则数据流中止。若是从新订阅则数据流从新开始。若是不是全部的订阅者都取消了订阅,而是只取消了部分。则部分订阅者/观察者从新开始订阅时,不会从头开始数据流。
final SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss"); Observable<Long> obs = Observable.interval(1, TimeUnit.SECONDS).take(6); ConnectableObservable<Long> connectableObservable = obs.publish(); Observable<Long> obsRefCount = connectableObservable.refCount(); obs.subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Long aLong) { Log.d(TAG, "Next#1: " + aLong + "->time:" + format.format(new Date())); } @Override public void onError(Throwable e) { Log.d(TAG, "Error: " + e); } @Override public void onComplete() { Log.d(TAG, "Complete#1."); } }); obs.delaySubscription(3, TimeUnit.SECONDS) .subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Long aLong) { Log.d(TAG, "Next#2: " + aLong + "->time:" + format.format(new Date())); } @Override public void onError(Throwable e) { Log.d(TAG, "Error: " + e); } @Override public void onComplete() { Log.d(TAG, "Complete#2."); } }); obsRefCount.subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Long aLong) { Log.d(TAG, "Next#3: " + aLong + "->time:" + format.format(new Date())); } @Override public void onError(Throwable e) { Log.d(TAG, "Error: " + e); } @Override public void onComplete() { Log.d(TAG, "Complete#3."); } }); obsRefCount.delaySubscription(3, TimeUnit.SECONDS) .subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Long aLong) { Log.d(TAG, "Next#4: " + aLong + "->time:" + format.format(new Date())); } @Override public void onError(Throwable e) { Log.d(TAG, "Error: " + e); } @Override public void onComplete() { Log.d(TAG, "Complete#4."); } }); // 执行结果 Next#1: 0->time:09:39:50 Next#3: 0->time:09:39:50 Next#1: 1->time:09:39:51 Next#3: 1->time:09:39:51 Next#1: 2->time:09:39:52 Next#3: 2->time:09:39:52 Next#1: 3->time:09:39:53 Next#3: 3->time:09:39:53 Next#4: 3->time:09:39:53 Next#2: 0->time:09:39:53 Next#1: 4->time:09:39:54 Next#3: 4->time:09:39:54 Next#4: 4->time:09:39:54 Next#2: 1->time:09:39:54 Next#3: 5->time:09:39:55 Next#4: 5->time:09:39:55 Complete#3. Next#1: 5->time:09:39:55 Complete#4. Complete#1. Next#2: 2->time:09:39:55 Next#2: 3->time:09:39:56 Next#2: 4->time:09:39:57 Next#2: 5->time:09:39:58 Complete#2.
保证全部的观察者收到相同的数据序列,即便它们 Observable 开始发射数据以后才订阅
replay 操做符返回一个 ConnectableObservable 对象,而且能够缓存发射过的数据,这样即便有订阅者在发射数据以后进行订阅,也能收到以前发射过的数据。不过使用 replay 操做符最好仍是先限定缓存的大小,不然缓存的数据太多时会占用很大一块内存。对缓存的控制能够从空间和时间两个方面来实现。
replay 操做符生成的 ConnectableObservable ,使得观察者不管何时开始订阅,都能收到 Observable 发送的全部数据
final SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss"); Observable<Long> obs = Observable.interval(1, TimeUnit.SECONDS).take(6); ConnectableObservable<Long> connectableObservable = obs.replay(); connectableObservable.connect(); connectableObservable.subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Long aLong) { Log.d(TAG, "Next#1: " + aLong + "->time:" + format.format(new Date())); } @Override public void onError(Throwable e) { Log.d(TAG, "Error: " + e); } @Override public void onComplete() { Log.d(TAG, "Complete#1."); } }); connectableObservable.delaySubscription(3, TimeUnit.SECONDS) .subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Long aLong) { Log.d(TAG, "Next#2: " + aLong + "->time:" + format.format(new Date())); } @Override public void onError(Throwable e) { Log.d(TAG, "Error: " + e); } @Override public void onComplete() { Log.d(TAG, "Complete#2."); } }); // 执行结果 Next#1: 0->time:09:59:01 Next#1: 1->time:09:59:02 Next#1: 2->time:09:59:03 Next#2: 0->time:09:59:03 Next#2: 1->time:09:59:03 Next#2: 2->time:09:59:03 Next#1: 3->time:09:59:04 Next#2: 3->time:09:59:04 Next#1: 4->time:09:59:05 Next#2: 4->time:09:59:05 Next#1: 5->time:09:59:06 Next#2: 5->time:09:59:06 Complete#1. Complete#2.
connect() 无须在观察者订阅以后调用也能执行.
replay 有多个接收不一样参数的重载方法,有的能够指定 replay 的最大缓存数量,有的能够指定调度器。
ConnectableObservable 的线程切换只能经过 replay 操做符实现,普通 Observable 的subscribeOn() 和 observerOn() 在 ConnectableObservable 中不起做用。 replay 操做符能够经过指定线程的方式来切换线程。
若是个人文章对您有帮助,不妨点个赞鼓励一下(^_^)