EventBus是Android和Java的开源库,它使用发布者/订阅者模式进行松散耦合。EventBus使用了总线控制,可以用几行代码解耦类及简化代码,消除依赖关系,加速应用程序开发。java
下图为官方示例图:git
官网网址:EventBus官网github
Github地址:Github数据库
一、gradle引入设计模式
implementation 'org.greenrobot:eventbus:3.1.1'
复制代码
二、注册与取消注册数组
//注册
EventBus.getDefault().register(this);
//取消注册
EventBus.getDefault().unregister(this);
复制代码
三、事件定义与发送缓存
//定义
public class MessageEvent {
}
//发送
EventBus.getDefault().post(new MessageEvent());
复制代码
四、事件接收bash
@Subscribe(threadMode = ThreadMode.POSTING)
public void onEventMainThread(MessageEvent event) {
// doSomeThing();
};
复制代码
EventBus的使用包含注册、取消注册、事件定义发送及事件接收。当用户进行注册时,会经过反射获取实例中定义事件方法集合,而后将事件方法集合及订阅者加入到Map中,当执行post时,会根据事件类型,从集合中获取对应的订阅集合,经过配置的threadMode,使用对应的Poster调用订阅者的事件,最后经过反射method的invoke执行事件。微信
类 | 说明 |
---|---|
EventBus | 总线调度类,getDefault()为单例模式。内部持有订阅者及事件集合。还有各事件的发送器。eventTypesCache(缓存全部粘性事件的集合)、subscriptionsByEventType(key为事件,value为订阅者集合的Map)、typesBySubscriber(key为订阅者,value为事件集合的Map)、currentPostingThreadState(ThreadLocal,当前线程的事件集合)、mainThreadPoster(主线程的post)、backgroundPoster(后台线程的post)、asyncPoster(异步线程的post)、subscriberMethodFinder(获取订阅者中的事件) |
SubscriberMethod | 订阅方法的类,包含Method、ThreadMode、priority等配置属性 |
Subscription | 订阅者类,包含subscriber的Object实例、subscriberMethod |
PostingThreadState | 存储当前线程的事件集合 |
SubscriberMethodFinder | 用于获取订阅中的定义的事件接收方法 |
PendingPost | subscription与event的包装类,内部维护一个pendingPostPool,当池中有PendingPost实例,会进行复用 |
PendingPostQueue | 内部维护了一个head、tail的PendingPost实例,提供enqueue及poll操做 |
HandlerPoster | 用于处理主线程的事件执行 |
BackgroundPoster | 用于处理后台线程的事件执行 |
AsyncPoster | 用于执行异步线程的事件执行 |
注册主要是经过反射获取订阅者中定义的事件方法集合,将订阅者和事件集合加入对应的Map,而后会判断是否支持粘性事件,将以前发送的粘性事件缓存发送给订阅者。框架
一、 源码实现
public void register(Object subscriber) {
//获取注册的Object的Class
Class<?> subscriberClass = subscriber.getClass();
//经过subscriberMethodFinder获取该Object中全部的订阅方法(SubscriberMethod集合)
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
//遍历事件集合
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
//生成Subscription对象(SubscriberMethod的包装类)
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
//获取subscriptionsByEventType集合(key为事情,value为订阅集合)
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
//获取typesBySubscriber集合(key为订阅者,value为事件集合)
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
//将全部的事件,根据订阅者key加入到事件集合中
subscribedEvents.add(eventType);
//事件方法是否支持(粘性事情)
if (subscriberMethod.sticky) {
//事件方法是否支持(继承事件)
if (eventInheritance) {
//遍历stickyEvents,找出全部继承于事件的父事件
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
//给订阅者发送以前缓存的粘性事件
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
//给订阅者发送以前缓存的粘性事件
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
复制代码
二、 流程图
一、 源码实现
public void post(Object event) {
//获取当前线程的PostingThreadState
PostingThreadState postingState = currentPostingThreadState.get();
//获取事件队列postingState.eventQueue
List<Object> eventQueue = postingState.eventQueue;
//添加事件
eventQueue.add(event);
//是否正在发送
if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
//循环获取eventQueue中的事件
try {
while (!eventQueue.isEmpty()) {
//获取集合数据并移除,而后发送事件
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
//事件是否支持继承
if (eventInheritance) {
//找出全部继承事件
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
//遍历集合
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
//发送
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
//发送
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
//若是没有订阅者会发送一个NoSubscriberEvent事件
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
//根据事件获取全部订阅者subscriptions
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
//遍历全部订阅者
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
//发送事件
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
//根据对应的threadMode,使用对应的post进行事件的处理
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
void invokeSubscriber(Subscription subscription, Object event) {
try {
//最后经过反射执行事件方法
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
复制代码
二、 流程图
一、 源码实现
public synchronized void unregister(Object subscriber) {
//根据subscriber从typesBySubscriber获取事件集合
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
//遍历订阅者的事件
for (Class<?> eventType : subscribedTypes) {
unsubscribeByEventType(subscriber, eventType);
}
//typesBySubscriber移除subscriber
typesBySubscriber.remove(subscriber);
} else {
logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
//根据eventType从subscriptionsByEventType获取订阅集合
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size = subscriptions.size();
for (int i = 0; i < size; i++) {
Subscription subscription = subscriptions.get(i);
if (subscription.subscriber == subscriber) {
//遍历集合移除当前的subscriber
subscription.active = false;
subscriptions.remove(i);
i--;
size--;
}
}
}
}
复制代码
二、 流程图
EventBus之因此如此流行,一个很重要的地方就是使用很是简便。咱们只须要在类中定义好方法接收对应的事件、配置好相关的标注就能够。那么怎么获取到订阅者的事件方法集合,就是EventBus设计的一个精髓的地方。经过上面的注册方法,咱们知道主要是经过下面的方法来获取,下面咱们主要分析一下具体的实现。
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
复制代码
接着看findSubscriberMethods的事件
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
//定义了缓存Map,避免每次都执行反射获取,提升性能
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}
if (ignoreGeneratedIndex) {
//经过反射获取
subscriberMethods = findUsingReflection(subscriberClass);
} else {
//经过索引获取
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
复制代码
最后会执行indUsingReflection去获取,具体实现以下:
private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
//FindState 用来作订阅方法的校验和保存
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
//经过反射来得到订阅方法信息
findUsingReflectionInSingleClass(findState);
//查找父类的订阅方法
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}
复制代码
关键的实如今findUsingReflectionInSingleClass方法中,实现以下:
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// //经过反射获得方法数组
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
// Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
methods = findState.clazz.getMethods();
findState.skipSuperClasses = true;
}
//遍历Method
for (Method method : methods) {
int modifiers = method.getModifiers();
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
Class<?>[] parameterTypes = method.getParameterTypes();
////保证必须只有一个事件参数
if (parameterTypes.length == 1) {
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
//获得注解
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
//校验是否添加该方法
if (findState.checkAdd(method, eventType)) {
ThreadMode threadMode = subscribeAnnotation.threadMode();
//实例化SubscriberMethod对象并添加
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}
复制代码
总结一下,EventBus是经过反射getDeclaredMethods()获取类的方法集合,而后遍历方法集合,将符合事件定义的方法(public、只有一个事件参数、有Subcribe的注解等)加入到集合中。从而达到识别订阅者中定义的事件方法。
咱们知道,EventBus能够经过定义threadMode来指定事件回调的执行线程。主要的配置以下:
ThreadMode: POSTING:默认的,在同一个线程中执行。
ThreadMode: MAIN :主线程执行
ThreadMode: MAIN_ORDERED: 主线程执行,不过须要排队,若是前一个也是main_ordered 须要等前一个执行完成后才执行,在主线程中执行,能够处理更新ui的操做。
ThreadMode: BACKGROUND :后台进程,处理如保存到数据库等操做。
ThreadMode: ASYNC :异步执行,另起线程操做。
经过上面的流程分析在post的过程当中,最后都是经过postToSubscription执行,在这里面判断了threadMode的类型,具体的实现以下:
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
//根据对应的threadMode,使用对应的post进行事件的处理
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
复制代码
一、POSTING模式,直接执行invokeSubscriber。
二、MAIN模式,若是判断当前线程是主线程则执行invokeSubscriber,不然会使用mainThreadPoster执行enqueue方法。mainThreadPoster为HandlerPoster的实例,继承了Handler,是使用了MainLooper进行建立,也就是其的handleMessage在主线程中执行。 咱们看具体的实现:
public void enqueue(Subscription subscription, Object event) {
//构建PendingPost对象
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
//加入队列
queue.enqueue(pendingPost);
//若是没有激活hander
if (!handlerActive) {
handlerActive = true;
//发送消息
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
//循环获取队列中的全部pendingPost
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
pendingPost = queue.poll();
if (pendingPost == null) {
//若是已没有数据,更新 handlerActive
handlerActive = false;
return;
}
}
}
//在主线程中实现事件的方法
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
复制代码
HandlerPoster会经过主线程的Handler去执行队列中的全部事件方法。
三、MAIN_ORDERED模式 优先主线程队列执行
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
复制代码
四、BACKGROUND模式 若是再主线程则执行后台线程执行,不然使用当前线程
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
复制代码
这里咱们主要看backgroundPoster的实现,BackgroundPoster继承了Runnable,实现线程池执行run方法,经过executorRunning控制不会每次都启动一个新任务。
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
//加入队列
queue.enqueue(pendingPost);
//变量标识,不要每次都执行
if (!executorRunning) {
executorRunning = true;
//线程池执行
eventBus.getExecutorService().execute(this);
}
}
}
@Override
public void run() {
try {
try {
while (true) {
//循环间隔1s获取事件
PendingPost pendingPost = queue.poll(1000);
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
//若是已没有任务,更新变量
executorRunning = false;
return;
}
}
}
//在异步线程执行了事件方法
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
}
} finally {
executorRunning = false;
}
}
复制代码
backgroundPoster会开启一个线程去执行当前全部队列中的事件方法。
五、ASYNC 模式 主要使用了AsyncPoster,也继承了run接口。实现以下:
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
queue.enqueue(pendingPost);
eventBus.getExecutorService().execute(this);
}
@Override
public void run() {
PendingPost pendingPost = queue.poll();
if(pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
eventBus.invokeSubscriber(pendingPost);
}
复制代码
AsyncPoster每个事件都会开启一个异步任务,经过线程池去执行。
总结一下 EventBus经过配置threadMode,控制事件在不一样的线程中去执行。总归有5种模式,分别为POSTING、MAIN、MAIN_ORDERED、BACKGROUND、ASYNC。主要是经过HandlerPoster、backgroundPoster、AsyncPoster来实现线程的切换。
- HandlerPoster会经过主线程的Handler开启循环去执行队列中的全部事件方法。
- backgroundPoster会开启一个线程循环执行当前全部队列中的事件方法。
- AsyncPoster每个事件都会开启一个异步任务,经过线程池去执行。
EventBus主要是经过观察者模式来实现事件的发送和接收。使用register后,会将订阅者及定义的事件接收方法加入到Map中,当在任意地方执行post时,会对事件类型进行匹配,找出全部的订阅者,根据配置的threadMode,使用不一样的poster经过反射去执行事件方法。当使用unregister后,会将订阅者在Map中移除,进行取消注册。
一、单例模式
EventBus.getDefault()使用了懒汉的单例模式。
二、外观模式
EventBus对外提供了统一的调度,屏蔽了内部的实现。
三、建造者模式
Event对象的建立使用EventBusBuilder进行建立,将复杂对象的建立和表示分离,调用者不须要知道复杂的建立过程,使用Build的相关方法进行配置建立对象。
四、策略模式
根据threadMode的设置,使用不一样Poster的实现策略来执行事件方法
一、框架的设计不在复杂而在精巧
二、使用反射和标注能够简化不少实现
三、EventBus的使用要注意避免大量的滥用,将致使逻辑的分散,出现问题后很难定位
欢迎关注个人我的公众号
微信搜索:一码一浮生,或者搜索公众号ID:life2code