上半部分主要是EventBus3.0架构分析,接下来开始EventBus3.0的源码分析了。java
咱们从EventBus3.0使用方式开始源码分析,先来分析注册事件~git
注册事件方式:github
EventBus.getDefault().register(this);
EventBus的getDefault()是一个单例,确保只有一个EventBus对象实例缓存
public static EventBus getDefault() { EventBus instance = defaultInstance; if (instance == null) { synchronized (EventBus.class) { instance = EventBus.defaultInstance; if (instance == null) { instance = EventBus.defaultInstance = new EventBus(); } } } return instance; }
EventBus构造方法作什么呢?架构
private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder(); public EventBus() { this(DEFAULT_BUILDER); }
在EventBus无参构造方法调用了有一个参数的构造方法,参数传入的是EventBusBuilder对象,该对象配置EventBus相关默认的属性。异步
EventBus(EventBusBuilder builder) { logger = builder.getLogger(); subscriptionsByEventType = new HashMap<>(); typesBySubscriber = new HashMap<>(); stickyEvents = new ConcurrentHashMap<>(); mainThreadSupport = builder.getMainThreadSupport(); mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null; backgroundPoster = new BackgroundPoster(this); asyncPoster = new AsyncPoster(this); indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0; subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes, builder.strictMethodVerification, builder.ignoreGeneratedIndex); logSubscriberExceptions = builder.logSubscriberExceptions; logNoSubscriberMessages = builder.logNoSubscriberMessages; sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent; sendNoSubscriberEvent = builder.sendNoSubscriberEvent; throwSubscriberException = builder.throwSubscriberException; eventInheritance = builder.eventInheritance; executorService = builder.executorService; }
这些属性能够经过配置EventBusBuilder来更改async
EventBus.builder() .logNoSubscriberMessages(false) .sendNoSubscriberEvent(false) .installDefaultEventBus();
接下来分析EventBus注册重要的一个方法register,先看下它的源码ide
public void register(Object subscriber) { // 获取当前注册类的Class对象 Class<?> subscriberClass = subscriber.getClass(); // 查找当前注册类中被@Subscribe注解标记的全部方法 List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass); synchronized (this) { // 遍历全部的订阅方法,完成注册 for (SubscriberMethod subscriberMethod : subscriberMethods) { subscribe(subscriber, subscriberMethod); } } }
从register方法源码能够看出,主要完成订阅方法查找和注册,订阅方法查找由findSubscriberMethods完成,注册由subscribe完成。
首先看下findSubscriberMethods方法源码实现源码分析
// 缓存订阅方法,key:当前注册类的Class value:订阅方法集合 private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>(); List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) { // 先从缓存中获取当前注册类的订阅方法,若是找到,直接返回 List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass); if (subscriberMethods != null) { return subscriberMethods; } // ignoreGeneratedIndex默认值为false // 主要做用:是否忽略由APT(注解处理器)生成的订阅索引 if (ignoreGeneratedIndex) { // 若是忽略,则使用反射技术查找全部被@Subscribe注解标记的全部订阅方法 subscriberMethods = findUsingReflection(subscriberClass); } else { // 默认不忽略,可是若是没有使用APT(注解处理器)生成的订阅索引,则仍是经过反射技术查找 subscriberMethods = findUsingInfo(subscriberClass); } // 若是当前注册类中没有订阅方法,则抛出异常 if (subscriberMethods.isEmpty()) { throw new EventBusException("Subscriber " + subscriberClass + " and its super classes have no public methods with the @Subscribe annotation"); } else { // 当前注册类中有订阅方法,添加到缓存中,以便再次注册直接使用 METHOD_CACHE.put(subscriberClass, subscriberMethods); return subscriberMethods; } }
findSubscriberMethods方法主要干的事情很清晰,首先从缓存中获取当前注册类的订阅方法,若是找到,直接返回;若是没有找到,经过APT(注解处理器)或者反射技术查找当前注册类中的全部订阅方法,若是没有找到,抛出异常,不然缓存到内存中,并返回当前注册类中全部的订阅方法。post
其中查找当前注册类中的全部订阅方法经过findUsingInfo方法实现
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) { // FindState FindState findState = prepareFindState(); findState.initForSubscriber(subscriberClass); while (findState.clazz != null) { // 若是没有使用APT(注解处理器)生成订阅方法索引,返回null,则进入else语句中 findState.subscriberInfo = getSubscriberInfo(findState); if (findState.subscriberInfo != null) { SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods(); for (SubscriberMethod subscriberMethod : array) { if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) { findState.subscriberMethods.add(subscriberMethod); } } } else { // 使用反射技术查找当前注册类中全部的订阅方法 findUsingReflectionInSingleClass(findState); } // 从父类中继续查找,直到父类为null findState.moveToSuperclass(); } // 返回注册类中全部的订阅方法,并释放findState中状态,同时把findState对象放回缓存池中 return getMethodsAndRelease(findState); }
findUsingInfo方法主要是从当前注册类及父类中查找全部的订阅方法,首先从经过APT(注解处理器)生成订阅方法索引中查找,若是没有使用APT(注解处理器)生成,则经过反射技术查找。
先来看下经过反射技术怎么去查找?关键方法findUsingReflectionInSingleClass源码以下:
private void findUsingReflectionInSingleClass(FindState findState) { Method[] methods; try { // This is faster than getMethods, especially when subscribers are fat classes like Activities methods = findState.clazz.getDeclaredMethods(); } catch (Throwable th) { // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149 methods = findState.clazz.getMethods(); findState.skipSuperClasses = true; } for (Method method : methods) { int modifiers = method.getModifiers(); // 方法必须是public的,且不能是ABSTRACT或者STATIC if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) { Class<?>[] parameterTypes = method.getParameterTypes(); // 方法的参数必须是有一个 if (parameterTypes.length == 1) { Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class); // 方法必须被@Subscribe注解标记 if (subscribeAnnotation != null) { // 方法参数类型 Class<?> eventType = parameterTypes[0]; // 用来判断该方法是否已经添加过 if (findState.checkAdd(method, eventType)) { ThreadMode threadMode = subscribeAnnotation.threadMode(); findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode, subscribeAnnotation.priority(), subscribeAnnotation.sticky())); } } } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) { String methodName = method.getDeclaringClass().getName() + "." + method.getName(); throw new EventBusException("@Subscribe method " + methodName + "must have exactly 1 parameter but has " + parameterTypes.length); } } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) { String methodName = method.getDeclaringClass().getName() + "." + method.getName(); throw new EventBusException(methodName + " is a illegal @Subscribe method: must be public, non-static, and non-abstract"); } } }
订阅方法的查找已经分析完了,而后就是subscribe方法源码分析了
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) { // 订阅事件类型,也就是订阅方法的参数类型 Class<?> eventType = subscriberMethod.eventType; // Subscription封装当前注册类的对象和订阅方法 Subscription newSubscription = new Subscription(subscriber, subscriberMethod); // subscriptionsByEventType是一个Map,key为订阅事件类型,value为Subscription集合 CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType); if (subscriptions == null) { subscriptions = new CopyOnWriteArrayList<>(); subscriptionsByEventType.put(eventType, subscriptions); } else { if (subscriptions.contains(newSubscription)) { throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType); } } // 将newSubscription添加到subscriptions集合中 int size = subscriptions.size(); for (int i = 0; i <= size; i++) { if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) { subscriptions.add(i, newSubscription); break; } } // typesBySubscriber也是一个Map,key为注册类的对象,value为当前注册类中全部订阅方法中的参数类型集合 List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber); if (subscribedEvents == null) { subscribedEvents = new ArrayList<>(); typesBySubscriber.put(subscriber, subscribedEvents); } subscribedEvents.add(eventType); // 默认sticky为false,表明不是粘性事件,下面代码先不看,后面说到粘性事件再来分析它 if (subscriberMethod.sticky) { if (eventInheritance) { // Existing sticky events of all subclasses of eventType have to be considered. // Note: Iterating over all events may be inefficient with lots of sticky events, // thus data structure should be changed to allow a more efficient lookup // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>). Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet(); for (Map.Entry<Class<?>, Object> entry : entries) { Class<?> candidateEventType = entry.getKey(); if (eventType.isAssignableFrom(candidateEventType)) { Object stickyEvent = entry.getValue(); checkPostStickyEventToSubscription(newSubscription, stickyEvent); } } } else { Object stickyEvent = stickyEvents.get(eventType); checkPostStickyEventToSubscription(newSubscription, stickyEvent); } } }
subscribe方法中的subscriptionsByEventType和typesBySubscriber两个Map。在事件发布时用到
subscriptionsByEventType来完成事件的处理,在取消注册时用到subscriptionsByEventType和typesBySubscriber这两个Map,后面会具体分析到~~
取消注册方式:
EventBus.getDefault().unregister(this);
取消注册调用EventBus的unregister方法,下面是它的源码
public synchronized void unregister(Object subscriber) { // typesBySubscriber是一个Map,注册的时候已经将key为当前注册类的对象,value为当前注册类全部订阅方法的参数类型放入当前Map中 List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber); // 若是subscribedTypes不为null, 说明当前类有注册 if (subscribedTypes != null) { for (Class<?> eventType : subscribedTypes) { // 根据事件类型eventType,取消订阅 unsubscribeByEventType(subscriber, eventType); } // 将注册的对象从Map中移除 typesBySubscriber.remove(subscriber); } else { logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass()); } }
从unregister方法源码可知,先根据当前取消注册类的对象从typesBySubscriber缓存中找到全部订阅方法的事件类型,而后根据事件类型,取消订阅。接下来看下unsubscribeByEventType源码:
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) { // subscriptionsByEventType也是一个Map,注册的时候已经将key为订阅事件类型,value为Subscription对象集合放入当前Map中 List<Subscription> subscriptions = subscriptionsByEventType.get(eventType); if (subscriptions != null) { int size = subscriptions.size(); for (int i = 0; i < size; i++) { Subscription subscription = subscriptions.get(i); if (subscription.subscriber == subscriber) { subscription.active = false; // 移除Subscription subscriptions.remove(i); i--; size--; } } } }
从取消注册源码分析可知,主要是从typesBySubscriber和subscriptionsByEventType这两个Map中移除注册类对象和移除订阅方法。
普通事件的发布,能够经过下面方式:
EventBus.getDefault().post("hello, eventbus!");
能够看到发布事件经过post方法完成
public void post(Object event) { // currentPostingThreadState是一个PostingThreadState类型的ThreadLocal // PostingThreadState维护者事件队列和线程模型 PostingThreadState postingState = currentPostingThreadState.get(); List<Object> eventQueue = postingState.eventQueue; // 将要发送的事件先加入事件队列中 eventQueue.add(event); // isPosting默认值为false if (!postingState.isPosting) { // 是不是主线程 postingState.isMainThread = isMainThread(); postingState.isPosting = true; if (postingState.canceled) { throw new EventBusException("Internal error. Abort state was not reset"); } try { // 循环遍历事件队列,从队列的头部开始发布事件 while (!eventQueue.isEmpty()) { postSingleEvent(eventQueue.remove(0), postingState); } } finally { postingState.isPosting = false; postingState.isMainThread = false; } } }
post方法先将要事件加入到事件队列中,而后循环事件队列,交给postSingleEvent方法处理
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error { Class<?> eventClass = event.getClass(); boolean subscriptionFound = false; // eventInheritance默认为true,表示查找事件的继承类 if (eventInheritance) { // 查找该事件和该事件继承的事件集合 List<Class<?>> eventTypes = lookupAllEventTypes(eventClass); int countTypes = eventTypes.size(); for (int h = 0; h < countTypes; h++) { Class<?> clazz = eventTypes.get(h); subscriptionFound |= postSingleEventForEventType(event, postingState, clazz); } } else { subscriptionFound = postSingleEventForEventType(event, postingState, eventClass); } // 若是没有注册,subscriptionFound为false if (!subscriptionFound) { if (logNoSubscriberMessages) { // logNoSubscriberMessages默认值为true logger.log(Level.FINE, "No subscribers registered for event " + eventClass); } if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class && eventClass != SubscriberExceptionEvent.class) { post(new NoSubscriberEvent(this, event)); } } }
postSingleEvent方法会查找该事件和它的继承事件,而后交给postSingleEventForEventType方法处理
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) { CopyOnWriteArrayList<Subscription> subscriptions; synchronized (this) { // subscriptionsByEventType在注册时,会将事件和订阅方法加入 subscriptions = subscriptionsByEventType.get(eventClass); } // 若是没有注册,subscriptions就会为null if (subscriptions != null && !subscriptions.isEmpty()) { for (Subscription subscription : subscriptions) { postingState.event = event; postingState.subscription = subscription; boolean aborted = false; try { // 事件处理 postToSubscription(subscription, event, postingState.isMainThread); aborted = postingState.canceled; } finally { postingState.event = null; postingState.subscription = null; postingState.canceled = false; } if (aborted) { break; } } return true; } return false; }
postSingleEventForEventType中会从注册表subscriptionsByEventType中找出该事件的全部订阅方法,交给postToSubscription方法处理
最后是事件的处理了,会根据ThreadMode线程模式切换线程处理事件
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) { // 根据订阅方法的线程模式判断,事件须要在哪一个线程处理 switch (subscription.subscriberMethod.threadMode) { // 默认线程模式,在哪一个线程发布事件就在哪一个线程处理事件 case POSTING: invokeSubscriber(subscription, event); break; // 要求在主线程处理事件 case MAIN: // 若是发布事件在主线程,直接执行 if (isMainThread) { invokeSubscriber(subscription, event); } else { // 若是发布事件在子线程,先将事件入队列,而后经过Handler切换到主线程执行 mainThreadPoster.enqueue(subscription, event); } break; // 要求在主线程处理事件 case MAIN_ORDERED: // 不管发布事件在哪一个线程,都会把事件入队列,而后经过Handler切换到主线程执行 if (mainThreadPoster != null) { mainThreadPoster.enqueue(subscription, event); } else { // temporary: technically not correct as poster not decoupled from subscriber invokeSubscriber(subscription, event); } break; // 要求在后台线程处理事件 case BACKGROUND: // 若是发布事件在主线程,先将事件入队列,而后丢到线程池处理,串行处理 if (isMainThread) { backgroundPoster.enqueue(subscription, event); } else { // 若是发布事件在子线程,直接执行 invokeSubscriber(subscription, event); } break; // 要求在异步线程处理事件 case ASYNC: // 不管发布事件在哪一个线程,都会先入队列,而后丢到线程池处理,并行处理 asyncPoster.enqueue(subscription, event); break; default: throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode); } }
线程切换其实就是根据订阅事件方法的线程模型及发布事件的线程来决定如何处理,处理方式分为两种:一种在相应的线程直接经过invokeSubscriber方法,使用反射技术来执行订阅事件方法
void invokeSubscriber(Subscription subscription, Object event) { try { subscription.subscriberMethod.method.invoke(subscription.subscriber, event); } catch (InvocationTargetException e) { handleSubscriberException(subscription, event, e.getCause()); } catch (IllegalAccessException e) { throw new IllegalStateException("Unexpected exception", e); } }
还有一种就是先将事件入队列(底层是双向链表结构实现),而后交给Handler或者线程池处理。以ASYNC线程模型为例, asyncPoster是AsyncPoster类的一个实例
class AsyncPoster implements Runnable, Poster { private final PendingPostQueue queue; private final EventBus eventBus; AsyncPoster(EventBus eventBus) { this.eventBus = eventBus; // PendingPostQueue是一个双向链表 queue = new PendingPostQueue(); } public void enqueue(Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); // 加入链表的尾部 queue.enqueue(pendingPost); // 直接丢给线程池处理 eventBus.getExecutorService().execute(this); } @Override public void run() { PendingPost pendingPost = queue.poll(); if(pendingPost == null) { throw new IllegalStateException("No pending post available"); } eventBus.invokeSubscriber(pendingPost); } }
先分析到这,接下来分析粘性事件,订阅索引,AsyncExecutor等~~~