本文来自尚妆前端团队路远html
发表于尚妆github博客,欢迎订阅!前端
EventBus
是基于观察者模式的发布/订阅事件总线,它让组件间的通讯变得更加简单。相似广播系统,不过 EventBus
全部的订阅和发送都是在内存层面的,使用起来远比广播简单,也更容易管理。java
先说明在事件总线中的几个关键词:git
当但愿接受到事件时,须要在 onCreate()
执行 register()
方法,在注册方法中会检索当前类中声明的接受事件的方法,并将他们注册到对应的映射中。github
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
复制代码
内存中存储的数据结构有以下几个:apache
// 事件 - List<订阅(Subscription)> 每一个订阅由订阅者、事件类型惟一肯定
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
// 订阅者 - List<关注的事件> 每一个订阅者可能关注多个事件
private final Map<Object, List<Class<?>>> typesBySubscriber;
// 事件对应下的粘滞事件
private final Map<Class<?>, Object> stickyEvents;
复制代码
当执行 register()
方法时,会借助 SubscriberMethodFinder
类从注册的对象的 Class
中查找。缓存
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
// 从缓存中找是否已经检索过了,有缓存就直接返回
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}
// 是否忽略索引功能,忽略的话会直接使用反射的方法搜索,不然会检测有没有相关的索引可使用
if (ignoreGeneratedIndex) {
subscriberMethods = findUsingReflection(subscriberClass);
} else {
// 支持索引的状况,会优先从索引中查找,加快查找的速度
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
// 没有找到任何的订阅方法将会抛出异常,因此至少要用注解订阅一个方法
} else {
// 针对这个 class 查找到订阅的方法列表,存缓存,下次更快的返回
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
复制代码
由于咱们不考虑索引的状况,最终查找方法都会走到方法 findUsingReflectionInSingleClass
,内部的原理相对简单,遍历该类的全部方法,找到共有的、只有一个参数、且带有 @Subscribe
注解的方法,存储到列表中。weex
private static final int MODIFIERS_IGNORE = Modifier.ABSTRACT | Modifier.STATIC | BRIDGE | SYNTHETIC;
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
methods = findState.clazz.getDeclaredMethods();
for (Method method : methods) {
int modifiers = method.getModifiers();
// 共有的方法 & 不是静态、抽象、不是编译生成的方法
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
Class<?>[] parameterTypes = method.getParameterTypes();
// 参数长度只能是1
if (parameterTypes.length == 1) {
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
// 方法上面带有 @Subscribe 注解
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
if (findState.checkAdd(method, eventType)) {
ThreadMode threadMode = subscribeAnnotation.threadMode();
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
}
}
}
}
复制代码
这个过程是一个循环,每次都会向上查找当前类的父类,知道到达 java
内置的类中,这就意味着,父类中声明的订阅方法,在子类实例中也会接收到。查找的结果最终会生成一个 SubscriberMethod
的列表,这个类中存储了订阅方法的所有信息,数据结构以下:数据结构
public class SubscriberMethod {
final Method method; // 当前的方法,可执行
final ThreadMode threadMode; // 线程类型
final Class<?> eventType; // 参数的类型,也就是他订阅的事件的类型
final int priority; // 优先级
final boolean sticky; // 是不是粘滞事件
String methodString; // 方法的字符串
}
复制代码
// 事件 - List<订阅(Subscription)> 每一个订阅由订阅者、事件类型惟一肯定
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
// 订阅者 - List<关注的事件>
private final Map<Object, List<Class<?>>> typesBySubscriber;
复制代码
订阅的过程就是根据订阅者 Subscriber
及该订阅者的某个处理事件的方法 SubscriberMethod
来生成 Subscription
而且存储到映射当中。异步
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
// 存储到 事件 - List<订阅> 映射中
Class<?> eventType = subscriberMethod.eventType;
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType); // ... 不存在则建立新的
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
// 存储到 订阅者 - List<关注的事件> 映射中
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber); // ... 不存在则建立新的
subscribedEvents.add(eventType);
// ...
// 对 Sticky Event 的处理,后面单独说
}
复制代码
因为事件总线的机制基于内存实现,全部的订阅都会存储在内存中,所以必须在合适的时机取消注册,来释放占用的内存空间。
当取消注册时:
订阅者-List<关注事件>
的映射快速的获取到,当前订阅者感兴趣的事件列表。事件-List<订阅>
的映射中,删除全部的订阅。订阅者-List<关注事件>
删除,完成取消订阅的过程。获取当前订阅者关注的所有事件,遍历取消注册。
public synchronized void unregister(Object subscriber) {
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {
unsubscribeByEventType(subscriber, eventType);
}
typesBySubscriber.remove(subscriber);
} else {
}
}
// 从订阅列表中删除对应的订阅
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size = subscriptions.size();
for (int i = 0; i < size; i++) {
Subscription subscription = subscriptions.get(i);
if (subscription.subscriber == subscriber) {
subscription.active = false;
subscriptions.remove(i);
i--;
size--;
}
}
}
}
复制代码
当须要发送事件使用 EventBus
的 post()
方法。
借助 ThreadLocal
每一个线程单独维护一个、且仅一个 PostingThreadState
对象,这个对象的数据结构以下, 内部存储了当前发送事件状态的的一些关键信息。
final static class PostingThreadState {
final List<Object> eventQueue = new ArrayList<Object>(); // 事件队列
boolean isPosting; // 是否正在发送事件,是的话不须要启动循环读取事件
boolean isMainThread; // 是不是主线程
Subscription subscription; // 一个订阅
Object event; // 当前的事件
boolean canceled; // 是否被取消
}
复制代码
获取本线程的 PostingThreadState 对象,进行初始化,并开始轮询处理队列中的事件。
public void post(Object event) {
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
if (!postingState.isPosting) {
postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
postingState.isPosting = true;
try {
// 从队列中循环读取事件处理
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
复制代码
继续往深里面看 postSingleEvent()
方法,他每次处理一个从队列中取出来的事件,这里作了一个区分,是否支持继承,这个值默认是 true
,支持继承时,若是对当前事件的父类、接口对应的事件感兴趣,那么他也能够处理该事件。例如当前要处理 A 事件,A 继承自 B,同时实现 C 接口,能处理 B,C 事件的订阅者将也会参与处理此 A 事件。
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
// 向父类搜索,将父类、接口所有查找到
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
// 没有找到订阅的方法,处理分支
}
}
复制代码
接下来会走 postSingleEventForEventType()
方法,这个方法负责找到对这个事件感兴趣的 订阅 Subscription 列表, Subscription
里面包含了订阅者、处理对应事件的方法等信息。
拿到列表以后便循环将事件给列表中的订阅依次处理,在以前注册时,是有一个优先级别的,优先级高的将会先得到处理事件的权利。
优先级别较高的处理者能够中止事件的传递,只须要抛出一个异常,被 finally
块捕捉后,就会中断轮询,从而终止事件的传递。
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> CopyOnWriteArrayList<Subscription> subscriptions; synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
// 遍历全部的订阅,处理事件
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
// 让 subscription 处理 event
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
// 若是优先级别较高的处理者异常,则后续处理者将没法处理该事件
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
// 退出轮询
if (aborted) {
break;
}
}
return true;
}
return false;
}
复制代码
处理事件的最后一步,是 postToSubscription()
他负责将事件的处理分发到不一样的线程队列中,在添加订阅注解 @Subscribe
时能够指定 threadMode
,这极大的方便了咱们在事件传递后切换不一样线程处理事件,例如咱们经常要在子线程处理数据,而通知主线程更新 UI
,使用 EventBus
只须要指定 @Subscribe(threadMode=ThreadMode.Main)
则在处理事件时全部操做在内部便被切换到了主线程,真正作到了对线程切换的无感知。
分为了以下几种类型:
POSTING
发送线程,或者说是当前线程更贴切一些,在其余类库中一般叫 Immediate
, 也就是不用切换线程。MAIN
主线程,不解释。BACKGROUND
后台线程,若是发送线程是主线程,则开辟新的线程执行,不然将在当前线程执行。ASYNC
异步线程,不管怎样,老是开启新的子线程去执行。这里就要看一下几个处理者 HandlerPoster
/BackgroundPoster
/AsyncPoster
实现原理大体相同,内部维护一个队列,不停的把里面的事件取出来处理。
HandlerPoster
是基于 Handler
实现对队列的轮询。BackgroundPoster
则是用死循环来作的,谁让人家有本身的线程呢。AsyncPoster
就更富了,根本不轮询,每次都是一个新的线程。private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
}
}
复制代码
最终调用的 invokeSubscriber()
很简单就是利用反射调一下对应的 method
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
复制代码
我把 Sticky Event
翻译成 粘滞事件 不知道对不对,他的出现主要是由于咱们须要处理事件是老是要先注册再发送事件,根本缘由在于当一个事件发出时,他的生命周期很短,全部对他感兴趣的订阅者处理完了以后他就被抛弃了,后面的订阅者再感兴趣也没用,由于早就被清理啦。
要解决这个问题也很简单,就是延长事件的生命周期,即便你们都不理他了,他也能顽强的活着,万一后面还有人对他感兴趣呢。因此实现的原理也就很明了了,找个列表把它所有存起来,除非你手动给删除,不然就 粘不拉几 的附着在你的内存里,等着他的真命天子出现。
// 事件类型 - 事件实例
private final Map<Class<?>, Object> stickyEvents;
// 发送粘滞事件时,先存起来给后面的人用,而后按照常规流发送出去
public void postSticky(Object event) {
synchronized (stickyEvents) {
stickyEvents.put(event.getClass(), event);
}
post(event);
}
复制代码
还要提供一个渠道,让新加入进来的订阅者可以察觉到这里有粘滞事件的存在,若是感兴趣也能够处理它。这个时机就是注册时,当一个订阅者被添加到注册表中时,此时若是存在粘滞事件,用当前订阅者感兴趣的事件为 key
获取存在的粘滞事件,若是有感兴趣的就临幸一下。因而能够完善一下以前未说完的 register()
方法:
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
// ... 前面这块说过了
// 这个订阅者的这个订阅方法是对粘滞事件感兴趣的
if (subscriberMethod.sticky) {
// 事件是否继承
if (eventInheritance) {
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
// 当前事件的子类粘滞事件都会被取出来检查是否能够被处理
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
复制代码
接下来的 checkPostStickyEventToSubscription()
就会调用前面已经说过的 postToSubscription()
方法,开始发送到不一样的线程中执行,这部分和普通的事件是同样的啦。
粘滞事件这里也出现了一个关于事件继承的检索,在上一节也出现了一次,单独拿出来讲一下异同之处。
能够类比函数入参的限制,若是一个方法声明中参数是父类,那么传参时能够传递子类对象进去,声明了子类的话,是不能传递父类对象的。
举个例子,设定下场景,咱们如今有事件基类 BaseEvent
和一个事件子类 ImplEvent
是继承关系。
第一种场景,发送普通事件,我发送了一个 ImplEvent
,由于我发的是个子类事件,也就是说全部声明关注 BaseEvent
的订阅者也均可以将当前事件做为入参,因此向上检索对 ImplEvent
父类、父接口感兴趣的订阅者去执行。
第二个场景,发送粘滞事件,发送一个 BaseEvent
的粘滞事件,由于是在注册时触发执行,那么说明当前订阅者对 BaseEvent
感兴趣,既然他的入参是父类事件,那么子类事件也一样能够做为他的处理事件方法的入参,因而检索全部粘滞事件找到全部 BaseEvent
的子类事件都交给当前订阅者处理。
在 Weex
中有一个 BroadcastChannel
的 API
用来实现页面间的通讯,在原生部分使用 WebSocketModule
实现,不过通过实验发现,注册和发送没有什么大问题,不过在取消注册这块作的有漏洞,出现屡次页面销毁可是没法取消对事件监听的状况(多是当时尝试的时候版本低一些),主要是由于 module
的生命周期没能和 weex
页面实例更好的绑定起来,并且它是基于 W3C
的标准设计的,也没有实现相似粘滞事件这种功能的支持。
最后决定根据事件总线的机制来尝试实现页面之间的通讯,在 Weex
中有一个 页面内 通讯的接口,他是 native
和 weex
通讯的通道,能够用一个 key
做为标示符,触发当前 weex
页面中对 key
事件感兴趣的的方法,关于 weex
相关的内容这里不细说。
((WXSDKInstance)instance).fireGlobalEventCallback(key, params)
复制代码
实现原理相似 EventBus
,不过由于基于 weex
就没那么复杂,一样须要维护一个注册表,相对于 EventBus
要对订阅者强引用持有,这里使用了每一个 weex
页面惟一的 instanceId
做为标记,存储这个标记而不是存储真正的 WXSDKInstance
对象,避免内存泄漏。
private val mEventInstanceIdMap by lazy { mutableMapOf<String, MutableSet<String>>() }
复制代码
注册,当 weex
那边发起注册时,拿到对应的 instanceId
存储到映射中。
// 注册接受某事件
// event.registerEvent('myEvent')
// globalEvent.addEventListener('myEvent', (params) => {});
fun registerEvent(key: String?, instantId: String?) {
// do check...
val nonNullKey = key ?: return
val registerInstantIds = mEventInstanceIdMap[nonNullKey] ?: mutableSetOf()
registerInstantIds.add(instantId)
mEventInstanceIdMap[nonNullKey] = registerInstantIds
}
复制代码
发送事件时,根据事件的 key
拿到对他关注的订阅者的 instanceId
列表,循环从 weex sdk
中取出真正的 WXSDKInstance
对象,再利用页面内通讯的 API
将事件发送给指定页面,达到页面间通讯的目的。
// 发送事件
// event.post('myEvent',{isOk:true});
fun postEvent(key: String, params: Map<String, Any>) {
// do check...
val registerInstantIds = mEventInstanceIdMap[key] ?: listOf<String>()
val allInstants = renderManager.allInstances
for (instance in allInstants) {
// 遍历找到订阅的 instanceId 进而拿到 weex 实例发送页面内事件
if (instance != null
&& !instance.instanceId.isNullOrEmpty()
&& registerInstantIds.contains(instance.instanceId)) {
instance.fireGlobalEventCallback(key, params)
}
}
}
复制代码
当页面销毁时,同时自动取消注册,释放内存和避免没必要要的事件触发
override fun onWxInstRelease(weexPage: WeexPage?, instance: WXSDKInstance?) {
val nonNullId = instance?.instanceId ?: return
for (mutableEntry in mEventInstanceIdMap) {
if (mutableEntry.value.isNotEmpty()) {
mutableEntry.value.remove(nonNullId)
}
}
}
复制代码
最后,目前只是一个简单的实现,可以基本实现页面间通讯的需求,不过还须要更多地调研和其余端同窗的配合,相信会愈来愈完善。
目前维护的几个项目,求 ✨✨✨✨