在这篇文章中,咱们会先分析一下 RxJava2 中的 Subject ;而后,咱们会使用 Subject 制做一个相似于 EventBus 的全局的通讯工具。git
在了解本篇文章的内容以前,你须要先了解 RxJava2 中的一些基本的用法,好比 Observable 以及背压的概念,你能够参考个人其余两篇文章来获取这部份内容:《RxJava2 系列 (1):一篇的比较全面的 RxJava2 方法总结》和《RxJava2 系列 (2):背压和Flowable》。github
Subject 能够同时表明 Observer 和 Observable,容许从数据源中屡次发送结果给多个观察者。除了 onSubscribe(), onNext(), onError() 和 onComplete() 以外,全部的方法都是线程安全的。此外,你还可使用 toSerialized() 方法,也就是转换成串行的,将这些方法设置成线程安全的。缓存
若是你已经了解了 Observable 和 Observer ,那么也许直接看 Subject 的源码定义会更容易理解:安全
public abstract class Subject<T> extends Observable<T> implements Observer<T> {
// ...
}
复制代码
从上面看出,Subject 同时继承了 Observable 和 Observer 两个接口,说明它既是被观察的对象,同时又是观察对象,也就是能够生产、能够消费、也能够本身生产本身消费。因此,咱们能够项下面这样来使用它。这里咱们用到的是该接口的一个实现 PublishSubject :bash
public static void main(String...args) {
PublishSubject<Integer> subject = PublishSubject.create();
subject.subscribe(System.out::println);
Executor executor = Executors.newFixedThreadPool(5);
Disposable disposable = Observable.range(1, 5).subscribe(i ->
executor.execute(() -> {
try {
Thread.sleep(i * 200);
subject.onNext(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
}
复制代码
根据程序的执行结果,程序在第200, 400, 600, 800, 1000毫秒依次输出了1到5的数字。ide
在这里,咱们用 PublishSubject 建立了一个主题并对其监听,而后在线程当中又通知该主题内容变化,整个过程咱们都只操做了 PublishSubject 一个对象。显然,使用 Subject 咱们能够达到对一个指定类型的值的结果进行监听的目的——咱们把值改变以后对应的逻辑写在 subscribe() 方法中,而后每次调用 onNext() 等方法通知结果以后就能够自动调用 subscribe() 方法进行更新操做。工具
同时,由于 Subject 实现了 Observer 接口,而且在 Observable 等的 subscribe() 方法中存在一个以 Observer 做为参数的方法(以下),因此,Subject 也是能够做为消费者来对事件进行消费的。post
public final void subscribe(Observer<? super T> observer)
复制代码
以上就是 Subject 的两个主要的特性。测试
在 RxJava2 ,Subject 有几个默认的实现,下面咱们对它们之间的区别作简单的说明:this
AsyncSubject
:只有当 Subject 调用 onComplete 方法时,才会将 Subject 中的最后一个事件传递给全部的 Observer。好比,在下面的例子中,虽然在发送 "two" 的时候,observer 就进行了订阅,可是只有当 subject 调用了 onComplete() 方法的时候,observer 才收到了 "three" 这一个事件:
AsyncSubject<String> subject = AsyncSubject.create();
subject.onNext("one");
subject.onNext("two");
subject.subscribe(observer);
subject.onNext("three");
subject.onComplete();
复制代码
BehaviorSubject
:在建立 BehaviorSuject 的时候能够经过静态的工厂方法指定一个默认值数,也能够不指定。当一个 Observer 使用了 subscribe() 方法对其进行订阅的时候,它只能收到在订阅以前发送出的最后一个结果(或者说最新的值),在这以前的结果是没法被接收到的。好比,下面的例子中,新注册的 observer 只能接收到 "one", "two" 和 "three",可是没法接收到 "zero":
BehaviorSubject<Object> subject = BehaviorSubject.create();
subject.onNext("zero");
subject.onNext("one");
subject.subscribe(observer);
subject.onNext("two");
subject.onNext("three");
复制代码
PublishSubject
:不会改变事件的发送顺序;在已经发送了一部分事件以后注册的 Observer 不会收到以前发送的事件。好比,在下面的代码中,observer1 会收到全部的 onNext() 和 onComplete() 发出的结果,可是 observer2 只能收到 "three" 和最终的 onComplete():
PublishSubject<Object> subject = PublishSubject.create();
// observer1 进行订阅
subject.subscribe(observer1);
subject.onNext("one");
subject.onNext("two");
// observer2 进行订阅
subject.subscribe(observer2);
subject.onNext("three");
subject.onComplete();
复制代码
ReplaySubject
:不管何时注册 Observer 均可以接收到任什么时候候经过该 Observable 发射的事件。好比,在下面的代码中,observer1 和 observer2 能够收到在它们进行订阅以前的全部的 onNext() 和 onCompete() 事件:
ReplaySubject<Object> subject = ReplaySubject.create();
subject.onNext("one");
subject.onNext("two");
subject.onNext("three");
subject.onComplete();
// observer1 和 observer2 进行订阅
subject.subscribe(observer1);
subject.subscribe(observer2);
复制代码
UnicastSubject
:只容许一个 Observer 进行监听,在该 Observer 注册以前会将发射的全部的事件放进一个队列中,并在 Observer 注册的时候一块儿通知给它。好比,在下面的例子中,当 observer1 进行订阅的时候,会将 "one" "two" "three" 依次发送给 observer1,而当 observer2 进行订阅的时候会抛出一个异常,由于只能有一个观察者能够订阅:
UnicastSubject<String> subject = UnicastSubject.create();
subject.onNext("one");
subject.onNext("two");
subject.onNext("three");
subject.subscribe(observer1);
subject.subscribe(observer2);
复制代码
对比 PublishSubject 和 ReplaySubject,它们的区别在于新注册的 Observer 是否可以收到在它注册以前发送的事件。这个相似于 EventBus 中的 StickyEvent 即黏性事件,为了说明这一点,咱们准备了下面两段代码:
private static void testPublishSubject() throws InterruptedException {
PublishSubject<Integer> subject = PublishSubject.create();
subject.subscribe(i -> System.out.print("(1: " + i + ") "));
Executor executor = Executors.newFixedThreadPool(5);
Disposable disposable = Observable.range(1, 5).subscribe(i -> executor.execute(() -> {
try {
Thread.sleep(i * 200);
subject.onNext(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
Thread.sleep(500);
subject.subscribe(i -> System.out.print("(2: " + i + ") "));
Observable.timer(2, TimeUnit.SECONDS).subscribe(i -> ((ExecutorService) executor).shutdown());
}
private static void testReplaySubject() throws InterruptedException {
ReplaySubject<Integer> subject = ReplaySubject.create();
subject.subscribe(i -> System.out.print("(1: " + i + ") "));
Executor executor = Executors.newFixedThreadPool(5);
Disposable disposable = Observable.range(1, 5).subscribe(i -> executor.execute(() -> {
try {
Thread.sleep(i * 200);
subject.onNext(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
Thread.sleep(500);
subject.subscribe(i -> System.out.print("(2: " + i + ") "));
Observable.timer(2, TimeUnit.SECONDS).subscribe(i -> ((ExecutorService) executor).shutdown());
}
复制代码
它们的输出结果依次是
PublishSubject的结果:(1: 1) (1: 2) (1: 3) (2: 3) (1: 4) (2: 4) (1: 5) (2: 5)
ReplaySubject的结果: (1: 1) (1: 2) (2: 1) (2: 2) (1: 3) (2: 3) (1: 4) (2: 4) (1: 5) (2: 5)
复制代码
从上面的结果对比中,咱们能够看出前者与后者的区别在于新注册的 Observer 并无收到在它注册以前发送的事件。试验的结果与上面的叙述是一致的。
其余的测试代码这不一并给出了,详细的代码能够参考Github - Java Advanced。
清楚了 Subject 的概念以后,让咱们来作一个实践——用 RxJava 打造 EventBus。
咱们先考虑用一个全局的 PublishSubject 来解决这个问题,固然,这意味着咱们发送的事件不是黏性事件。不过,不要紧,只要这种实现方式搞懂了,用 ReplaySubject 作一个发送黏性事件的 EventBus 也非难事。
考虑一下,若是要实现这个功能咱们须要作哪些准备:
好了,首先是全局的 Subject 的问题,咱们能够实现一个静态的或者单例的 Subject。这里咱们选择使用后者,因此,咱们须要一个单例的方式来使用 Subject:
public class RxBus {
private static volatile RxBus rxBus;
private final Subject<Object> subject = PublishSubject.create().toSerialized();
public static RxBus getRxBus() {
if (rxBus == null) {
synchronized (RxBus.class) {
if(rxBus == null) {
rxBus = new RxBus();
}
}
}
return rxBus;
}
复制代码
}
这里咱们应用了 DCL 的单例模式提供一个单例的 RxBus,对应一个惟一的 Subject. 这里咱们用到了 Subject 的toSerialized()
,咱们上面已经提到过它的做用,就是用来保证 onNext() 等方法的线程安全性。
另外,由于 Observalbe 自己是不支持背压的,因此,咱们还须要将该 Observable 转换成 Flowable 来实现背压的效果:
public <T> Flowable<T> getObservable(Class<T> type){
return subject.toFlowable(BackpressureStrategy.BUFFER).ofType(type);
}
复制代码
这里咱们用到的背压的策略是BackpressureStrategy.BUFFER
,它会缓存发射结果,直到有消费者订阅了它。而这里的ofType()
方法的做用是用来过滤发射的事件的类型,只有指定类型的事件会被发布。
而后,咱们须要记录订阅者的信息以便在适当的时机取消订阅,这里咱们用一个Map<String, CompositeDisposable>
类型的哈希表来解决。这里的CompositeDisposable
用来存储 Disposable,从而达到一个订阅者对应多个 Disposable 的目的。CompositeDisposable
是一个 Disposable 的容器,声称能够达到 O(1) 的增、删的复杂度。这里的作法目的是使用注册观察以后的 Disposable 的 dispose() 方法来取消订阅。因此,咱们能够获得下面的这段代码:
public void addSubscription(Object o, Disposable disposable) {
String key = String.valueOf(o.hashCode());
if (disposableMap.get(key) != null) {
disposableMap.get(key).add(disposable);
} else {
CompositeDisposable disposables = new CompositeDisposable();
disposables.add(disposable);
disposableMap.put(key, disposables);
}
}
public void unSubscribe(Object o) {
String key = String.valueOf(o.hashCode());
if (!disposableMap.containsKey(key)){
return;
}
if (disposableMap.get(key) != null) {
disposableMap.get(key).dispose();
}
disposableMap.remove(key);
}
复制代码
最后,对外提供一下 Subject 的订阅和发布方法,整个 EventBus 就制做完成了:
public void post(Object o){
subject.onNext(o);
}
public <T> Disposable doSubscribe(Class<T> type, Consumer<T> next, Consumer<Throwable> error){
return getObservable(type)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(next,error);
}
复制代码
咱们只须要在最顶层的 Activity 基类中加入以下的代码。这样,咱们就不须要在各个 Activity 中取消注册了。而后,就可使用这些顶层的方法来进行操做了。
protected void postEvent(Object object) {
RxBus.getRxBus().post(object);
}
protected <M> void addSubscription(Class<M> eventType, Consumer<M> action) {
Disposable disposable = RxBus.getRxBus().doSubscribe(eventType, action, LogUtils::d);
RxBus.getRxBus().addSubscription(this, disposable);
}
protected <M> void addSubscription(Class<M> eventType, Consumer<M> action, Consumer<Throwable> error) {
Disposable disposable = RxBus.getRxBus().doSubscribe(eventType, action, error);
RxBus.getRxBus().addSubscription(this, disposable);
}
@Override
protected void onDestroy() {
super.onDestroy();
RxBus.getRxBus().unSubscribe(this);
}
复制代码
在第一个 Activity 中咱们对指定的类型的结果进行监听:
addSubscription(RxMessage.class, rxMessage -> ToastUtils.makeToast(rxMessage.message));
复制代码
而后,咱们在另外一个 Activity 中发布事件:
postEvent(new RxMessage("Hello world!"));
复制代码
这样当第二个 Activity 中调用指定的发送事件的方法以后,第一个 Activity 就能够接收到发射的事件了。
好了,以上就是 Subject 的使用,若是要用一个词来形容它的话,那么只能是“自给自足”了。就是说,它同时作了 Observable 和 Observer 的工做,既能够发射事件又能够对事件进行消费,可谓身兼数职。它在那种想要对某个值进行监听并处理的情形特别有用。由于它不须要你写多个冗余的类,只要它一个就完成了其余两个类来完成的任务,于是代码更加简洁。
RxJava 系列文章:
更正 RxBus 中的 addSubscription()
和 unSubscribe()
两个方法,在以前的版本中使用传入的 Object 的类名做为哈希表的键,现改成使用 Object 的哈希码做为哈希表的键:使用类名的时候存在一个问题,即若是在 Fragment 中使用 RxBus,而且同一类型的 Fragment 在多个地方使用,会致使其中一个 Fragment 取消订阅的时候,全部同一类型的 Fragment 都取消订阅。