RxJava Subject

 

 

Subject

Subject能够当作是一个桥梁或者代理,在某些ReactiveX实现中(如RxJava),它同时充当了Observer和Observable的角色。由于它是一个Observer,它能够订阅一个或多个Observable;又由于它是一个Observable,它能够转发它收到(Observe)的数据,也能够发射新的数据。html

因为一个Subject订阅一个Observable,它能够触发这个Observable开始发射数据(若是那个Observable是"冷"的--就是说,它等待有订阅才开始发射数据)。所以有这样的效果,Subject能够把原来那个"冷"的Observable变成"热"的。java

Subject的种类

针对不一样的场景一共有四种类型的Subject。他们并非在全部的实现中所有都存在,并且一些实现使用其它的命名约定(例如,在RxScala中Subject被称做PublishSubject)。react

AsyncSubject

一个AsyncSubject只在原始Observable完成后,发射来自原始Observable的最后一个值。(若是原始Observable没有发射任何值,AsyncObject也不发射任何值)它会把这最后一个值发射给任何后续的观察者。 缓存

然而,若是原始的Observable由于发生了错误而终止,AsyncSubject将不会发射任何数据,只是简单的向前传递这个错误通知。 async

BehaviorSubject

当观察者订阅BehaviorSubject时,它开始发射原始Observable最近发射的数据(若是此时尚未收到任何数据,它会发射一个默认值),而后继续发射其它任何来自原始Observable的数据。 spa

然而,若是原始的Observable由于发生了一个错误而终止,BehaviorSubject将不会发射任何数据,只是简单的向前传递这个错误通知。 线程

PublishSubject

PublishSubject只会把在订阅发生的时间点以后来自原始Observable的数据发射给观察者。须要注意的是,PublishSubject可能会一建立完成就马上开始发射数据(除非你能够阻止它发生),所以这里有一个风险:在Subject被建立后到有观察者订阅它以前这个时间段内,一个或多个数据可能会丢失。若是要确保来自原始Observable的全部数据都被分发,你须要这样作:或者使用Create建立那个Observable以便手动给它引入"冷"Observable的行为(当全部观察者都已经订阅时才开始发射数据),或者改用ReplaySubject。 代理

若是原始的Observable由于发生了一个错误而终止,PublishSubject将不会发射任何数据,只是简单的向前传递这个错误通知。 code

ReplaySubject

ReplaySubject会发射全部来自原始Observable的数据给观察者,不管它们是什么时候订阅的。也有其它版本的ReplaySubject,在重放缓存增加到必定大小的时候或过了一段时间后会丢弃旧的数据(原始Observable发射的)。server

若是你把ReplaySubject看成一个观察者使用,注意不要从多个线程中调用它的onNext方法(包括其它的on系列方法),这可能致使同时(非顺序)调用,这会违反Observable协议,给Subject的结果增长了不肯定性。

RxJava的对应类

假设你有一个Subject,你想把它传递给其它的代理或者暴露它的Subscriber接口,你能够调用它的asObservable方法,这个方法返回一个Observable。具体使用方法能够参考Javadoc文档。

串行化

若是你把 Subject 看成一个 Subscriber 使用,注意不要从多个线程中调用它的onNext方法(包括其它的on系列方法),这可能致使同时(非顺序)调用,这会违反Observable协议,给Subject的结果增长了不肯定性。

要避免此类问题,你能够将 Subject 转换为一个 SerializedSubject ,相似于这样:

mySafeSubject = new SerializedSubject( myUnsafeSubject ); 
相关文章
相关标签/搜索