目录html
需求了解:java
在使用 RxJava
开发的过程当中,不少时候须要结合多个条件或者数据的逻辑判断,好比登陆功能的表单验证,实时数据比对等。这个时候咱们就须要使用 RxJava 的结合操做符来完成这一需求,Rx中提供了丰富的结合操做处理的操做方法。react
可用于组合多个Observables的操做方法:git
当 Observables 中的任何一个发射了数据时,使用一个函数结合每一个 Observable 发射的最近数据项,而且基于这个函数的结果发射数据。github
CombineLatest
操做符行为相似于zip
,可是只有当原始的Observable中的每个都发射了一条数据时 zip 才发射数据。 CombineLatest
则在原始的Observable中任意一个发射了数据时发射一条数据。当原始Observables的任何一个发射了一条数据时, CombineLatest 使用一 个函数结合它们最近发射的数据,而后发射这个函数的返回值。api
解析: combineLatest
操做符能够结合多个Observable,能够接收 2-9 个Observable对象, 在其中原始Observables的任何一个发射了一条数据时, CombineLatest 使用一个函数结合它们最近发射的数据,而后发射这个函数的返回值。此外combineLatest
操做符还有一些接收 Iterable , 数组方式的变体,以及其余指定参数combiner、bufferSize、和combineLatestDelayError方法等变体,在此就不在详细展开了,有兴趣的能够查看官方的相关API文档了解。数组
实例代码:缓存
// Observables 建立 Observable<Long> observable1 = Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS); Observable<Long> observable2 = Observable.intervalRange(1, 5, 1, 2, TimeUnit.SECONDS); Observable<Long> observable3 = Observable.intervalRange(100, 5, 1, 1, TimeUnit.SECONDS); // 1. combineLatest(ObservableSource, ObservableSource [支持2-9个参数]..., BiFunction) // 结合多个Observable, 当他们其中任意一个发射了数据时,使用函数结合他们最近发射的一项数据 Observable.combineLatest(observable1, observable2, new BiFunction<Long, Long, String>() { @Override public String apply(Long t1, Long t2) throws Exception { System.out.println("--> apply(1) t1 = " + t1 + ", t2 = " + t2); if (t1 + t2 == 10) { return "Success"; // 知足必定条件,返回指定的字符串 } return t1 + t2 + ""; // 计算全部数据的和并转换为字符串 } }).subscribe(new Consumer<String>() { @Override public void accept(String t) throws Exception { System.out.println("----> accept combineLatest(1): " + t); } }); System.out.println("--------------------------------------------------------"); // 2. combineLatest(T1, T2, T3, Function) // Observables的结合 Observable.combineLatest(observable1, observable2, observable3, new Function3<Long, Long, Long, String>() { @Override public String apply(Long t1, Long t2, Long t3) throws Exception { System.out.println("--> apply(2): t1 = " + t1 + ", t2 = " + t2 + ", t3 = " + t3); return t1 + t2 + t3 + ""; // 计算全部数据的和并转换为字符串 } }).subscribe(new Consumer<String>() { @Override public void accept(String t) throws Exception { System.out.println("--> accept(2): " + t); } });
输出:网络
--> apply(1) t1 = 1, t2 = 1 ----> accept combineLatest(1): 2 --> apply(1) t1 = 2, t2 = 1 ----> accept combineLatest(1): 3 --> apply(1) t1 = 3, t2 = 1 ----> accept combineLatest(1): 4 --> apply(1) t1 = 3, t2 = 2 ----> accept combineLatest(1): 5 --> apply(1) t1 = 4, t2 = 2 ----> accept combineLatest(1): 6 --> apply(1) t1 = 4, t2 = 3 ----> accept combineLatest(1): 7 --> apply(1) t1 = 5, t2 = 3 ----> accept combineLatest(1): 8 --> apply(1) t1 = 5, t2 = 4 ----> accept combineLatest(1): 9 --> apply(1) t1 = 5, t2 = 5 ----> accept combineLatest(1): Success -------------------------------------------------------- --> apply(2): t1 = 1, t2 = 1, t3 = 100 --> accept(2): 102 --> apply(2): t1 = 2, t2 = 1, t3 = 100 --> accept(2): 103 --> apply(2): t1 = 2, t2 = 1, t3 = 101 --> accept(2): 104 --> apply(2): t1 = 2, t2 = 2, t3 = 101 --> accept(2): 105 --> apply(2): t1 = 3, t2 = 2, t3 = 101 --> accept(2): 106 --> apply(2): t1 = 3, t2 = 2, t3 = 102 --> accept(2): 107 --> apply(2): t1 = 4, t2 = 2, t3 = 102 --> accept(2): 108 --> apply(2): t1 = 4, t2 = 2, t3 = 103 --> accept(2): 109 --> apply(2): t1 = 5, t2 = 2, t3 = 103 --> accept(2): 110 --> apply(2): t1 = 5, t2 = 3, t3 = 103 --> accept(2): 111 --> apply(2): t1 = 5, t2 = 3, t3 = 104 --> accept(2): 112 --> apply(2): t1 = 5, t2 = 4, t3 = 104 --> accept(2): 113 --> apply(2): t1 = 5, t2 = 5, t3 = 104 --> accept(2): 114
Javadoc: combineLatest(T1, T2, T3... , T9, combiner)并发
任什么时候候,只要在另外一个Observable发射的数据定义的时间窗口内,这个Observable发射了一条数据,就结合两个Observable发射的数据。
Join
操做符结合两个Observable发射的数据,基于时间窗口(你定义的针对每条数据特定的原则)选择待集合的数据项。你将这些时间窗口实现为一些Observables,它们的生命周期从任何一条Observable发射的每一条数据开始。当这个定义时间窗口的Observable发射了一条数据或者完成时,与这条数据关联的窗口也会关闭。只要这条数据的窗口是打开的,它将继续结合其它Observable发射的任何数据项。你定义一个用于结合数据的函数。
解析: join(other, leftEnd, rightEnd, resultSelector)
相关参数的解析
注意: 这是源Observable和目标Observable发射数据在任意一个基于时间窗口的有效期内才会接收到组合数据,这就意味着可能有数据丢失的状况,在其中一个已经发射完全部数据,而且没有处于时间窗口的数据状况,另外一个Observable的数据发射将不会收到组合数据。
示例代码:
// Observable的建立 Observable<Long> sourceObservable = Observable.intervalRange(1, 5, 1, 500, TimeUnit.MILLISECONDS); Observable<Long> targetObservable = Observable.intervalRange(10, 5, 1, 1000, TimeUnit.MILLISECONDS); // 1. join(other, leftEnd, rightEnd, resultSelector) // other: 目标组合的Observable // leftEnd: 接收一个源数据项,返回一个Observable,这个Observable的生命周期就是源Observable发射数据的有效期 // rightEnd: 接收一个源数据项,返回一个Observable,这个Observable的生命周期就是目标Observable发射数据的有效期 // resultSelector: 接收源Observable和目标Observable发射的数据项, 处理后的数据返回给观察者对象 sourceObservable.join(targetObservable, new Function<Long, ObservableSource<Long>>() { @Override public ObservableSource<Long> apply(Long t) throws Exception { System.out.println("-----> t1 is emitter: " + t); return Observable.timer(1000, TimeUnit.MILLISECONDS); // 源Observable发射数据的有效期为1000毫秒 } }, new Function<Long, ObservableSource<Long>>() { @Override public ObservableSource<Long> apply(Long t) throws Exception { System.out.println("-----> t2 is emitter: " + t); return Observable.timer(1000, TimeUnit.MILLISECONDS); // 目标Observable发射数据的有效期为1000毫秒 } }, new BiFunction<Long, Long, String>() { @Override public String apply(Long t1, Long t2) throws Exception { return "t1 = " + t1 + ", t2 = " + t2; // 对数据进行组合后返回和观察者 } }).subscribe(new Consumer<String>() { @Override public void accept(String t) throws Exception { System.out.println("--> accept(1): " + t); } }); System.in.read();
输出:
-----> t1 is emitter: 1 -----> t2 is emitter: 10 --> accept(1): t1 = 1, t2 = 10 -----> t1 is emitter: 2 --> accept(1): t1 = 2, t2 = 10 -----> t1 is emitter: 3 --> accept(1): t1 = 3, t2 = 10 -----> t2 is emitter: 11 --> accept(1): t1 = 1, t2 = 11 --> accept(1): t1 = 2, t2 = 11 --> accept(1): t1 = 3, t2 = 11 -----> t1 is emitter: 4 --> accept(1): t1 = 4, t2 = 11 -----> t1 is emitter: 5 --> accept(1): t1 = 5, t2 = 11 -----> t2 is emitter: 12 --> accept(1): t1 = 3, t2 = 12 --> accept(1): t1 = 4, t2 = 12 --> accept(1): t1 = 5, t2 = 12 -----> t2 is emitter: 13 --> accept(1): t1 = 5, t2 = 13 -----> t2 is emitter: 14 // 此时源t1中已经没有数据还处于时间窗口有效期内
groupJoin
groupJoin
操做符与 join
相同,只是参数传递有所区别。groupJoin(other, leftEnd, rightEnd, resultSelector) 中的resultSelector
能够将原始数据转换为 Observable 类型的数据发送给观察者。
示例代码:
// Observable的建立 Observable<Long> sourceObservable = Observable.intervalRange(1, 5, 1, 500, TimeUnit.MILLISECONDS); Observable<Long> targetObservable = Observable.intervalRange(10, 5, 1, 1000, TimeUnit.MILLISECONDS); // 2. groupJoin(other, leftEnd, rightEnd, resultSelector) // groupJoin操做符与join相同,只是参数传递有所区别。 // resultSelector能够将原始数据转换为Observable类型的数据发送给观察者。 sourceObservable.groupJoin(targetObservable, new Function<Long, ObservableSource<Long>>() { @Override public ObservableSource<Long> apply(Long t) throws Exception { System.out.println("-----> t1 is emitter: " + t); return Observable.timer(1000, TimeUnit.MILLISECONDS); // 源Observable发射数据的有效期为1000毫秒 } }, new Function<Long, ObservableSource<Long>>() { @Override public ObservableSource<Long> apply(Long t) throws Exception { System.out.println("-----> t2 is emitter: " + t); return Observable.timer(1000, TimeUnit.MILLISECONDS); // 目标Observable发射数据的有效期为1000毫秒 } }, new BiFunction<Long, Observable<Long>, Observable<String>>() { @Override public Observable<String> apply(Long t1, Observable<Long> t2) throws Exception { System.out.println("--> apply(2) combine: " + t1); // 结合操做 return t2.map(new Function<Long, String>() { @Override public String apply(Long t) throws Exception { System.out.println("-----> apply(2) operation: " + t); return "t1 = " + t1 + ", t2 = " + t; } }); } }).subscribe(new Consumer<Observable<String>>() { @Override public void accept(Observable<String> stringObservable) throws Exception { stringObservable.subscribe(new Consumer<String>() { @Override public void accept(String t) throws Exception { System.out.println("--> accept(2): " + t); } }); } });
输出:
-----> t1 is emitter: 1 --> apply(2) combine: 1 -----> t2 is emitter: 10 -----> apply(2) operation: 10 --> accept(2): t1 = 1, t2 = 10 -----> t1 is emitter: 2 --> apply(2) combine: 2 -----> apply(2) operation: 10 --> accept(2): t1 = 2, t2 = 10 -----> t1 is emitter: 3 --> apply(2) combine: 3 -----> apply(2) operation: 10 --> accept(2): t1 = 3, t2 = 10 -----> t2 is emitter: 11 -----> apply(2) operation: 11 --> accept(2): t1 = 1, t2 = 11 -----> apply(2) operation: 11 --> accept(2): t1 = 2, t2 = 11 -----> apply(2) operation: 11 --> accept(2): t1 = 3, t2 = 11 -----> t1 is emitter: 4 --> apply(2) combine: 4 -----> apply(2) operation: 11 --> accept(2): t1 = 4, t2 = 11 -----> t1 is emitter: 5 --> apply(2) combine: 5 -----> apply(2) operation: 11 --> accept(2): t1 = 5, t2 = 11 -----> t2 is emitter: 12 -----> apply(2) operation: 12 --> accept(2): t1 = 3, t2 = 12 -----> apply(2) operation: 12 --> accept(2): t1 = 4, t2 = 12 -----> apply(2) operation: 12 --> accept(2): t1 = 5, t2 = 12 -----> t2 is emitter: 13 -----> apply(2) operation: 13 --> accept(2): t1 = 5, t2 = 13 -----> t2 is emitter: 14
Javadoc: groupJoin(other, leftEnd, rightEnd, resultSelector)
合并多个Observables的发射物。
使用 Merge
操做符你能够将多个Observables的输出合并,就好像它们是一个单个的 Observable 同样。
Merge 可能会让合并的Observables发射的数据交错(有一个相似的操做符 Concat
不会让数据交错,它会按顺序一个接着一个发射多个Observables的发射物),任何一个原始Observable的 onError
通知会被当即传递给观察者,并且会终止合并后的Observable。
除了传递多个Observable给 merge ,你还能够传递一个Observable列表 List ,数组,甚至是一个发射Observable序列的Observable, merge 将合并它们的输出做为单个Observable的输出。
若是你传递一个发射Observables序列的Observable,你能够指定 merge 应该同时订阅的 Observable 的最大数量。一旦达到订阅数的限制,它将再也不订阅原始Observable发射的任何其它Observable,直到某个已经订阅的Observable发射了 onCompleted 通知。
示例代码:
// 建立Observable对象 Observable<Integer> odd = Observable.just(1, 3, 5); Observable<Integer> even = Observable.just(2, 4, 6); Observable<Integer> big = Observable.just(188888, 688888, 888888); // 建立list对象 List<Observable<Integer>> list = new ArrayList<>(); list.add(odd); list.add(even); list.add(big); // 建立Array对象 Observable<Integer>[] observables = new Observable[3]; observables[0] = odd; observables[1] = even; observables[2] = big; // 建立发射Observable序列的Observable Observable<ObservableSource<Integer>> sources = Observable.create(new ObservableOnSubscribe<ObservableSource<Integer>>() { @Override public void subscribe(ObservableEmitter<ObservableSource<Integer>> emitter) throws Exception { emitter.onNext(Observable.just(1)); emitter.onNext(Observable.just(1, 2)); emitter.onNext(Observable.just(1, 2, 3)); emitter.onNext(Observable.just(1, 2, 3, 4)); emitter.onNext(Observable.just(1, 2, 3, 4, 5)); emitter.onComplete(); } }); // 1. merge(ObservableSource source1, ObservableSource source2, ..., ObservableSource source4) // 可接受 2-4 个Observable对象进行merge Observable.merge(odd, even) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println("--> accept(1): " + integer); } }); System.out.println("-----------------------------------------------"); // 2. merge(Iterable<? extends ObservableSource<? extends T>> sources, int maxConcurrency, int bufferSize) // 可选参数, maxConcurrency: 最大的并发处理数, bufferSize: 缓存的数量(从每一个内部观察资源预取的项数) // 接受一个Observable的列表List Observable.merge(list) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println("--> accept(2): " + integer); } }); System.out.println("-----------------------------------------------"); // 3. mergeArray(int maxConcurrency, int bufferSize, ObservableSource<? extends T>... sources) // 可选参数, maxConcurrency: 最大的并发处理数, bufferSize: 缓存的数量(从每一个内部观察资源预取的项数) // 接受一个Observable的数组Array Observable.mergeArray(observables) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println("--> accept(3): " + integer); } }); System.out.println("-----------------------------------------------"); // 4. merge(ObservableSource<? extends ObservableSource<? extends T>> sources, int maxConcurrency) // 可选参数, maxConcurrency: 最大的并发处理数 // 接受一个发射Observable序列的Observable Observable.merge(sources) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println("--> accept(4): " + integer); } }); System.out.println("-----------------------------------------------"); // 5. mergeWith(other) // merge 是静态方法, mergeWith 是对象方法: Observable.merge(odd,even) 等价于 odd.mergeWith(even) odd.mergeWith(even) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println("--> accept(5): " + integer); } });
输出:
--> accept(1): 1 --> accept(1): 3 --> accept(1): 5 --> accept(1): 2 --> accept(1): 4 --> accept(1): 6 ----------------------------------------------- --> accept(2): 1 --> accept(2): 3 --> accept(2): 5 --> accept(2): 2 --> accept(2): 4 --> accept(2): 6 --> accept(2): 188888 --> accept(2): 688888 --> accept(2): 888888 ----------------------------------------------- --> accept(3): 1 --> accept(3): 3 --> accept(3): 5 --> accept(3): 2 --> accept(3): 4 --> accept(3): 6 --> accept(3): 188888 --> accept(3): 688888 --> accept(3): 888888 ----------------------------------------------- --> accept(4): 1 --> accept(4): 1 --> accept(4): 2 --> accept(4): 1 --> accept(4): 2 --> accept(4): 3 --> accept(4): 1 --> accept(4): 2 --> accept(4): 3 --> accept(4): 4 --> accept(4): 1 --> accept(4): 2 --> accept(4): 3 --> accept(4): 4 --> accept(4): 5 ----------------------------------------------- --> accept(5): 1 --> accept(5): 3 --> accept(5): 5 --> accept(5): 2 --> accept(5): 4 --> accept(5): 6
Javadoc: merge(source1, ... , source4)
Javadoc: merge(Iterable sources, int maxConcurrency, int bufferSize)
Javadoc: mergeArray(int maxConcurrency, int bufferSize, ObservableSource... sources)
Javadoc: merge(ObservableSourcesources, int maxConcurrency)
若是传递给 merge 的任何一个的Observable发射了 onError
通知终止了, merge 操做符生成的Observable也会当即以onError
通知终止。若是你想让它继续发射数据,在最后才报告错误,可使用 mergeDelayError
。
MergeDelayError
操做符,mergeDelayError 在合并与交错输出的使用上与 merge
相同,区别在于它会保留 onError
通知直到其余没有Error的Observable全部的数据发射完成,在那时它才会把onError
传递给观察者。
注意: 若是有多个原始Observable出现了Error
, 这些Error通知会被合并成一个 CompositeException
,保留在CompositeException 内部的 List<Throwable> exceptions
中,可是若是只有一个原始Observable出现了Error,则不会生成 CompositeException ,只会发送这个Error通知。
因为MergeDelayError
使用上和merge
相同 ,因此这里就不作详细分析了,这里就简单描述其中的一种的使用实例。
实例代码:
// 建立有Error的Observable序列的Observable Observable<ObservableSource<Integer>> DelayErrorObservable = Observable.create(new ObservableOnSubscribe<ObservableSource<Integer>>() { @Override public void subscribe(ObservableEmitter<ObservableSource<Integer>> emitter) throws Exception { emitter.onNext(Observable.just(1)); emitter.onNext(Observable.error(new Exception("Error Test1"))); // 发射一个Error的通知的Observable emitter.onNext(Observable.just(2, 3)); emitter.onNext(Observable.error(new Exception("Error Test2"))); // 发射一个Error的通知的Observable emitter.onNext(Observable.just(4, 5, 6)); emitter.onComplete(); } }); // 6. mergeDelayError // 保留onError通知直到合并后的Observable全部的数据发射完成,在那时它才会把onError传递给观察者 Observable.mergeDelayError(DelayErrorObservable) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe(6)"); } @Override public void onNext(Integer integer) { System.out.println("--> onNext(6): " + integer); } @Override public void onError(Throwable e) { // 判断是不是CompositeException对象(发生多个Observable出现Error时会发送的对象) if (e instanceof CompositeException) { CompositeException compositeException = (CompositeException) e; List<Throwable> exceptions = compositeException.getExceptions(); System.out.println("--> onError(6): " + exceptions); } else { System.out.println("--> onError(6): " + e); } } @Override public void onComplete() { System.out.println("--> onComplete(6)"); } });
输出:
--> onSubscribe(6) --> onNext(6): 1 --> onNext(6): 2 --> onNext(6): 3 --> onNext(6): 4 --> onNext(6): 5 --> onNext(6): 6 --> onError(6): [java.lang.Exception: Error Test1, java.lang.Exception: Error Test2]
Javadoc: mergeDelayError(source1, … , source4)
Javadoc: mergeDelayError(Iterable sources, int maxConcurrency, int bufferSize)
Javadoc: mergeArrayDelayError(int maxConcurrency, int bufferSize, ObservableSource… sources)
Javadoc: mergeDelayError(ObservableSource sources, int maxConcurrency)
经过一个函数将多个Observables的发射物结合到一块儿,基于这个函数的结果为每一个 结合体 发射单个数据项。
Zip
操做符与 Merge
相似,都是合并多个Observables的数据,返回一个Obversable,主要不一样的是它使用这个函数按顺序结合两个或多个Observables发射的数据项,而后它发射这个函数返回的结果。它按照严格的顺序应用这个函数。 它只发射与发射数据项最少的那个Observable同样多的数据。
解析:
Zip
操做符与 Merge
的使用上基本一致,主要不一样的是 zip 发射的数据取决于发射数据项最少的那个Observable而且按照严格的顺序去结合数据。zip
与对象方法 zipWith
,能够传递一个Observable列表 List ,数组,甚至是一个发射Observable序列的Observable。使用上在此就不作详细的展开了,可参照上面的 Merge
使用方法,下面就针对 zip
的特性实现一个简单的实例。
实例代码:
// 建立Observable Observable<Integer> observable1 = Observable.just(1, 2, 3); Observable<Integer> observable2 = Observable.just(1, 2, 3, 4, 5, 6); // zip(sources) // 可接受2-9个参数的Observable,对其进行顺序合并操做,最终合并的数据项取决于最少的数据项的Observable Observable.zip(observable1, observable2, new BiFunction<Integer, Integer, String>() { @Override public String apply(Integer t1, Integer t2) throws Exception { System.out.println("--> apply: t1 = " + t1 + ", t2 = " + t2); return t1 + t2 + ""; } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { System.out.println("--> accept: " + s); // 最终接受observable1所有数据项与observable2相同数量顺序部分数据 } });
输出:
--> apply: t1 = 1, t2 = 1 --> accept: 2 --> apply: t1 = 2, t2 = 2 --> accept: 4 --> apply: t1 = 3, t2 = 3 --> accept: 6
Javadoc: zip( source1, source2, ... , source9, zipper )
Javadoc: zip( Iterable sources, Function zipper )
Javadoc: zipIterable(Iterable<ObservableSource> sources, Function<Object[],R> zipper, boolean delayError, int bufferSize)
Javadoc: zipArray( Function<Object[]> zipper, boolean delayError, int bufferSize, ObservableSource... sources )
Javadoc: zip( ObservableSource<ObservableSource> sources, Function<Object[]> zipper )
在数据序列的开头插入一条指定的数据项或者数据序列。
若是你想要一个Observable在发射数据以前先发射一个指定的数据或者数据序列(能够是单个数据、数组、列表,Observable中的数据),可使 用 StartWith
操做符。(若是你想一个Observable发射的数据末尾追加一个数据序列可使用 Concat
操做符。)
实例代码:
// 建立列表List List<Integer> lists = new ArrayList<>(); lists.add(999); lists.add(9999); lists.add(99999); // 建立数组Array Integer[] arrays = new Integer[3]; arrays[0] = 999; arrays[1] = 9999; arrays[2] = 9999; // 1. startWith(item) // 在Observable数据发射前发射item数据项 Observable.just(1, 2, 3) .startWith(999) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println("--> accept(1): " + integer); } }); System.out.println("-----------------------------------------"); // 2. startWith(Iterable items) // 在Observable数据发射前发射items列表中的数据序列 Observable.just(1, 2, 3) .startWith(lists) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println("--> accept(2): " + integer); } }); System.out.println("-----------------------------------------"); // 3. startWithArray(items) // 在Observable数据发射前发射items数组中的数据序列 Observable.just(1, 2, 3) .startWithArray(arrays) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println("--> accept(3): " + integer); } }); System.out.println("-----------------------------------------"); // 4. startWith(ObservableSource other) // 在Observable数据发射前发射other中的数据序列 Observable.just(1, 2, 3) .startWith(Observable.just(999, 9999, 99999)) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { System.out.println("--> accept(4): " + integer); } });
输出:
--> accept(1): 999 --> accept(1): 1 --> accept(1): 2 --> accept(1): 3 ----------------------------------------- --> accept(2): 999 --> accept(2): 9999 --> accept(2): 99999 --> accept(2): 1 --> accept(2): 2 --> accept(2): 3 ----------------------------------------- --> accept(3): 999 --> accept(3): 9999 --> accept(3): 9999 --> accept(3): 1 --> accept(3): 2 --> accept(3): 3 ----------------------------------------- --> accept(4): 999 --> accept(4): 9999 --> accept(4): 99999 --> accept(4): 1 --> accept(4): 2 --> accept(4): 3
Javadoc: startWith(item)
Javadoc: startWith(Iterable items)
Javadoc: startWithArray(items)
Javadoc: startWith(ObservableSource other)
将一个发射多个Observables的Observable转换成另外一个单独的Observable,后者发射那些 Observables最近发射的数据项。
switchOnNext
订阅一个发射多个Observables的Observable。它每次观察那些Observables中的一个, switchOnNext
发射的这个新Observable并取消订阅前一个发射数据的旧Observable,开始发射最新的Observable发射的数据。
注意: 当原始Observables发射了一个新的Observable时(不是这个新的Observable发射了一条数据时),它将取消订阅以前的那个Observable。这意味着,在 后来那个Observable产生以后到它开始发射数据以前的这段时间里,前一个Observable发射 的数据将被丢弃(就像图例上的那个黄色圆圈同样)。
当Observables
发射一个新的Observable后,则会取消订阅前面的旧observable,直接开始接受新Observable的数据,若是Observables中的Observable有 Error
异常,将保留 onError
通知直到其余没有Error的Observable全部的数据发射完成,在那时它才会把 onError 传递给观察者。
注意: 若是有多个原始Observable出现了Error
, 这些Error通知会被合并成一个 CompositeException
,保留在CompositeException 内部的 List<Throwable> exceptions
中,可是若是只有一个原始Observable出现了Error,则不会生成 CompositeException ,只会发送这个Error通知。
实例代码:
// 建立Observable Observable<Long> observable1 = Observable.intervalRange(1, 5, 1, 500, TimeUnit.MILLISECONDS); Observable<Long> observable2 = Observable.intervalRange(10, 5, 1, 500, TimeUnit.MILLISECONDS); // 建立发射Observable序列的Observable Observable<Observable<Long>> sources = Observable.create(new ObservableOnSubscribe<Observable<Long>>() { @Override public void subscribe(ObservableEmitter<Observable<Long>> emitter) throws Exception { emitter.onNext(observable1); Thread.sleep(1000); // 此时发射一个新的observable2,将会取消订阅observable1 emitter.onNext(observable2); emitter.onComplete(); } }); // 建立发射含有Error通知的Observable序列的Observable Observable<Observable<Long>> sourcesError = Observable.create(new ObservableOnSubscribe<Observable<Long>>() { @Override public void subscribe(ObservableEmitter<Observable<Long>> emitter) throws Exception { emitter.onNext(observable1); emitter.onNext(Observable.error(new Exception("Error Test1!"))); // 发射一个发射Error通知的Observable emitter.onNext(Observable.error(new Exception("Error Test2!"))); // 发射一个发射Error通知的Observable Thread.sleep(1000); // 此时发射一个新的observable2,将会取消订阅observable1 emitter.onNext(observable2); emitter.onComplete(); } }); // 1. switchOnNext(ObservableSource<ObservableSource> sources, int bufferSize) // 可选参数 bufferSize: 缓存数据项大小 // 接受一个发射Observable序列的Observable类型的sources, // 当sources发射一个新的Observable后,则会取消订阅前面的旧observable,直接开始接受新Observable的数据 Observable.switchOnNext(sources) .subscribe(new Consumer<Long>() { @Override public void accept(Long integer) throws Exception { System.out.println("--> accept(1): " + integer); } }); System.in.read(); System.out.println("--------------------------------------------------------------------"); // 2. switchOnNextDelayError(ObservableSource<ObservableSource> sources, int prefetch) // 可选参数 prefetch: 与读取数据项大小 // 当sources发射一个新的Observable后,则会取消订阅前面的旧observable,直接开始接受新Observable的数据, // 保留onError通知直到合并后的Observable全部的数据发射完成,在那时它才会把onError传递给观察者 Observable.switchOnNextDelayError(sourcesError) .subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe(2)"); } @Override public void onNext(Long t) { System.out.println("--> onNext(2): " + t); } @Override public void onError(Throwable e) { // 判断是不是CompositeException对象(发生多个Observable出现Error时会发送的对象) if (e instanceof CompositeException) { CompositeException compositeException = (CompositeException) e; List<Throwable> exceptions = compositeException.getExceptions(); System.out.println("--> onError(2): " + exceptions); } else { System.out.println("--> onError(2): " + e); } } @Override public void onComplete() { System.out.println("--> onComplete(2)"); } }); System.in.read();
输出:
--> accept(1): 1 --> accept(1): 2 --> accept(1): 10 --> accept(1): 11 --> accept(1): 12 --> accept(1): 13 --> accept(1): 14 -------------------------------------------------------------------- --> onSubscribe(2) --> onNext(2): 10 --> onNext(2): 11 --> onNext(2): 12 --> onNext(2): 13 --> onNext(2): 14 --> onError(2): [java.lang.Exception: Error Test1!, java.lang.Exception: Error Test2!]
Javadoc: switchOnNext(ObservableSource<ObservableSource> sources, int bufferSize)
Javadoc: switchOnNextDelayError(ObservableSource<ObservableSource> sources, int prefetch)
Rxjava 的合并操做符可以同时处理多个被观察者,并发送相应的事件通知以及数据。经常应用于多业务合并处理场景,好比表单的联动验证,网络交互性数据的校验等,rxjava的合并操做符可以很好的去实现和处理。
提示:以上使用的Rxjava2版本: 2.2.12
Rx介绍与讲解及完整目录参考:Rxjava2 介绍与详解实例
实例代码: