本文是学习了大佬的文章后,本身去动手实践后写的一篇学习笔记。大佬的文章写得比较好,我本身去写未必描述得那么清楚😂,因此本文不少地方都直接引用了大佬的文章。html
源码:https://github.com/LinYaoTian...java
效果图:android
Tip:阅读本文最好对 Jetpack 的 LIfeCycle 和 LiveData 有初步的了解。git
引用:github
对于 Android 系统来讲,消息传递是最基本的组件,每个 App 内的不一样页面,不一样的组件都在进行消息传递。架构
消息传递既能够用于 Android 四大组件以前的通讯,也能够用于异步线程和主线程以前的通讯。并发
对于 Android 开发者来讲,常用的消息传递方式有不少种,从最先的 Handler、BroadcastReceiver 、接口回调,到最近几年的流行的通讯总线类框架 EventBus、RxBus。app
说到 Android 的通讯总线类框架就不得不提到 EventBus 。框架
EventBus 是一个 Android 事件发布/订阅框架,经过解耦发布者和订阅者简化 Android 事件传递。EventBus 能够代替 Android 传统的 Intent、Handler、Broadcast 或接口回调,在 Fragment、Activity、Service 线程之间传递数据,执行方法。异步
EventBus 最大的特色就是:简洁、解耦。
在没有 EventBus 以前咱们一般用广播来实现监听,或者自定义接口函数回调,有的场景咱们也能够直接用Intent携带简单数据,或者在线程之间经过 Handler 处理消息传递。但不管是广播仍是 Handler 机制远远不能知足咱们高效的开发。EventBus 简化了应用程序内各组件间、组件与后台线程间的通讯。EventBus 一经推出,便受到广大开发者的推崇。
如今看来,EventBus 给 Android 开发者世界带来了一种新的框架和思想,就是消息的发布和订阅。 这种思想在其后不少框架中都获得了应用。
订阅发布模式定义了一种“一对多”的依赖关系,让多个订阅者对象同时监听某一个主题对象。这个主题对象在自身状态变化时,会通知全部订阅者对象,使它们可以自动更新本身的状态。
订阅/发布模式和观察者模式二者很是类似,我的以为订阅/发布模式是观察者模式的一种加强版。
二者的区别有:
详情能够看这篇文章:观察者模式 vs 发布-订阅模式
本文是经过 LiveData 去实现 EventBus ,这里先介绍下 LiveData。
LiveData 如同它的名字同样,是一个可观察的数据持有者,和常规的 observable 不一样,LiveData 是能够具备生命周期感知的,这意味着它可以在 Activity、Fragment、Service 中正确的处理生命周期。
实际上 LiveData 并不能单独起做用,它依赖于 Jectpack 的 LifeCycle 组件,由于 LifeCycle 已经在 Activity 的父类中被封装好了,在咱们使用 LiveData 的过程当中是基本是无感知的,因此这里就简单提一下 LifeCycle 好了。
LifeCycle 的做用
想继续了解的同窗能够看下这两篇文章:
下面继续说 LiveData:
LiveData 的数据源通常是 ViewModel,也能够是其余能够更新的 LiveData 组件。
通常咱们使用 LiveData 的 observe()
,当数据更新后,LiveData 会通知它的全部活跃的观察者。与 RxJava 不一样的,LiveData 只会通知活跃的观察者,例如 Activity 位于 Destroyed 状态时是不活跃的,所以不会收到通知。
固然咱们也可使用 LiveData 的 observerForever() 方法进行订阅,区别是 observerForever() 不会受到 Activity 等组件的生命周期的影响,只要数据更新就会收到通知。
LiveData 的简单使用:
public class MainActivity extends AppCompatActivity { private static final String TAG="MainActivity"; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); // 建立 LiveData 对象 MutableLiveData<String> mutableLiveData = new MutableLiveData<>(); // 开始订阅 mutableLiveData.observe(this, new Observer<String>() { @Override public void onChanged(@Nullable final String s) { Log.d(TAG, "onChanged:"+s); } }); // 更新数据,最终会回调上面的 onChange() 方法 mutableLiveData.postValue("Android进阶三部曲"); } }
LiveData 的更多详情能够参考这篇文章Android Jetpack架构组件(四)一文带你了解LiveData(使用篇)
EventBus 是业界知名的通讯类总线库,但它也存在许多被人诟病的缺点:
而经过 LiveData 实现的 LiveDataBus 的具备如下优势:
经过 LiveData 咱们能够很是简单的实现一个事件发布/订阅框架:
public final class LiveDataBus { private final Map<String, MutableLiveData<Object>> bus; private LiveDataBus() { bus = new HashMap<>(); } private static class SingletonHolder { private static final LiveDataBus DATA_BUS = new LiveDataBus(); } public static LiveDataBus get() { return SingletonHolder.DATA_BUS; } public <T> MutableLiveData<T> getChannel(String target, Class<T> type) { if (!bus.containsKey(target)) { bus.put(target, new MutableLiveData<>()); } return (MutableLiveData<T>) bus.get(target); } public MutableLiveData<Object> getChannel(String target) { return getChannel(target, Object.class); } }
没错,上面短短的 27 行代码咱们就实现了一个事件发布/订阅框架!
LiveData 一时使用一时爽,爽完了以后咱们发现这个简易的 LiveDataBus 存在一个问题,就是订阅者会收到订阅以前发布的消息,相似于粘性消息。对于一个消息总线来讲,粘性消息和非粘性消息都是必须支持的,下面咱们来看一下如何解决这个问题。
咱们先看一下为何会出现这个问题:
android.arch.lifecycle.LiveData
@MainThread public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<? super T> observer) { assertMainThread("observe"); // 当前绑定的组件(activity or fragment)状态为DESTROYED的时候, 则会忽视当前的订阅请求 if (owner.getLifecycle().getCurrentState() == DESTROYED) { return; } // 转为带生命周期感知的观察者包装类 LifecycleBoundObserver wrapper = new LifecycleBoundObserver(owner, observer); ObserverWrapper existing = mObservers.putIfAbsent(observer, wrapper); // 对应观察者只能与一个owner绑定 if (existing != null && !existing.isAttachedTo(owner)) { throw new IllegalArgumentException("Cannot add the same observer" + " with different lifecycles"); } if (existing != null) { return; } // lifecycle注册 owner.getLifecycle().addObserver(wrapper); }
能够看到,LiveData 内部会将咱们的传入参数包装成 wrapper ,而后存在一个 Map 中,最后经过 LifeCycle 组件添加观察者。
LiveData 更新数据方式有两个,一个是 setValue() 另外一个是 postValue(),这两个方法的区别是,postValue() 在内部会抛到主线程去执行更新数据,所以适合在子线程中使用;而 setValue() 则是直接更新数据。
这里就只看下 setValue() 方法就行了
android.arch.lifecycle.LiveData
@MainThread protected void setValue(T value) { assertMainThread("setValue"); // 发送版本+1 mVersion++; mData = value; // 信息分发 dispatchingValue(null); }
记住这里的 mVersion,它本问题关键,每次更新数据都会自增,默认值是 -1。而后咱们跟进下 dispatchingValue() 方法:
android.arch.lifecycle.LiveData
void dispatchingValue(@Nullable ObserverWrapper initiator) { // mDispatchingValue的判断主要是为了解决并发调用dispatchingValue的状况 // 当对应数据的观察者在执行的过程当中, 若有新的数据变动, 则不会再次通知到观察者 // 因此观察者内的执行不该进行耗时工做 if (mDispatchingValue) { mDispatchInvalidated = true; return; } mDispatchingValue = true; do { mDispatchInvalidated = false; if (initiator != null) { // 这里 considerNotify(initiator); initiator = null; } else { for (Iterator<Map.Entry<Observer<? super T>, ObserverWrapper>> iterator = mObservers.iteratorWithAdditions(); iterator.hasNext(); ) { // 这里 considerNotify(iterator.next().getValue()); if (mDispatchInvalidated) { break; } } } } while (mDispatchInvalidated); mDispatchingValue = false; }
能够看到,不管条件判断,最终都会执行 considerNotify() 方法,因此咱们继续跟进:
android.arch.lifecycle.LiveData
private void considerNotify(ObserverWrapper observer) { if (!observer.mActive) { return; } if (!observer.shouldBeActive()) { observer.activeStateChanged(false); return; } // 判断 version if (observer.mLastVersion >= mVersion) { return; } observer.mLastVersion = mVersion; //noinspection unchecked observer.mObserver.onChanged((T) mData); }
终于到了最关键的时候了!!若是 ObserverWrapper 的 mLastVersion 小于 LiveData 的 mVersion,那么就会执行的 onChange() 方法去通知观察者数据已更新。
而 ObserverWrapper.mLastVersion 的默认值是 -1, LiveData 只要更新过数据,mVersion 就确定会大于 -1,因此订阅者会立刻收到订阅以前发布的最新消息!!
明白了问题根源所在,咱们就好解决问题了。
解决这个问题的方案有多种,其中美团大佬Android消息总线的演进之路:用LiveDataBus替代RxBus、EventBus 使用的反射的方式修改 LiveData 中的 mVersion 值去实现。还有另外一个方案基于LiveData实现事件总线思路和方案,此方案是基于自定义观察者包装类,由于粘性消息最终会调用到 Observer#onChange() 方法,所以咱们自定义 Observer 包装类,本身维护实际的订阅消息数,来判断是否须要触发真正的 onChange() 方法。本文使用的是第二种方案。
为了业务的拓展性,这里咱们定义先一个 Base 基类:
internal open class BaseBusObserverWrapper<T>(private val mObserver: Observer<in T>, private val mBusLiveData: BusLiveData<T>) : Observer<T> { private val mLastVersion = mBusLiveData.version private val TAG = "BaseBusObserverWrapper" override fun onChanged(t: T?) { Logger.d(TAG,"msg receiver = " + t.toString()) if (mLastVersion >= mBusLiveData.version){ // LiveData 的版本号没有更新过,说明并无新数据,只是由于 // 当前 Observer 的版本号比 LiveData 低致使的调用 onChange() return } try { mObserver.onChanged(t) }catch (e:Exception){ Logger.e(TAG,"error on Observer onChanged() = " + e.message) } } open fun isAttachedTo(owner: LifecycleOwner) = false }
这里咱们保存了 LiveData 的 mVersion 值,每次执行 onChange() 时都先判断一些 LiveData 是否更新过数据,若是没有则不执行观察者的 Observer.onChange() 方法。
而后定义两个子类,BusLifecycleObserver 和 BusAlwaysActiveObserver ,其中 BusLifecycleObserver 用于 LiveData#observer() 方法订阅的事件,而 BusAlwaysActiveObserver 用于 LiveData#observerForever() 方法订阅的事件。
internal class BusLifecycleObserver<T>(private val observer: Observer<in T>, private val owner: LifecycleOwner, private val liveData: BusLiveData<T>) : BaseBusObserverWrapper<T>(observer,liveData),LifecycleObserver{ @OnLifecycleEvent(Lifecycle.Event.ON_DESTROY) fun onDestroy(){ liveData.removeObserver(observer) owner.lifecycle.removeObserver(this) } }
对于 BusLifecycleObserver,在生命周期组件处于 Destroyed 时,须要把观察者移除。
internal class BusAlwaysActiveObserver<T>(private val mObserver: Observer<in T>, private val mBusLiveData: BusLiveData<T>) : BaseBusObserverWrapper<T>(mObserver, mBusLiveData)
对于 BusAlwaysActiveObserver,观察者不受组件生命周期的影响,所以不须要在组件 Destroyed 时移除。
注意:由于 LiveData 的 getVersion() 是包访问级别的!因此 BusLiveData 必须定义到与 LiveData 同一个包内,即 androidx.lifecycle
包下,所以须要你本身建立一个同名的包并将 BusLiveData 放到里面。
若是你的项目工程没有引入 androidx,也可使用 v7 包下的 android.arch.lifecycle
,方法同理,不过要注意的是 android.arch.lifecycle
包下 LiveData 的方法参数的泛型是没有型变的,所以直接复制这里代码会有点问题,须要你本身根据编译器的提示修改下。
class BusLiveData<T>(private val mKey:String) : MutableLiveData<T>() { private val TAG = "BusLiveData" private val mObserverMap: MutableMap<Observer<in T>, BaseBusObserverWrapper<T>> = mutableMapOf() @MainThread override fun observe(owner: LifecycleOwner, observer: Observer<in T>) { val exist = mObserverMap.getOrPut(observer,{ BusLifecycleObserver(observer,owner,this).apply { mObserverMap[observer] = this owner.lifecycle.addObserver(this) } }) super.observe(owner, exist) Logger.d(TAG,"observe() called with: owner = [$owner], observer = [$observer]") } @MainThread override fun observeForever(observer: Observer<in T>) { super.observeForever(observer) val exist = mObserverMap.getOrPut(observer ,{ BusAlwaysActiveObserver(observer,this).apply { mObserverMap[observer] = this } }) super.observeForever(exist) Logger.d(TAG, "observeForever() called with: observer = [$observer]") } @MainThread fun observeSticky(owner: LifecycleOwner, observer: Observer<T>) { super.observe(owner, observer) Logger.d(TAG, "observeSticky() called with: owner = [$owner], observer = [$observer]") } @MainThread fun observeStickyForever(observer: Observer<T>){ super.observeForever(observer) Logger.d(TAG, "observeStickyForever() called with: observer = [$observer]") } @MainThread override fun removeObserver(observer: Observer<in T>) { val exist = mObserverMap.remove(observer) ?: observer super.removeObserver(exist) Logger.d(TAG, "removeObserver() called with: observer = [$observer]") } @MainThread override fun removeObservers(owner: LifecycleOwner) { mObserverMap.iterator().forEach { if (it.value.isAttachedTo(owner)) { mObserverMap.remove(it.key) } } super.removeObservers(owner) Logger.d(TAG, "removeObservers() called with: owner = [$owner]") } @MainThread override fun onInactive() { super.onInactive() if (!hasObservers()) { // 当 LiveData 没有活跃的观察者时,能够移除相关的实例 LiveDataBusCore.getInstance().mBusMap.remove(mKey) } Logger.d(TAG, "onInactive() called") } @MainThread public override fun getVersion(): Int { return super.getVersion() } }
代码比较短和简单,要点就是,对于非粘性消息,即 observer() 和 observerForever() ,咱们须要使用自定义的包装类包装处理。对于粘性消息,则直接使用 LiveData 默认实现便可。
internal class LiveDataBusCore { companion object{ @JvmStatic private val defaultBus = LiveDataBusCore() @JvmStatic fun getInstance() = defaultBus } internal val mBusMap : MutableMap<String, BusLiveData<*>> by lazy { mutableMapOf<String, BusLiveData<*>>() } fun <T> getChannel(key: String) : BusLiveData<T> { return mBusMap.getOrPut(key){ BusLiveData<T>(key) } as BusLiveData<T> } }
class LiveDataBus { companion object{ @JvmStatic @Synchronized fun <T> getSyn(key: String) : BusLiveData<T>{ return get(key) } @JvmStatic fun <T> get(key: String) : BusLiveData<T>{ return LiveDataBusCore.getInstance().getChannel(key) } private fun <T> get(key: String, type: Class<T>) : BusLiveData<T> { return LiveDataBusCore.getInstance().getChannel(key) } @JvmStatic fun <E> of(clz: Class<E>): E { require(clz.isInterface) { "API declarations must be interfaces." } require(clz.interfaces.isEmpty()) { "API interfaces must not extend other interfaces." } return Proxy.newProxyInstance(clz.classLoader, arrayOf(clz), InvocationHandler { _, method, _-> return@InvocationHandler get( // 事件名以集合类名_事件方法名定义 // 以此保证事件的惟一性 "${clz.canonicalName}_${method.name}", (method.genericReturnType as ParameterizedType).actualTypeArguments[0].javaClass) }) as E } } }
使用:
class TestLiveDataBusActivity : AppCompatActivity() { companion object{ private const val TAG = "TestLiveDataBusActivity" } override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity_test_live_data_bus) LiveDataBus.get<String>("testObserver").observe(this, Observer<String> { Log.d(TAG, "testObserver = $it") test_observer_id.text = it }) LiveDataBus.get<String>("testObserver").postValue("new value") } }
LiveDataBus 还提供了一个 of() 方法,用于提供用于事件约束,什么是时间约束呢?就是说咱们的如今观察者和发布者获取消息渠道的 key 是一个字符串,在使用的过程当中可能会出现,发布者的 key 是 itO 而观察者的 key 误输入成 it0 的状况,因此这里咱们能够模仿 Retrofit 请求动态代理的作法,在使用的过程当中,咱们须要先定义一个接口:
interface TestLiveEvents { fun event1(): MutableLiveData<String> }
使用:
fun main() { LiveDataBus .of(TestLiveEvents::class.java) .event1() .postValue("new value") }
借助于 Android 官方提供的 LiveData ,咱们能够很是方便地实现本身的 LiveDataBus,所有文件也就只有以上几个类,同时咱们还避免了 EventBus 的许多缺点!
LiveDataBus 的源码:https://github.com/LinYaoTian...
若是文章有任何问题,欢迎在评论区指正。