目录html
如今咱们能够看到愈来愈多的开发者都在使用 Rx 相关的技术进行 App,Java 后端等领域进行开发。在开源的社区以及互联网公司,Rx、响应式编程、函数式都是热门的存在。因此笔者将结合自身的学习以及实际使用状况,写一个针对 Rxjava2 的系列文章,一块儿学习和使用 Rxjava 所带来的便捷。java
笔者将利用工做之余,结合 ReactiveX 官方 Wiki
对 Rxjava 的定义与介绍,对相关基础知识、基本操做,经常使用部分的 API 进行整理,并加上我的理解和相关操做的示例。react
相关参考连接:git
Rxjava2 系列文章目录:github
未完,待续(正在努力整理中)...
实例代码:编程
ReactiveX的历史后端
ReactiveX
是Reactive Extensions的缩写,通常简写为Rx,最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,在2012年11月开源,Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,Rx库支持.NET、JavaScript和C++,Rx近几年愈来愈流行了,如今已经支持几乎所有的流行编程语言了,Rx的大部分语言库由ReactiveX这个组织负责维护,比较流行的有RxJava/RxJS/Rx.NET,社区网站是reactivex.io。数组
什么是ReactiveX缓存
微软给的定义是,Rx是一个函数库,让开发者能够利用可观察序列和LINQ风格查询操做符来编写异步和基于事件的程序,使用Rx,开发者能够用Observables表示异步数据流,用LINQ操做符查询异步数据流, 用Schedulers参数化异步数据流的并发处理,Rx能够这样定义:Rx = Observables + LINQ + Schedulers。安全
ReactiveX.io给的定义是,Rx是一个使用可观察数据流进行异步编程
的编程接口,ReactiveX结合了观察者模式
、迭代器模式
和函数式编程
的精华。
RxJava 究竟是什么
RxJava 在 GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。这就是 RxJava ,归纳得很是精准。
然而,对于初学者来讲,这仍是比较含蓄难懂的。由于它是一个总结
,而初学者更须要一个入门的介绍或者理解。其实, RxJava 的本质能够总结为异步
的概念。说到本质上,它就是一个实现异步操做的库。RxJava 的异步实现,是经过一种扩展的观察者模式
来实现的。
RxJava 优势
一样是作异步,为何去使用它,而不用现成的 Thread,ThreadPoolExecutor,Android的AsyncTask / Handler / ... ?其实就是简洁,易用
!
异步操做很关键的一点是程序的简洁性,由于在调度过程比较复杂的状况下,异步代码常常会既难写也难被读懂。 正如Android 创造的 AsyncTask 和Handler ,其实都是为了让异步代码更加简洁。RxJava 的优点也是简洁,但它的简洁的不同凡响之处在于,随着程序逻辑变得愈来愈复杂,它依然可以保持简洁。
名词定义
使用观察者模式
简化代码
使用Observable的优点
Rx扩展了观察者模式用于支持数据和事件序列,添加了一些操做符,它让你能够声明式的组合这些序列,而无需关注底层的实现:如线程、同步、线程安全、并发数据结构和非阻塞IO。
Observable经过使用最佳的方式访问异步数据序列填补了这个间隙。
类型 | 单个数据 | 多个数据 |
---|---|---|
同步 | T getData() | Iterable
|
异步 | Future<T> getData() | Observable<T> getData() |
Rx的Observable模型让你能够像使用集合数据同样操做异步事件流,对异步事件流使用各类
简单、可组合的操做。
1. Observable可组合
对于单层的异步操做来讲,Java中Future对象的处理方式是很是简单有效的,可是一旦涉及到嵌套,它们就开始变得异常繁琐和复杂。使用Future很难很好的组合带条件的异步执行流程(考虑到运行时各类潜在的问题,甚至能够说是不可能的),固然,要想实现仍是能够作到的,可是很是困难,或许你能够用 Future.get() ,但这样作,异步执行的优点就彻底没有了。从另外一方面说,Rx的bservable一开始就是为组合异步数据流准备的。
2. Observable更灵活
Rx的Observable不只支持处理单独的标量值(就像Future能够作的),也支持数据序列,甚至是无穷的数据流。 Observable 是一个抽象概念,适用于任何场景。Observable拥有它的近亲Iterable的所有优雅与灵活。
Observable是异步的双向push,Iterable是同步的单向pull,对比:
事件 | Iterable(pull) | Observable(push) |
---|---|---|
获取数据 | T next() | onNext(T) |
异常处理 | throws Exception | onError(Exception) |
任务完成 | !hasNext() | onCompleted |
3. Observable无偏见
Rx对于对于并发性或异步性没有任何特殊的偏好,Observable能够用任何方式实现,线程池、事件循环、非阻塞IO、Actor模式,任何知足你的需求的,你擅长或偏好的方式均可以。不管你选择怎样实现它,不管底层实现是阻塞的仍是非阻塞的,客户端代码将全部与Observable的交互都当作是异步的。
下列是笔者使用的版本(可根据实际状况进行选择):
implementation "io.reactivex.rxjava2:rxjava:2.2.12"
1.1 观察者模式
基本概念:Observable
(可观察者,即被观察者)、Observer
(观察者)、 subscribe
(订阅)、事件
。Observable 和 Observer 经过 subscribe() 方法实现订阅关系,从而 Observable 能够在须要的时候发出事件来通知 Observer(观察者观察被观察者的通知事件)。
在RxJava中,一个实现了 Observer 接口的对象能够订阅 (subscribe) 一个 Observable 类的实例。订阅者(subscriber) 对 Observable 发射 (emit) 的任何数据或数据序列做出响应。这种模式 简化了并发操做,由于它不须要阻塞等待 Observable 发射数据,而是建立了一个处于待命状态的观察者哨兵,哨兵在将来某个时刻响应Observable的通知。
RxJava 的事件回调方法: onSubscribe()
、onNext()
、 onCompleted()
和 onError()
。
注意: 在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,而且是事件序列中的最后一个。须要注意的是,onCompleted() 和 onError() 两者也是互斥的,即在队列中调用了其中一个,就不该该再调用另外一个。
1.2 Consumer 和 Action
这两个词意思分别是 消费者
(能够理解为消费被观察者发射出来的事件)和 行为
(能够理解为响应被观察者的行为)。对于 Observer 中的 4 个回调方法,咱们未必都能用获得,若是只须要用到其中的一部分,就须要 Consumer 和 Action 上场了。
简单示例:
// 1. 进行订阅,subscribe(Observer) observable.subscribe(observer); System.out.println("---------------------------------------------"); // 2. 进行订阅,subscribe(Consumer onNext) observable.subscribe(nextConsumer); System.out.println("---------------------------------------------"); // 3. 进行订阅,subscribe(Consumer onNext, Consumer onError) observable.subscribe(nextConsumer, errorConsumer); System.out.println("---------------------------------------------"); // 4. 进行订阅,subscribe(Consumer onNext, Consumer onError, Action onCompleted) observable.subscribe(nextConsumer, errorConsumer, completedAction); System.out.println("---------------------------------------------"); // 5. 进行订阅,subscribe(Consumer onNext, Consumer onError, Action onCompleted, Consumer onSubscribe) observable.subscribe(nextConsumer, errorConsumer, completedAction, onSubscribeComsumer);
1.3 Observable的分类
在RxJava中,Observable 有 Hot 与 Cold 之分。
Javadoc: Observable
Rxjava2.x 中有这么一个被观察者 Flowable
,一样做为被观察者,它和Observable有什么区别呢,在Rxjava2中,Observable再也不支持背压,而新增的Flowable支持背压,何为背压,就是异步场景
下上游发送事件的速度大于下游处理事件的速度所产生的现象。
提示:在本系列后面会有详细的单独篇章来介绍和如何使用背压。
Javadoc: Flowable
Single
相似于 Observable,不一样的是,它老是只发射一个值,或者一个错误通知,而不是发射一系列的值。
所以,不一样于Observable须要三个方法 onNext, onError, onCompleted,订阅Single只须要两个方法:
Single 只会调用这两个方法中的一个,并且只会调用一次,调用了任何一个方法以后,订阅关系终止。
示例代码:
// Single: 只发送 onSuccess or onError 通知,而且只会发送一次, 第一次发送数据后的都不会在处理 Single.create(new SingleOnSubscribe<String>() { @Override public void subscribe(SingleEmitter<String> emitter) throws Exception { emitter.onSuccess("Success"); // 发送success通知 emitter.onSuccess("Success2"); // 只能发送一次通知,后续不在处理 } }).subscribe(new BiConsumer<String, Throwable>() { @Override public void accept(String t1, Throwable t2) throws Exception { System.out.println("--> accept: t1 = " + t1 + ", t2 = " + t2); } });
输出:
--> accept: t1 = Success, t2 = null
提示:Single 能够经过
toXXX
方法转换为 Observable, Flowable, Completable与Maybe。
Javadoc: Single
Completable 在建立后,不会发射任何数据, 只有 onComplete
与 onError
事件,同时没有Observable中的一些操做符,如 map,flatMap。一般与 andThen
操做符结合使用。
示例代码:
// 1. Completable:只发送complete 或 error 事件,不发送任何数据 Completable.fromAction(new Action() { @Override public void run() throws Exception { System.out.println("Hello World! This is Completable."); } }).subscribe(new CompletableObserver() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe"); } @Override public void onError(Throwable e) { System.out.println("--> onError"); } @Override public void onComplete() { System.out.println("--> onComplete"); } }); System.out.println("----------------------------------------------"); // 2. 与 andThen 结合使用,当Completable执行完onCompleted后,执行andThen里的任务 Completable.create(new CompletableOnSubscribe() { @Override public void subscribe(CompletableEmitter emitter) throws Exception { Thread.sleep(1000); System.out.println("--> completed"); emitter.onComplete(); } }).andThen(Observable.range(1, 5)).subscribe(new Consumer<Integer>() { @Override public void accept(Integer t) throws Exception { System.out.println("--> accept: " + t); } });
输出:
--> onSubscribe Hello World! This is Completable. --> onComplete ---------------------------------------------- --> completed --> accept: 1 --> accept: 2 --> accept: 3 --> accept: 4 --> accept: 5
提示:Completable 能够经过
toXXX
方法转换为 Observable, Flowable, Single与Maybe。
Javadoc: Completable
Maybe 是 Rxjava 2.x 之后的新类型,只能发射 0 或者 1 项数据,即便后续有多个数据,后面的数据也不会被处理。能够看作是 Single 与 Completable 结合。
示例代码:
// Maybe 只发送0个或者1个数据,后续数据将被忽略 Maybe.create(new MaybeOnSubscribe<String>() { @Override public void subscribe(MaybeEmitter<String> emitter) throws Exception { // 若是先发送了,将会调用MaybeObserver的onCompleted方法,若是有数据发送或者调用onError,则不会去调用 // emitter.onComplete(); emitter.onSuccess("Hello"); // 若是发送了第一个数据后续数据将不会被处理 emitter.onSuccess("World"); } }).subscribe(new MaybeObserver<String>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe"); } @Override public void onSuccess(String t) { System.out.println("--> onSuccess: " + t); } @Override public void onError(Throwable e) { System.out.println("--> onError: " + e); } @Override public void onComplete() { System.out.println("--> onComplete"); } });
输出:
--> onSubscribe --> onSuccess: Hello
提示:Maybe 能够经过
toXXX
方法转换为 Observable, Flowable, Single与Completable。
Javadoc: Maybe
Subject
能够当作是一个桥梁或者代理,在 RxJava 实现中,它同时充当了 Observer
和 Observable
的角色。由于它是一个Observer,它能够订阅一个或多个 Observable;又由于它是一个 Observable ,它能够转发它收到(Observe)的数据,也能够发射新的数据。
它既能够是数据源observerable,也能够是数据的订阅者Observer。这个能够经过源码来了解一下。
public abstract class Subject<T> extends Observable<T> implements Observer<T> { ... }
Subject 实际上仍是 Observable,不过它由于实现了Observer接口,能够经过onNext、onComplete、onError方法发射和终止发射数据。
注意: 不要使用just(T)
、from(T)
、create(T)
来使用Subject,由于会把Subject转换为Obserable。
在 Rxjava 中,官方一共为咱们提供了几种Subject:
AsyncSubject
仅释放 onComplete() 以前的最后一个数据(必须调用subject.onComplete()才会发送数据,不然观察者不会接收到任何数据)。
能够获取数据业务逻辑的最后的结果数据。
注意: 若是因异常(Error)终止,将不会向后续的Observer释放数据,可是会向Observer传递一个异常通知。
实例代码:
// 注意: 不要使用just(T)、from(T)、create(T)来使用Subject,由于会把Subject转换为Obserable // 不管订阅的时候AsyncSubject是否Completed,都可以收到最后一个值的回调 AsyncSubject<String> asyncSubject = AsyncSubject.create(); asyncSubject.onNext("emitter 1"); asyncSubject.onNext("emitter 2"); asyncSubject.onNext("emitter 3"); asyncSubject.onNext("emitter 4"); asyncSubject.onNext("emitter 5"); // 此时订阅后将近发送此项数据 // asyncSubject.onNext(1/0 + ""); // 发生error时将不会有数据发射,仅发送error通知 asyncSubject.onComplete(); // 订阅后只会接收最后一个数据 asyncSubject.subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe"); } @Override public void onNext(String t) { System.out.println("--> onNext = " + t); } @Override public void onError(Throwable e) { System.out.println("--> onError = " + e); } @Override public void onComplete() { System.out.println("--> onComplete"); } });
输出:
--> onSubscribe --> onNext = emitter 5 --> onComplete
Javadoc: AsyncSubject
当观察者订阅 BehaviorSubject
时,它开始发射原始Observable在订阅前的最后一个发射的数据(若是此时尚未收到任何数据,它会发射一个默认值),而后继续发射其它任何来自原始Observable的数据。
能够缓存订阅前最后一次发出的数据,以及订阅后发送的全部数据。
注意: 若是因异常(Error)终止,将不会向后续的Observer释放数据,可是会向Observer传递一个异常通知。
实例代码:
// 建立无默认值的BehaviorSubject BehaviorSubject<Integer> subject = BehaviorSubject.create(); // 建立有默认值的BehaviorSubject BehaviorSubject<Integer> subjectDefault = BehaviorSubject.createDefault(-1); // 观察者对象 Observer<Integer> observer = new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--------------------------------"); System.out.println("--> onSubscribe"); } @Override public void onNext(Integer t) { System.out.println("--> onNext: " + t); } @Override public void onError(Throwable e) { System.out.println("--> onError: " + e); } @Override public void onComplete() { System.out.println("--> onComplete"); } }; // 1. 无数据发送的时候,发送默认值 // subjectDefault.subscribe(observer); // 2. 此时会发射全部订阅后正常发射的数据: 1, 2, 3, 4, error // subject.subscribe(observer); subject.onNext(1); subject.onNext(2); subject.onNext(3); // 3. 此时会发射订阅前的一个数据及后面正常发射的数据: 3, 4, error // subject.subscribe(observer); subject.onNext(4); subject.onError(new NullPointerException()); // 4. 此时不会发射后续数据,仅发送Error通知 // subject.subscribe(observer); subject.onNext(5); subject.onComplete(); // 5. 此时没有数据发射,若是有error存在的话,将会发送error subject.subscribe(observer);
输出:
-------------------------------- --> onSubscribe --> onNext: -1 -------------------------------- --> onSubscribe --> onNext: 1 --> onNext: 2 --> onNext: 3 --> onNext: 4 --> onError: java.lang.NullPointerException -------------------------------- --> onSubscribe --> onNext: 3 --> onNext: 4 --> onError: java.lang.NullPointerException -------------------------------- --> onSubscribe --> onError: java.lang.NullPointerException -------------------------------- --> onSubscribe --> onError: java.lang.NullPointerException
Javadoc: BehaviorSubject
PublishSubject
只会把在订阅发生的时间点以后来自原始Observable的数据发射给观察者。须要注意的是,PublishSubject 可能会一建立完成就马上开始发射数据(除非你能够阻止它发生),所以这里有一个风险:在Subject被建立后到有观察者订阅它以前这个时间间隙内,可能有一个或多个数据可能会丢失。若是要确保来自原始Observable的全部数据都被分发,你须要这样作:使用Create建立那个Observable以便手动给它引入 "冷" Observable的行为(当全部观察者都已经订阅时才开始发射数据),或者改用 ReplaySubject
。
若是 PublishSubject 在订阅前已经调用了 onComplete()
方法,则观察者不会接收到数据。
注意: 若是因异常(Error)终止,将不会向后续的Observer释放数据,可是会向Observer传递一个异常通知。
实例代码:
// 释放订阅后接收到正常发射的数据,有error将不会发射任何数据 PublishSubject<Integer> subject = PublishSubject.create(); // 观察者对象 Observer<Integer> observer = new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--------------------------------"); System.out.println("--> onSubscribe"); } @Override public void onNext(Integer t) { System.out.println("--> onNext: " + t); } @Override public void onError(Throwable e) { System.out.println("--> onError: " + e); } @Override public void onComplete() { System.out.println("--> onComplete"); } }; // 1. 此时订阅将释放后续正常发射的数据: 1,2, 3, 4, error // subject.subscribe(observer); subject.onNext(1); subject.onNext(2); // 2. 此时订阅,发射后续正常发射的数据:3, 4, error // subject.subscribe(observer); subject.onNext(3); subject.onNext(4); // 此时将不会发送任何数据,直接发送error subject.onError(new NullPointerException()); subject.onNext(5); subject.onComplete(); // 3. 此时订阅若是有error,仅发送error,不然无数据发射 subject.subscribe(observer);
输出:
-------------------------------- --> onSubscribe --> onNext: 1 --> onNext: 2 --> onNext: 3 --> onNext: 4 --> onError: java.lang.NullPointerException -------------------------------- --> onSubscribe --> onNext: 3 --> onNext: 4 --> onError: java.lang.NullPointerException -------------------------------- --> onSubscribe --> onError: java.lang.NullPointerException
Javadoc: PublishSubject
ReplaySubject
会发射全部来自原始Observable的数据给观察者,不管它们是什么时候订阅的。也 有其它版本的ReplaySubject,在重放缓存增加到必定大小的时候或过了一段时间后会丢弃旧的数据(原始Observable发射的)。
若是你把 ReplaySubject
看成一个观察者使用,注意不要从多个线程中调用它的onNext方法 (包括其它的on系列方法),这可能致使同时(非顺序)调用,这会违反Observable协议, 给Subject的结果增长了不肯定性。
ReplaySubject 还能够限制缓存数据的数量,限制缓存的时间:
实例代码:
// 1. 接受收到的全部数据以及通知,对每隔Observer都执行相同的独立的操做 ReplaySubject<Integer> subject = ReplaySubject.create(); // 2. 指定内部缓存大小,此方法避免在内部缓冲区增加以容纳新缓冲区时过多的数组重分配 // ReplaySubject<Integer> subject = ReplaySubject.create(5); // 3. createWithSize(count) // 指定保留订阅前数据项的个数的Subject,会发射订阅前count个数据和后续的数据 // ReplaySubject<Integer> subject = ReplaySubject.createWithSize(1); // 4. createWithTime(maxAge, unit, scheduler) // 指定保留订阅前指定maxAge时间段内数据和后续的数据 // ReplaySubject<Integer> subject = ReplaySubject.createWithTime(1, TimeUnit.MILLISECONDS, Schedulers.trampoline()); // 建立Observer(观察者), 能够接受Observable全部通知 Observer<Integer> observer = new Observer<Integer>() { public void onSubscribe(Disposable d) { System.out.println("----------------------------------"); System.out.println("--> onSubscribe"); } public void onNext(Integer t) { System.out.println("--> onNext = " + t); } public void onError(Throwable e) { System.out.println("--> onError: " + e); } public void onComplete() { System.out.println("--> onComplete"); } }; // 正常接受全部Observable的数据和通知 subject.subscribe(observer); subject.onNext(1); subject.onNext(2); subject.onNext(3); // 正常接受全部Observable的数据和通知 subject.subscribe(observer); subject.onNext(4); // 若是有error,则发送error通知,不影响任何一个观察者数据与通知接受 // subject.onError(new NullPointerException()); subject.onNext(5); subject.onComplete(); // 正常接受全部Observable的数据和通知 subject.subscribe(observer);
输出:
---------------------------------- --> onSubscribe --> onNext = 1 --> onNext = 2 --> onNext = 3 ---------------------------------- --> onSubscribe --> onNext = 1 --> onNext = 2 --> onNext = 3 --> onNext = 4 --> onNext = 4 --> onNext = 5 --> onNext = 5 --> onComplete --> onComplete ---------------------------------- --> onSubscribe --> onNext = 1 --> onNext = 2 --> onNext = 3 --> onNext = 4 --> onNext = 5 --> onComplete
Javadoc: ReplaySubject
UnicastSubject
是仅支持订阅一次的 Subject ,若是多个订阅者试图订阅这个 Subject 将会受到 IllegalStateException
。
经常使用于一次性消费或安全场合,如网络结算,支付等。
实例代码:
// 建立UnicastSubject,只能被订阅一次,不能再次被订阅 UnicastSubject<Integer> subject = UnicastSubject.create(); // 建立Observer(观察者), 能够接受Observable全部通知 Observer<Integer> observer = new Observer<Integer>() { public void onSubscribe(Disposable d) { System.out.println("--------------------------------"); System.out.println("--> onSubscribe"); } public void onNext(Integer t) { System.out.println("--> onNext = " + t); } public void onError(Throwable e) { System.out.println("--> onError: " + e); } public void onComplete() { System.out.println("--> onComplete"); } }; // 订阅后,此subject将不能够再被订阅了 subject.subscribe(observer); subject.onNext(1); subject.onNext(2); subject.onNext(3); // 此时会有IllegalStateException,由于只能订阅一次,不能重复订阅 subject.subscribe(observer); subject.onNext(4); subject.onNext(5); subject.onComplete(); // 此时会有IllegalStateException,由于只能被订阅一次,不能重复订阅 subject.subscribe(observer);
输出:
-------------------------------- --> onSubscribe --> onNext = 1 --> onNext = 2 --> onNext = 3 -------------------------------- --> onSubscribe --> onError: java.lang.IllegalStateException: Only a single observer allowed. --> onNext = 4 --> onNext = 5 --> onComplete -------------------------------- --> onSubscribe --> onError: java.lang.IllegalStateException: Only a single observer allowed.
Javadoc: UnicastSubject
在并发状况下,不推荐使用一般的Subject对象,此时会产生屡次调用产生一系列不可控的问题,而是推荐使用 SerializedSubject
,并发时只容许一个线程调用onNext等方法,将Subject 串行化 后,全部其余的Observable和Subject方法都是线程安全的。
注意: 在Rxjava2 中 SerializedSubject
是一个不公开(不是public)的类型,意味着不能够直接建立使用,可是能够经过Subject.toSerialized()方法将Subject对象串行化保证其线程安全。同时也提供了 SerializedObserver,SerializedSubscriber等来包装对象成为串行化对象。
实例代码:
// 建立Subject ReplaySubject<String> subject = ReplaySubject.create(); // 经过toSerialized()进行串行化 Subject<String> serialized = subject.toSerialized(); // 订阅 serialized.subscribe(new Consumer<String>() { @Override public void accept(String t) throws Exception { System.out.println("--> accept: " + t + ", ReceiverThreadID: " + Thread.currentThread().getId()); } }); // 多线程执行 for (int i = 0; i < 10; i++) { final int value = i + 1; new Thread(new Runnable() { @Override public void run() { serialized.onNext(value + "-SendThreadID: " + Thread.currentThread().getId()); } }).start(); } System.in.read(); System.out.println("---------------------------------------------------------------------"); // 建立一个 SerializedObserver来进行串行化,保证线程安全 // 注意:只保证同时只有一个线程调用 onNext, onCompleted, onError方法,并非将全部emit的值放到一个线程上而后处理 SerializedObserver<String> observer = new SerializedObserver<String>(new Observer<String>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe"); } @Override public void onNext(String t) { System.out.println("--> onNext: " + t + ", ReceiverThreadID: " + Thread.currentThread().getId()); } @Override public void onError(Throwable e) { System.out.println("--> onError"); } @Override public void onComplete() { System.out.println("--> onComplete"); } }); // 订阅 subject.subscribe(observer); // 多线程执行 for (int i = 0; i < 10; i++) { final int value = i + 1; new Thread(new Runnable() { @Override public void run() { subject.onNext(value + "-SendThreadID: " + Thread.currentThread().getId()); // if (value == 10) { // subject.onComplete(); // } } }).start(); } System.in.read();
输出:
--> accept: 1-SendThreadID: 11, ReceiverThreadID: 11 --> accept: 2-SendThreadID: 12, ReceiverThreadID: 11 --> accept: 10-SendThreadID: 20, ReceiverThreadID: 11 --> accept: 9-SendThreadID: 19, ReceiverThreadID: 11 --> accept: 8-SendThreadID: 18, ReceiverThreadID: 11 --> accept: 7-SendThreadID: 17, ReceiverThreadID: 11 --> accept: 6-SendThreadID: 16, ReceiverThreadID: 11 --> accept: 4-SendThreadID: 14, ReceiverThreadID: 11 --> accept: 5-SendThreadID: 15, ReceiverThreadID: 11 --> accept: 3-SendThreadID: 13, ReceiverThreadID: 11 --------------------------------------------------------------------- --> onSubscribe --> onNext: 1-SendThreadID: 11, ReceiverThreadID: 11 --> onNext: 3-SendThreadID: 13, ReceiverThreadID: 11 --> onNext: 4-SendThreadID: 14, ReceiverThreadID: 11 --> onNext: 5-SendThreadID: 15, ReceiverThreadID: 11 --> onNext: 6-SendThreadID: 16, ReceiverThreadID: 16 --> onNext: 7-SendThreadID: 17, ReceiverThreadID: 16 --> onNext: 8-SendThreadID: 18, ReceiverThreadID: 16 --> onNext: 9-SendThreadID: 19, ReceiverThreadID: 16 --> onNext: 10-SendThreadID: 20, ReceiverThreadID: 16
Rxjava2 中已经取消了TestSubject,使用TestScheduler
和TestObserver
替代,下面主要以 TestObserver 为例进行介绍。
TestObserver 是一个一个记录事件并容许对其进行断言的观察者,多用于测试场合。通常能够建立一个TestObserver
对象或者从Observable 或者 Subject 中直接调用 test()
方法获取。
实例代码:
// Observable Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onNext(100); emitter.onError(new NullPointerException()); emitter.onComplete(); } }); // 1. 建立TestObserver对象 TestObserver<Integer> testObserver = TestObserver.create(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("--> onSubscribe:"); } @Override public void onNext(Integer t) { System.out.println("--> onNext: " + t); } @Override public void onError(Throwable e) { System.out.println("--> onError: " + e); } @Override public void onComplete() { System.out.println("--> onComplete:"); } }); observable.subscribe(testObserver); try { // 断言是否为收到订阅,可是没有事件发送 testObserver.assertEmpty(); // 断言是否收到onComplete() testObserver.assertComplete(); // 断言没有数据100发送 testObserver.assertNever(100); // 断言接收数据结果 testObserver.assertResult(1, 2, 3); // 断言异常 testObserver.assertError(NullPointerException.class); ... 更多请参考Api } catch (Error e) { System.out.println("Error: " + e); } System.out.println("-----------------------------------------------"); // Subject AsyncSubject<Object> subject = AsyncSubject.create(); // 2. 从Observable或者Subject中获取TestObserver对象 TestObserver<Integer> test = observable.test(); TestObserver<Object> test2 = subject.test(); System.out.println(test.values()); // received onNext values try { // 断言是否为收到订阅,可是没有事件发送 test.assertEmpty(); test2.assertEmpty(); // 断言是否收到onComplete() test.assertComplete(); // 断言没有数据100发送 test.assertNever(100); // 断言接收数据结果 test.assertResult(1, 2, 3); // 断言异常 test.assertError(NullPointerException.class); ... 更多请参考Api } catch (Error e) { System.out.println("Error: " + e); }
输出(当出现断言不匹配的状况,会有相应Error抛出):
--> onSubscribe: --> onNext: 1 --> onNext: 2 --> onNext: 3 --> onNext: 100 --> onError: java.lang.NullPointerException Error: java.lang.AssertionError: Value counts differ; expected: 0 but was: 4 (latch = 0, values = 4, errors = 1, completions = 0) ----------------------------------------------- [1, 2, 3, 100] Error: java.lang.AssertionError: Value counts differ; expected: 0 but was: 4 (latch = 0, values = 4, errors = 1, completions = 0)
Javadoc: TestObserver
Process
和 Subject
的做用和使用相同。Process 是 Rxjava2 中的新功能,它是一个接口,继承自 Subscriber、Publish。与Subject 最大的区别是 Process 支持背压,关于背压,后续将会有专题文章来作详细介绍。
若是你想给Observable操做符链添加多线程功能,你能够指定操做符(或者特定的 Observable)在特定的调度器(Scheduler
)上执行。
某些ReactiveX的Observable操做符有一些变体,它们能够接受一个Scheduler
参数。这个参数指定操做符将它们的部分或所有任务放在一个特定的调度器上执行。
使用ObserveOn和SubscribeOn操做符,你可让Observable在一个特定的调度器上执行, ObserveOn指示一个Observable在一个特定的调度器上调用观察者的onNext, onError和 onCompleted方法,SubscribeOn更进一步,它指示Observable将所有的处理过程(包括发射数据和通知)放在特定的调度器上执行。
调度器的种类
下表展现了RxJava中可用的调度器种类:
调度器类型 | 做用 |
---|---|
Schedulers.computation() | 用于计算任务,如事件循环或和回调处理,不要用于IO操做(IO操做请使用Schedulers.io()),默认线程数等于处理器的数量 。 |
Schedulers.from(executor) | 使用指定的Executor做为调度器。 |
Schedulers.trampoline() | 调度在当前线程上工做,但不当即执行。当其它排队的任务完成后,在当前线程排队开始执行。 |
Schedulers.io() | 用于IO密集型任务,如异步阻塞IO操做,这个调度器的线程池会根据须要增加;对于普通的计算任务,请使用 Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器。 |
Schedulers.newThread() | 为每一个任务建立一个新线程 |
Schedulers.single() | 一个默认的、共享的、单线程支持的调度器实例,用于须要在同一后台线程上强顺序执行的工做。 |
关于Rxjava中的线程模型、线程转换操做、调度器的使用等后面会有专题文章来详细介绍。
本章主要介绍了Rxjava的概念与添加使用依赖、Rxjava中的观察者模式、Observable、Flowable、Subject,Schedule等基础对象的介绍与使用,应该能够对Rxjava的概念及基本对象有了基本的认识和了解,以及简单的上手使用。
有关Rxjava2 的其余相关部份内容,后续将有系列的文章来介绍,请关注上面的实时文章目录。