从 EventBus 的介绍中,EventBus 给的定位是:设计模式
Event bus for Android and Java that simplifies communication between Activities, Fragments, Threads, Services, etc. Less code, better quality安全
简单理解一下就是 Event bus 给 Android 及 Java (固然主要是 Android)的 Activity,Fragment,Threads,Services 之间提供一个简单的通讯方式,从而能让咱们用质量高且少的代码来完成咱们的功能。bash
EventBus 是一个基于发布/订阅的设计模式的事件总线,其架构图以下。网络
从架构图上来看,仍是很简单的,跟咱们所最熟悉的观察者模式是相似的。大概工做过程就是: (1) Subscriber 也就是订阅者,订阅一个 Event,其中 Event 是本身定义的,符合 POJO 的规范便可。 (2) 在须要时,Publisher 也就是发布者,就可将事件经过 EventBus 分发布相应的订阅者 Subscriber。数据结构
恩,就是如此简单。固然,其实现远不会这么简单了,仍是干了不少脏活和累活的。多线程
EventBus 的官方文档为咱们提供了经典的使用 EventBus 的 3 个过程。架构
普通的 POJO 便可框架
public class MessageEvent {
public final String message;
public MessageEvent(String message) {
this.message = message;
}
}
复制代码
文章是基于 EventBus 3 进行分析,对于订阅者的方法命名能够自由命名了,而不须要采用约定命名。异步
@Subscribe(threadMode = ThreadMode.MAIN)
public void onMessageEvent(MessageEvent event) {
Toast.makeText(getActivity(), event.message, Toast.LENGTH_SHORT).show();
}
// This method will be called when a SomeOtherEvent is posted
@Subscribe
public void handleSomethingElse(SomeOtherEvent event) {
doSomethingWith(event);
}
复制代码
准备好了订阅者以后,咱们还须要向总线注册这个订阅者,这个咱们的订阅者方法才能接收到相应的事件。async
@Override
public void onStart() {
super.onStart();
EventBus.getDefault().register(this);
}
@Override
public void onStop() {
EventBus.getDefault().unregister(this);
super.onStop();
}
复制代码
咱们能够在任何地方发送事件。
EventBus.getDefault().post(new MessageEvent("Hello everyone!"));
复制代码
根据前文的 3 步经典使用方法,源码的分析大概也分红以下 4 步来进行。
方法getDefault()
public static EventBus getDefault() {
if (defaultInstance == null) {
synchronized (EventBus.class) {
if (defaultInstance == null) {
defaultInstance = new EventBus();
}
}
}
return defaultInstance;
}
复制代码
getDefault() 方法就是一个“懒汉”的单例模式,目的就是让咱们能在全局对 EventBus 的引用进行使用。值得关注的是,这个懒汉的单例模式实现并不会有多线程的安全问题。由于对于 defaultInstance 的定义是 volatile 的。
static volatile EventBus defaultInstance;
复制代码
接下来继续看看构造方法。
构造方法EventBus()
public EventBus() {
this(DEFAULT_BUILDER);
}
EventBus(EventBusBuilder builder) {
logger = builder.getLogger();
subscriptionsByEventType = new HashMap<>();
typesBySubscriber = new HashMap<>();
stickyEvents = new ConcurrentHashMap<>();
mainThreadSupport = builder.getMainThreadSupport();
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
backgroundPoster = new BackgroundPoster(this);
asyncPoster = new AsyncPoster(this);
indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
builder.strictMethodVerification, builder.ignoreGeneratedIndex);
logSubscriberExceptions = builder.logSubscriberExceptions;
logNoSubscriberMessages = builder.logNoSubscriberMessages;
sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
throwSubscriberException = builder.throwSubscriberException;
eventInheritance = builder.eventInheritance;
executorService = builder.executorService;
}
复制代码
构造方法使用了常见的 Builder 模式对 EventBus 中的各个属性进行了初始化。这里使用的是默认配置。通常状况下,咱们也都是使用 getDefault() 及其默认配置来获取实例的。咱们也能够经过配置 EventBusBuilder 来 build 出一个本身的实例,可是要注意的是,后面的注册、注销以及发送事件都要基于此实例来进行,不然就会发生事件错乱发错的问题。
方法register() 若是不看方法的实现,根据经验判断,我想当 register 的发生,应该是将订阅者中用于接收事件的方法与该事件关联起来。那是否是这样呢?
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
// 1.经过订阅者类找到其订阅方法
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
// 2. 而后进行逐个订阅
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
复制代码
首行来看看找订阅方法。这里封装了一个专门的类 SubscriberMethodFinder,而且经过其方法 findSubscriberMethods() 来进行查找,这里就不贴 findSubscriberMethods() 的代码,而是看看其内部实际用于寻找订阅方法的 findUsingReflection()。
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
......
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
......
}
for (Method method : methods) {
int modifiers = method.getModifiers();
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 1) {
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
if (findState.checkAdd(method, eventType)) {
ThreadMode threadMode = subscribeAnnotation.threadMode();
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
......
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
......
}
}
}
复制代码
总结一下,该方法主要就是寻找订阅者类中的公有方法,且其参数惟一的以及含有注解声明@Subscribe 的方法。就是咱们在写代码时所定义的订阅者方法了。找到订阅者方法后,会将其 method(方法的反射类 Method) 、event、thread mode 以及优先级等封装成一个 SubscribeMethod 而后添加到 FindState 中。
这个 FindState 是 SubscriberMethodFinder 的一个内部类。其用了大小只有 4 的 FIND_STATE_POOL 来进行管理,这样避免了 FindState 对象的重复建立。
最后在 find 中会将所找到订阅者方法添加到 “METHOD_CACHE” 中,这是一个 ConcurrentHashMap 的结构。
Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();
复制代码
方法subscribe()
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
......
}
}
// 按优先级进行插入
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
// 粘性事件,注册就会发送
if (subscriberMethod.sticky) {
.....
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
复制代码
subscribe() 方法完成事件与 subscribe 、 subscribeMethod 的关联,具体怎么关联的呢,请看图。
这里分别用了 2 个 HashMap 来描述。subscriptionsByEventType 记录了一个事件应该调用哪一个订阅者的订阅方法来处理。而 typesBySubscriber 记录了订阅者订阅了哪些事件。
至此,订阅者的注册完成以后,也就至关因而 EventBus 的初始化完成了,那么接下来就能够愉快发送事件了。
public void post(Object event) {
// 获取发送者线程的状态管理器
PostingThreadState postingState = currentPostingThreadState.get();
// 从状态管理器中取出事件队列,并将事件添加到队列中
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
// 若是当前不是发送状态,那就进入发送状态
if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
......
}
try {
// 在发送状态下,经过遍历事件队列逐个发送事件
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
复制代码
post() 方法的主要过程都写在代码注释里了。这里主要关注下 currentPostingThreadState,它的定义以下:
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
};
复制代码
而 PostingThreadState 的定义以下。
final static class PostingThreadState {
final List<Object> eventQueue = new ArrayList<>();
boolean isPosting;
boolean isMainThread;
Subscription subscription;
Object event;
boolean canceled;
}
复制代码
这里用了 ThreadLocal 来保存 PostingThreadState。ThreadLocal 的主要做用是使得所定义的资源是线程私有的。那么,也就是说,对于每个发送事件的线程其都有一个惟一的 PostingThreadState 来记录事件发送的队列以及状态。
下图是 ThreadLocal 在 Thread 中的数据结构描述,而这里的 PostingThreadState 就是下图中的 Value。
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
......
复制代码
postSingleEvent 所作的事情主要就是根据要发送的事情收集事件,而后逐个发送。收集的什么事情呢,很简单,就是收集它的父类事件。也就是说,当咱们发送一个事件时,它的全部父类事件也同时会被发送,这里的父类还包括了 Object 在内。因此,这里须要开发者们很是注意了:
事件是具备向上传递性质的
接下来的 postSingleEventForEventType 就是从 subscriptionsByEventType 找出事件所订阅的订阅者以及订阅者方法,即 CopyOnWriteArrayList。 而后再经过方法 postToSubscription() 将事件逐个逐个向订阅者进行发送。在 postToSubscription() 方法中会根据不一样的 ThreadMode 来决定不一样的线程调度策略,具体在下面讲。而无论采用何种策略,最终都将会经过invokeSubscriber() 进行方法的反射调用来进行订阅方法的调用。
void invokeSubscriber(Subscription subscription, Object event) {
......
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
......
}
复制代码
至此,事件的发送就完成了。很简单,其真实的面目就是一个方法的反射调用。而对于事件的发送,这里还应该关注一下线程的调度策略。
上面说到线程的调度策略在 postToSubscription() 方法中,那么就来看一看这个方法。
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
复制代码
针对每一个不一样的策略,下面来简单看一看。 POSTING 发送者线程直接发送,也是 ThreadMode 的默认值。 MAIN 若是发送者是主线程,则直接发送。若是不是则交给 mainThreadPoster 来发送。mainThreadPoster 是 HandlerPoster 类型,该类继承了 Handler 并实现了 Poster 接口,其实例是由 AndroidHandlerMainThreadSupport 所建立的。AndroidHandlerMainThreadSupport 包含了主线程的 Looper ,而 HandlerPoster 包含了一个链表结构的队列 PendingPostQueue,事件也将插入到 PendingPostQueue 中。每个事件都会以一个空 Message 发送到主线程的消息队列,消息处理完再取出下一个事件,一样以 Message 的形式来执行,如此反复执行。直到队列为空。 MAIN_ORDERED 通常来讲也是经过 mainThreadPoster 来发送,异常状况下才经过发送者线程来发送。从代码侧来看,它主要就是属于 MAIN 策略的第二种状况。 BACKGROUND 若是当前为主线程,则将其投递到 backgroundPoster,不然直接发送。这里的 backgroundPoster 就是一个子线程。 ASYNC 异步模式下,最终就是经过 ExecutorService ,也即线程池来发送的。asyncPoster 即 AsyncPoster,其关于线程调度的关键代码为:
eventBus.getExecutorService().execute(this);
复制代码
注销就比较简单了,以下代码,主要就是从 typesBySubscriber 这个 map 中将订阅者移除掉,而且解决订阅者和事件的关联。
public synchronized void unregister(Object subscriber) {
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {
unsubscribeByEventType(subscriber, eventType);
}
typesBySubscriber.remove(subscriber);
} else {
logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}
复制代码
从整个分析过程来看,EventBus 是一个比较简单的框架,其涉及到的核心知识是 ThreadLocal 以及方法的反射调用,而基于此为咱们封装了一套完整的事件 分发-传递-响应 机制,固然也是一个很是优秀的框架。总结一下,其主要的特色:
最后,感谢你能读到并读完此文章,若是分析的过程当中存在错误或者疑问都欢迎留言讨论。若是个人分享可以帮助到你,还请记得帮忙点个赞吧,谢谢。