对于Android系统来讲,消息传递是最基本的组件,每个App内的不一样页面,不一样组件都在进行消息传递。消息传递既能够用于Android四大组件之间的通讯,也可用于异步线程和主线程之间的通讯。对于Android开发者来讲,常用的消息传递方式有不少种,从最先使用的Handler、BroadcastReceiver、接口回调,到近几年流行的通讯总线类框架EventBus、RxBus。Android消息传递框架,总在不断的演进之中。java
EventBus是一个Android事件发布/订阅框架,经过解耦发布者和订阅者简化Android事件传递。EventBus能够代替Android传统的Intent、Handler、Broadcast或接口回调,在Fragment、Activity、Service线程之间传递数据,执行方法。android
EventBus最大的特色就是:简洁、解耦。在没有EventBus以前咱们一般用广播来实现监听,或者自定义接口函数回调,有的场景咱们也能够直接用Intent携带简单数据,或者在线程之间经过Handler处理消息传递。但不管是广播仍是Handler机制远远不能知足咱们高效的开发。EventBus简化了应用程序内各组件间、组件与后台线程间的通讯。EventBus一经推出,便受到广大开发者的推崇。git
如今看来,EventBus给Android开发者世界带来了一种新的框架和思想,就是消息的发布和订阅。这种思想在其后不少框架中都获得了应用。github
订阅发布模式定义了一种“一对多”的依赖关系,让多个订阅者对象同时监听某一个主题对象。这个主题对象在自身状态变化时,会通知全部订阅者对象,使它们可以自动更新本身的状态。编程
RxBus不是一个库,而是一个文件,实现只有短短30行代码。RxBus自己不须要过多分析,它的强大彻底来自于它基于的RxJava技术。响应式编程(Reactive Programming)技术这几年特别火,RxJava是它在Java上的实做。RxJava天生就是发布/订阅模式,并且很容易处理线程切换。因此,RxBus凭借区区30行代码,就敢挑战EventBus江湖老大的地位。架构
在RxJava中有个Subject类,它继承Observable类,同时实现了Observer接口,所以Subject能够同时担当订阅者和被订阅者的角色,咱们使用Subject的子类PublishSubject来建立一个Subject对象(PublishSubject只有被订阅后才会把接收到的事件马上发送给订阅者),在须要接收事件的地方,订阅该Subject对象,以后若是Subject对象接收到事件,则会发射给该订阅者,此时Subject对象充当被订阅者的角色。app
完成了订阅,在须要发送事件的地方将事件发送给以前被订阅的Subject对象,则此时Subject对象做为订阅者接收事件,而后会马上将事件转发给订阅该Subject对象的订阅者,以便订阅者处理相应事件,到这里就完成了事件的发送与处理。框架
最后就是取消订阅的操做了,RxJava中,订阅操做会返回一个Subscription对象,以便在合适的时机取消订阅,防止内存泄漏,若是一个类产生多个Subscription对象,咱们能够用一个CompositeSubscription存储起来,以进行批量的取消订阅。异步
RxBus有不少实现,如:ide
AndroidKnife/RxBus(https://github.com/AndroidKnife/RxBus) Blankj/RxBus(https://github.com/Blankj/RxBus)
其实正如前面所说的,RxBus的原理是如此简单,咱们本身均可以写出一个RxBus的实现:
public final class RxBus {
private final Subject<Object, Object> bus;
private RxBus() {
bus = new SerializedSubject<>(PublishSubject.create());
}
private static class SingletonHolder {
private static final RxBus defaultRxBus = new RxBus();
}
public static RxBus getInstance() {
return SingletonHolder.defaultRxBus;
}
/* * 发送 */
public void post(Object o) {
bus.onNext(o);
}
/* * 是否有Observable订阅 */
public boolean hasObservable() {
return bus.hasObservers();
}
/* * 转换为特定类型的Obserbale */
public <T> Observable<T> toObservable(Class<T> type) {
return bus.ofType(type);
}
}
复制代码
public final class RxBus2 {
private final Subject<Object> bus;
private RxBus2() {
// toSerialized method made bus thread safe
bus = PublishSubject.create().toSerialized();
}
public static RxBus2 getInstance() {
return Holder.BUS;
}
private static class Holder {
private static final RxBus2 BUS = new RxBus2();
}
public void post(Object obj) {
bus.onNext(obj);
}
public <T> Observable<T> toObservable(Class<T> tClass) {
return bus.ofType(tClass);
}
public Observable<Object> toObservable() {
return bus;
}
public boolean hasObservers() {
return bus.hasObservers();
}
}
复制代码
LiveData是Android Architecture Components提出的框架。LiveData是一个能够被观察的数据持有类,它能够感知并遵循Activity、Fragment或Service等组件的生命周期。正是因为LiveData对组件生命周期可感知特色,所以能够作到仅在组件处于生命周期的激活状态时才更新UI数据。
LiveData须要一个观察者对象,通常是Observer类的具体实现。当观察者的生命周期处于STARTED或RESUMED状态时,LiveData会通知观察者数据变化;在观察者处于其余状态时,即便LiveData的数据变化了,也不会通知。
Android Architecture Components的核心是Lifecycle、LiveData、ViewModel 以及 Room,经过它能够很是优雅的让数据与界面进行交互,并作一些持久化的操做,高度解耦,自动管理生命周期,并且不用担忧内存泄漏的问题。
public final class LiveDataBus {
private final Map<String, MutableLiveData<Object>> bus;
private LiveDataBus() {
bus = new HashMap<>();
}
private static class SingletonHolder {
private static final LiveDataBus DATA_BUS = new LiveDataBus();
}
public static LiveDataBus get() {
return SingletonHolder.DATA_BUS;
}
public <T> MutableLiveData<T> getChannel(String target, Class<T> type) {
if (!bus.containsKey(target)) {
bus.put(target, new MutableLiveData<>());
}
return (MutableLiveData<T>) bus.get(target);
}
public MutableLiveData<Object> getChannel(String target) {
return getChannel(target, Object.class);
}
}
复制代码
短短二十行代码,就实现了一个通讯总线的所有功能,而且还具备生命周期感知功能,而且使用起来也及其简单:
注册订阅:
LiveDataBus.get().getChannel("key_test", Boolean.class)
.observe(this, new Observer<Boolean>() {
@Override
public void onChanged(@Nullable Boolean aBoolean) {
}
});
复制代码
发送消息:
LiveDataBus.get().getChannel("key_test").setValue(true);
复制代码
咱们发送了一个名为"key_test",值为true的事件。 这个时候订阅者就会收到消息,并做相应的处理,很是简单。
对于LiveDataBus的初版实现,咱们发现,在使用这个LiveDataBus的过程当中,订阅者会收到订阅以前发布的消息。对于一个消息总线来讲,这是不可接受的。不管EventBus或者RxBus,订阅方都不会收到订阅以前发出的消息。对于一个消息总线,LiveDataBus必需要解决这个问题。
怎么解决这个问题呢?先分析下缘由:
当LifeCircleOwner的状态发生变化的时候,会调用LiveData.ObserverWrapper的activeStateChanged函数,若是这个时候ObserverWrapper的状态是active,就会调用LiveData的dispatchingValue。
在LiveData的dispatchingValue中,又会调用LiveData的considerNotify方法。
在LiveData的considerNotify方法中,红框中的逻辑是关键,若是ObserverWrapper的mLastVersion小于LiveData的mVersion,就会去回调mObserver的onChanged方法。而每一个新的订阅者,其version都是-1,LiveData一旦设置过其version是大于-1的(每次LiveData设置值都会使其version加1),这样就会致使LiveDataBus每注册一个新的订阅者,这个订阅者马上会收到一个回调,即便这个设置的动做发生在订阅以前。
对于这个问题,总结一下发生的核心缘由。对于LiveData,其初始的version是-1,当咱们调用了其setValue或者postValue,其vesion会+1;对于每个观察者的封装ObserverWrapper,其初始version也为-1,也就是说,每个新注册的观察者,其version为-1;当LiveData设置这个ObserverWrapper的时候,若是LiveData的version大于ObserverWrapper的version,LiveData就会强制把当前value推送给Observer。
明白了问题产生的缘由以后,咱们来看看怎么才能解决这个问题。很显然,根据以前的分析,只须要在注册一个新的订阅者的时候把Wrapper的version设置成跟LiveData的version一致便可。
那么怎么实现呢,看看LiveData的observe方法,他会在步骤1建立一个LifecycleBoundObserver,LifecycleBoundObserver是ObserverWrapper的派生类。而后会在步骤2把这个LifecycleBoundObserver放入一个私有Map容器mObservers中。不管ObserverWrapper仍是LifecycleBoundObserver都是私有的或者包可见的,因此没法经过继承的方式更改LifecycleBoundObserver的version。
那么能不能从Map容器mObservers中取到LifecycleBoundObserver,而后再更改version呢?答案是确定的,经过查看SafeIterableMap的源码咱们发现有一个protected的get方法。所以,在调用observe的时候,咱们能够经过反射拿到LifecycleBoundObserver,再把LifecycleBoundObserver的version设置成和LiveData一致便可。
对于非生命周期感知的observeForever方法来讲,实现的思路是一致的,可是具体的实现略有不一样。observeForever的时候,生成的wrapper不是LifecycleBoundObserver,而是AlwaysActiveObserver(步骤1),并且咱们也没有机会在observeForever调用完成以后再去更改AlwaysActiveObserver的version,由于在observeForever方法体内,步骤3的语句,回调就发生了。
那么对于observeForever,如何解决这个问题呢?既然是在调用内回调的,那么咱们能够写一个ObserverWrapper,把真正的回调给包装起来。把ObserverWrapper传给observeForever,那么在回调的时候咱们去检查调用栈,若是回调是observeForever方法引发的,那么就不回调真正的订阅者。
public final class LiveDataBus {
private final Map<String, BusMutableLiveData<Object>> bus;
private LiveDataBus() {
bus = new HashMap<>();
}
private static class SingletonHolder {
private static final LiveDataBus DEFAULT_BUS = new LiveDataBus();
}
public static LiveDataBus get() {
return SingletonHolder.DEFAULT_BUS;
}
public <T> MutableLiveData<T> with(String key, Class<T> type) {
if (!bus.containsKey(key)) {
bus.put(key, new BusMutableLiveData<>());
}
return (MutableLiveData<T>) bus.get(key);
}
public MutableLiveData<Object> with(String key) {
return with(key, Object.class);
}
private static class ObserverWrapper<T> implements Observer<T> {
private Observer<T> observer;
public ObserverWrapper(Observer<T> observer) {
this.observer = observer;
}
@Override
public void onChanged(@Nullable T t) {
if (observer != null) {
if (isCallOnObserve()) {
return;
}
observer.onChanged(t);
}
}
private boolean isCallOnObserve() {
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
if (stackTrace != null && stackTrace.length > 0) {
for (StackTraceElement element : stackTrace) {
if ("android.arch.lifecycle.LiveData".equals(element.getClassName()) &&
"observeForever".equals(element.getMethodName())) {
return true;
}
}
}
return false;
}
}
private static class BusMutableLiveData<T> extends MutableLiveData<T> {
private Map<Observer, Observer> observerMap = new HashMap<>();
@Override
public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<T> observer) {
super.observe(owner, observer);
try {
hook(observer);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void observeForever(@NonNull Observer<T> observer) {
if (!observerMap.containsKey(observer)) {
observerMap.put(observer, new ObserverWrapper(observer));
}
super.observeForever(observerMap.get(observer));
}
@Override
public void removeObserver(@NonNull Observer<T> observer) {
Observer realObserver = null;
if (observerMap.containsKey(observer)) {
realObserver = observerMap.remove(observer);
} else {
realObserver = observer;
}
super.removeObserver(realObserver);
}
private void hook(@NonNull Observer<T> observer) throws Exception {
//get wrapper's version
Class<LiveData> classLiveData = LiveData.class;
Field fieldObservers = classLiveData.getDeclaredField("mObservers");
fieldObservers.setAccessible(true);
Object objectObservers = fieldObservers.get(this);
Class<?> classObservers = objectObservers.getClass();
Method methodGet = classObservers.getDeclaredMethod("get", Object.class);
methodGet.setAccessible(true);
Object objectWrapperEntry = methodGet.invoke(objectObservers, observer);
Object objectWrapper = null;
if (objectWrapperEntry instanceof Map.Entry) {
objectWrapper = ((Map.Entry) objectWrapperEntry).getValue();
}
if (objectWrapper == null) {
throw new NullPointerException("Wrapper can not be bull!");
}
Class<?> classObserverWrapper = objectWrapper.getClass().getSuperclass();
Field fieldLastVersion = classObserverWrapper.getDeclaredField("mLastVersion");
fieldLastVersion.setAccessible(true);
//get livedata's version
Field fieldVersion = classLiveData.getDeclaredField("mVersion");
fieldVersion.setAccessible(true);
Object objectVersion = fieldVersion.get(this);
//set wrapper's version
fieldLastVersion.set(objectWrapper, objectVersion);
}
}
}
复制代码
LiveDataBus.get()
.with("key_test", String.class)
.observe(this, new Observer<String>() {
@Override
public void onChanged(@Nullable String s) {
}
});
复制代码
LiveDataBus.get().with("key_test").setValue(s);
复制代码
LiveDataBus的源码能够直接拷贝使用,也能够前往做者的GitHub仓库查看下载: https://github.com/JeremyLiao/LiveDataBus
本文提供了一个新的消息总线框架——LiveDataBus。订阅者能够订阅某个消息通道的消息,发布者能够把消息发布到消息通道上。利用LiveDataBus,不只能够实现消息总线功能,并且对于订阅者,他们不须要关心什么时候取消订阅,极大减小了由于忘记取消订阅形成的内存泄漏风险。
海亮,美团高级工程师,2017年加入美团,目前主要负责美团轻收银、美团收银零售版等App的相关业务及模块开发工做。