title: 源码阅读系列:EventBus
date: 2016-12-22 16:16:47
tags: 源码阅读
---java
EventBus 是人们在平常开发中常常会用到的开源库,即便是不直接用的人,也多少借鉴过事件总线的用法。并且EventBus的代码实际上是很是简单的,能够试着阅读一下。android
源码阅读系列不采用对功能进行归类的方法进行阅读,而是采用一个刚开始阅读源码的视角,从咱们平时的API调用,一步步的去理解设计意图和实现原理。git
从这里开始吧,咱们最经常使用的地方就是给一个函数添加上注解,咱们先抛开apt生成的table,只看这个运行时版本的订阅设定。github
// eventbus/Subscribe @Documented @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.METHOD}) public @interface Subscribe { ThreadMode threadMode() default ThreadMode.POSTING; /** * If true, delivers the most recent sticky event (posted with * {@link EventBus#postSticky(Object)}) to this subscriber (if event available). */ boolean sticky() default false; /** Subscriber priority to influence the order of event delivery. * Within the same delivery thread ({@link ThreadMode}), higher priority subscribers will receive events before * others with a lower priority. The default priority is 0. Note: the priority does *NOT* affect the order of * delivery among subscribers with different {@link ThreadMode}s! */ int priority() default 0; }
这个设定仍是很是简单的,并且都是咱们熟悉的东西,线程类型(默认的是抛出线程),是不是粘性事件,时间的优先级。通过这个类的出现,咱们就能够在类里面写咱们常常写的某个函数是订阅函数了。缓存
@Subscribe (...) public void getMessage(Event event) { ... }
下面的问题是咱们改怎么让EventBus找到这些方法呢?经过apt的版本咱们知道这里面确定有一个map或者是table的东西记录了Object和Method之间的订阅关系,并且仍是一对多的。这个地方就是从每一个咱们进行register的地方进行的。多线程
// eventbus/EventBus /** * Registers the given subscriber to receive events. Subscribers must call {@link #unregister(Object)} once they * are no longer interested in receiving events. * <p/> * Subscribers have event handling methods that must be annotated by {@link Subscribe}. * The {@link Subscribe} annotation also allows configuration like {@link * ThreadMode} and priority. */ public void register(Object subscriber) { Class<?> subscriberClass = subscriber.getClass(); List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass); synchronized (this) { for (SubscriberMethod subscriberMethod : subscriberMethods) { subscribe(subscriber, subscriberMethod); } } }
咱们在Activity/Fragment中都有可能会调用这个方法,若是是Fragment里面咱们还会在onDestoryView()
中进行unregister(...)
。在这段函数里咱们发现使用反射从这个Class中找到了全部的订阅者函数了,而后对每一个订阅者函数进行注册。并发
这里咱们看看咱们的SubribeMethod被包装成了什么样子:app
/** Used internally by EventBus and generated subscriber indexes. */ public class SubscriberMethod { final Method method; final ThreadMode threadMode; final Class<?> eventType; final int priority; final boolean sticky; /** Used for efficient comparison */ String methodString; public SubscriberMethod(Method method, Class<?> eventType, ThreadMode threadMode, int priority, boolean sticky) { this.method = method; this.threadMode = threadMode; this.eventType = eventType; this.priority = priority; this.sticky = sticky; } @Override public boolean equals(Object other) { if (other == this) { return true; } else if (other instanceof SubscriberMethod) { checkMethodString(); SubscriberMethod otherSubscriberMethod = (SubscriberMethod)other; otherSubscriberMethod.checkMethodString(); // Don't use method.equals because of http://code.google.com/p/android/issues/detail?id=7811#c6 return methodString.equals(otherSubscriberMethod.methodString); } else { return false; } } private synchronized void checkMethodString() { if (methodString == null) { // Method.toString has more overhead, just take relevant parts of the method StringBuilder builder = new StringBuilder(64); builder.append(method.getDeclaringClass().getName()); builder.append('#').append(method.getName()); builder.append('(').append(eventType.getName()); methodString = builder.toString(); } } @Override public int hashCode() { return method.hashCode(); } }
SubscribeMethod 携带了Method函数原型,还有就是咱们在注解类里面提供的全部信息。还有一个Class<?>类型的EventType是指咱们的事件类所对应的Class,其他的方法都是为了比较和判断是否相等来作的,equal/checkMethodString都是各类的拼字串来进行存储和判断。异步
下面咱们来看register里面调用的这段subscribe,这段很是的重要涉及了EventBus运行时处理的绝大多数部分,还有就是粘性事件的分发。这段使用了大量的JDK的反射包的API,自己注释也提醒咱们了这段代码须要加锁,毕竟里面这一堆并发容器。因此咱们最好先明确这段里面用的并发容器到底都是什么,这段代码才好继续看的下去。async
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType; private final Map<Object, List<Class<?>>> typesBySubscriber; private final Map<Class<?>, Object> stickyEvents;
主要的有这几个:
Subscription
从名字上也能看出Key是Event的Class对象。知道这三个都是什么以后,这段代码就好看了。咱们来看前一部分。
// Must be called in synchronized block private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) { Class<?> eventType = subscriberMethod.eventType; Subscription newSubscription = new Subscription(subscriber, subscriberMethod); // Map<Class<?>, CopyOnWriteArrayList<Subscription>> CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType) ; if (subscriptions == null) { subscriptions = new CopyOnWriteArrayList<>(); subscriptionsByEventType.put(eventType, subscriptions); } else { if (subscriptions.contains(newSubscription)) { throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType); } } int size = subscriptions.size(); for (int i = 0; i <= size; i++) { if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) { subscriptions.add(i, newSubscription); break; } } List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber); if (subscribedEvents == null) { subscribedEvents = new ArrayList<>(); typesBySubscriber.put(subscriber, subscribedEvents); } subscribedEvents.add(eventType);
这段写的虽然有点乱套,但实际上写的挺简单的,并且一堆堆的O(n)遍历,性能也就那样(?)。
首先这里面出现了Subscription:
final class Subscription { final Object subscriber; final SubscriberMethod subscriberMethod; /** * Becomes false as soon as {@link EventBus#unregister(Object)} is called, which is checked by queued event delivery * {@link EventBus#invokeSubscriber(PendingPost)} to prevent race conditions. */ volatile boolean active; Subscription(Object subscriber, SubscriberMethod subscriberMethod) { this.subscriber = subscriber; this.subscriberMethod = subscriberMethod; active = true; } @Override public boolean equals(Object other) { if (other instanceof Subscription) { Subscription otherSubscription = (Subscription) other; return subscriber == otherSubscription.subscriber && subscriberMethod.equals(otherSubscription.subscriberMethod); } else { return false; } } @Override public int hashCode() { return subscriber.hashCode() + subscriberMethod.methodString.hashCode(); } }
咱们发现了这是订阅者和订阅方法类的一个契约关系类。
因此说上面subscribe函数主要作了,
subscriptionsByEventType
中typesBySubscriber
添加了对应的类型而后咱们能够看一下这个函数的下一半,咱们会惊奇地发现,StickyEvent的发送时机竟然是在register的时候:
... if (subscriberMethod.sticky) { if (eventInheritance) { // Existing sticky events of all subclasses of eventType have to be considered. // Note: Iterating over all events may be inefficient with lots of sticky events, // thus data structure should be changed to allow a more efficient lookup // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>). Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet(); for (Map.Entry<Class<?>, Object> entry : entries) { Class<?> candidateEventType = entry.getKey(); if (eventType.isAssignableFrom(candidateEventType)) { Object stickyEvent = entry.getValue(); checkPostStickyEventToSubscription(newSubscription, stickyEvent); } } } else { Object stickyEvent = stickyEvents.get(eventType); checkPostStickyEventToSubscription(newSubscription, stickyEvent); } }
这时候轮了一遍全部的粘性事件。isAssignableFrom相似于使用在Class之间的instance of
就是判断两个类是否有相同的接口关系,也就是说有继承和实现关系的事件类,都会被判断处理。
private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) { if (stickyEvent != null) { // If the subscriber is trying to abort the event, it will fail (event is not tracked in posting state) // --> Strange corner case, which we don't take care of here. postToSubscription(newSubscription, stickyEvent, Looper.getMainLooper() == Looper.myLooper()); } }
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) { switch (subscription.subscriberMethod.threadMode) { case POSTING: invokeSubscriber(subscription, event); break; case MAIN: if (isMainThread) { invokeSubscriber(subscription, event); } else { mainThreadPoster.enqueue(subscription, event); } break; case BACKGROUND: if (isMainThread) { backgroundPoster.enqueue(subscription, event); } else { invokeSubscriber(subscription, event); } break; case ASYNC: asyncPoster.enqueue(subscription, event); break; default: throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode); } }
以后就是针对各类的ThreadMode进行了处理,同一线程的直接依赖Java的反射invoke执行了,各类不能够的状况,好比说发到主线程但还没在主线程的时候,都是用队列进行发送到对应线程。
接下来咱们看看这里面在各线程之间的发送是怎么实现的。
咱们发如今Subscription和event入队的时候咱们把他们封装成了一个PendingPost类:
// HandlePoster void enqueue(Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); synchronized (this) { queue.enqueue(pendingPost); if (!handlerActive) { handlerActive = true; if (!sendMessage(obtainMessage())) { throw new EventBusException("Could not send handler message"); } } } }
而后才进行的入队和发送,这个PendingPost就是一个带有回收池的掩饰传送类:
final class PendingPost { private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>(); Object event; Subscription subscription; PendingPost next; private PendingPost(Object event, Subscription subscription) { this.event = event; this.subscription = subscription; } static PendingPost obtainPendingPost(Subscription subscription, Object event) { synchronized (pendingPostPool) { int size = pendingPostPool.size(); if (size > 0) { PendingPost pendingPost = pendingPostPool.remove(size - 1); pendingPost.event = event; pendingPost.subscription = subscription; pendingPost.next = null; return pendingPost; } } return new PendingPost(event, subscription); } static void releasePendingPost(PendingPost pendingPost) { pendingPost.event = null; pendingPost.subscription = null; pendingPost.next = null; synchronized (pendingPostPool) { // Don't let the pool grow indefinitely if (pendingPostPool.size() < 10000) { pendingPostPool.add(pendingPost); } } } }
这里的设计其实挺不错的,一个静态的回收池,初始化靠一个静态方法,优先使用被回收的对象,实现和Message
其实很像。另外一个release方法就是把用完的对象回收起来。
PendingPostQueue
就是一个PendingPost的队列,里面的操做基本上就是入队出队之类的,有点特殊的是入队和出队都有一把锁。
接着这个队列被用在了好几个Poster类中,实现了向各个线程的消息转换,首先咱们来看向主线程发送数据的:
final class HandlerPoster extends Handler { private final PendingPostQueue queue; private final int maxMillisInsideHandleMessage; private final EventBus eventBus; private boolean handlerActive; HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) { super(looper); this.eventBus = eventBus; this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage; queue = new PendingPostQueue(); } void enqueue(Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); synchronized (this) { queue.enqueue(pendingPost); if (!handlerActive) { handlerActive = true; if (!sendMessage(obtainMessage())) { throw new EventBusException("Could not send handler message"); } } } } @Override public void handleMessage(Message msg) { boolean rescheduled = false; try { long started = SystemClock.uptimeMillis(); while (true) { PendingPost pendingPost = queue.poll(); if (pendingPost == null) { synchronized (this) { // Check again, this time in synchronized pendingPost = queue.poll(); if (pendingPost == null) { handlerActive = false; return; } } } eventBus.invokeSubscriber(pendingPost); long timeInMethod = SystemClock.uptimeMillis() - started; if (timeInMethod >= maxMillisInsideHandleMessage) { if (!sendMessage(obtainMessage())) { throw new EventBusException("Could not send handler message"); } rescheduled = true; return; } } } finally { handlerActive = rescheduled; } } }
HandlePoster 继承自 Handler 再加上初始化的时候传进去的是Looper.getMainThread();因此能向主线程发送消息。每次入队以后都会发送一条空消息去通知handleMessage函数处理队列数据,使用handlerActive做为控制标记位。handleMessage是个死循环两段的if判断用来处理多线程的状况,invokeSubscriber的方式和以前相似。以后就是有一个阀值,当时间超过10ms的时候就会发一个消息重入,而且退出此次循环,这是防止时间太长阻塞主线程。
final class BackgroundPoster implements Runnable { private final PendingPostQueue queue; private final EventBus eventBus; private volatile boolean executorRunning; BackgroundPoster(EventBus eventBus) { this.eventBus = eventBus; queue = new PendingPostQueue(); } public void enqueue(Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); synchronized (this) { queue.enqueue(pendingPost); if (!executorRunning) { executorRunning = true; eventBus.getExecutorService().execute(this); } } } @Override public void run() { try { try { while (true) { PendingPost pendingPost = queue.poll(1000); if (pendingPost == null) { synchronized (this) { // Check again, this time in synchronized pendingPost = queue.poll(); if (pendingPost == null) { executorRunning = false; return; } } } eventBus.invokeSubscriber(pendingPost); } } catch (InterruptedException e) { Log.w("Event", Thread.currentThread().getName() + " was interruppted", e); } } finally { executorRunning = false; } } }
BackgroundPoster
自身是一个Runnable ,入队以后就调用EventBus携带的一个线程池进行运行,一样也是一个死循环,用了一个生产者 vs 消费者模式
进行了有限等待,这1000ms内入队的消息都会被弹出处理。
synchronized PendingPost poll(int maxMillisToWait) throws InterruptedException { if (head == null) { wait(maxMillisToWait); } return poll(); }
PendingPostQueue的poll(int)方法对队列为空的状况进行了等待,唤醒则出如今enqueue:
synchronized void enqueue(PendingPost pendingPost) { if (pendingPost == null) { throw new NullPointerException("null cannot be enqueued"); } if (tail != null) { tail.next = pendingPost; tail = pendingPost; } else if (head == null) { head = tail = pendingPost; } else { throw new IllegalStateException("Head present, but no tail"); } notifyAll(); // 在这进行了唤醒 }
若是说Background尚且能保证在同一个线程内完成,AsyncPoster就彻底进行了异步操做。
class AsyncPoster implements Runnable { private final PendingPostQueue queue; private final EventBus eventBus; AsyncPoster(EventBus eventBus) { this.eventBus = eventBus; queue = new PendingPostQueue(); } public void enqueue(Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); queue.enqueue(pendingPost); eventBus.getExecutorService().execute(this); } @Override public void run() { PendingPost pendingPost = queue.poll(); if(pendingPost == null) { throw new IllegalStateException("No pending post available"); } eventBus.invokeSubscriber(pendingPost); } }
这里面基本上什么都不控制,直接就来一个运行一次,也不会有什么问题。。。
到这为止咱们不但知道了方法是怎么注册和绑定的,咱们甚至还知道了粘性事件是怎么发送的了,接着咱们来看方法查找和普通事件的发送是怎么进行的。
// package org.greenrobot.eventbus.meta; /** Base class for generated index classes created by annotation processing. */ public interface SubscriberInfo { // 获取订阅的类 Class<?> getSubscriberClass(); // 全部的method SubscriberMethod[] getSubscriberMethods(); // 获取父类的info SubscriberInfo getSuperSubscriberInfo(); // 是否检查父类 boolean shouldCheckSuperclass(); }
SubscriberInfo
描述了能经过注解类生成的Index的方法(具体功能我加了主食)。
/** * Interface for generated indexes. */ public interface SubscriberInfoIndex { SubscriberInfo getSubscriberInfo(Class<?> subscriberClass); }
这个接口是查找info的。
另外能够说这其中的SubscriberMethodInfo
存储着SubscriberMethod
所需的元信息:
public class SubscriberMethodInfo { final String methodName; final ThreadMode threadMode; final Class<?> eventType; final int priority; final boolean sticky; ...
AbstractSubscriberInfo
是一个抽象类,主要负责从Info建立出Method,又是一个反射:
protected SubscriberMethod createSubscriberMethod(String methodName, Class<?> eventType, ThreadMode threadMode, int priority, boolean sticky) { try { Method method = subscriberClass.getDeclaredMethod(methodName, eventType); return new SubscriberMethod(method, eventType, threadMode, priority, sticky); } catch (NoSuchMethodException e) { throw new EventBusException("Could not find subscriber method in " + subscriberClass + ". Maybe a missing ProGuard rule?", e); } }
另外还有一个SimpleSubscriberInfo做为他的子类。
接下来的SubscriberMethodFinder
也很是重要运行时的方法查找都来自这里:
刚才咱们在EventBus.register(...)
中调用了这个函数:
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) { List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass); if (subscriberMethods != null) { return subscriberMethods; } if (ignoreGeneratedIndex) { subscriberMethods = findUsingReflection(subscriberClass); } else { subscriberMethods = findUsingInfo(subscriberClass); } if (subscriberMethods.isEmpty()) { throw new EventBusException("Subscriber " + subscriberClass + " and its super classes have no public methods with the @Subscribe annotation"); } else { METHOD_CACHE.put(subscriberClass, subscriberMethods); return subscriberMethods; } }
其中的METHOD_CACHE是对每一个类方法进行缓存,防止屡次查找,毕竟运行时查找仍是个复杂的操做,根据是否忽略生成Index。
private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) { FindState findState = prepareFindState(); findState.initForSubscriber(subscriberClass); while (findState.clazz != null) { findUsingReflectionInSingleClass(findState); findState.moveToSuperclass(); } return getMethodsAndRelease(findState); } private void findUsingReflectionInSingleClass(FindState findState) { Method[] methods; try { // This is faster than getMethods, especially when subscribers are fat classes like Activities methods = findState.clazz.getDeclaredMethods(); } catch (Throwable th) { // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149 methods = findState.clazz.getMethods(); findState.skipSuperClasses = true; } for (Method method : methods) { int modifiers = method.getModifiers(); if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) { Class<?>[] parameterTypes = method.getParameterTypes(); if (parameterTypes.length == 1) { Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class); if (subscribeAnnotation != null) { Class<?> eventType = parameterTypes[0]; if (findState.checkAdd(method, eventType)) { ThreadMode threadMode = subscribeAnnotation.threadMode(); findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode, subscribeAnnotation.priority(), subscribeAnnotation.sticky())); } } } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) { String methodName = method.getDeclaringClass().getName() + "." + method.getName(); throw new EventBusException("@Subscribe method " + methodName + "must have exactly 1 parameter but has " + parameterTypes.length); } } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) { String methodName = method.getDeclaringClass().getName() + "." + method.getName(); throw new EventBusException(methodName + " is a illegal @Subscribe method: must be public, non-static, and non-abstract"); } } }
findUsingReflectionInSingleClass对反射类进行了处理,这里面经过掩模运算检查了访问权限, 检查了参数个数。
boolean checkAdd(Method method, Class<?> eventType) { // 2 level check: 1st level with event type only (fast), 2nd level with complete signature when required. // Usually a subscriber doesn't have methods listening to the same event type. Object existing = anyMethodByEventType.put(eventType, method); if (existing == null) { return true; } else { if (existing instanceof Method) { if (!checkAddWithMethodSignature((Method) existing, eventType)) { // Paranoia check throw new IllegalStateException(); } // Put any non-Method object to "consume" the existing Method anyMethodByEventType.put(eventType, this); } return checkAddWithMethodSignature(method, eventType); } }
其中的checkAdd检查了类型和方法签名,每次轮转完成以后都会进行一次findState.moveToSuperclass();对父类进行处理。
由于反射所使用的运行时查找速度缓慢,因此咱们也常常会经过apt使用已经建立好的Index。
刚才另外一个分支的findUsingInfo
就是使用已有的Index:
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) { FindState findState = prepareFindState(); findState.initForSubscriber(subscriberClass); while (findState.clazz != null) { findState.subscriberInfo = getSubscriberInfo(findState); if (findState.subscriberInfo != null) { SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods(); for (SubscriberMethod subscriberMethod : array) { if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) { findState.subscriberMethods.add(subscriberMethod); } } } else { findUsingReflectionInSingleClass(findState); } findState.moveToSuperclass(); } return getMethodsAndRelease(findState); }
这段很是简单,几乎就是刚才的验证而已,若是没拿到数据的话,还会进行正常的反射查找。
// EventBusAnnotationProcessor 负责生成注解路由表 private void createInfoIndexFile(String index) { BufferedWriter writer = null; try { JavaFileObject sourceFile = processingEnv.getFiler().createSourceFile(index); int period = index.lastIndexOf('.'); String myPackage = period > 0 ? index.substring(0, period) : null; String clazz = index.substring(period + 1); writer = new BufferedWriter(sourceFile.openWriter()); if (myPackage != null) { writer.write("package " + myPackage + ";\n\n"); } writer.write("import org.greenrobot.eventbus.meta.SimpleSubscriberInfo;\n"); writer.write("import org.greenrobot.eventbus.meta.SubscriberMethodInfo;\n"); writer.write("import org.greenrobot.eventbus.meta.SubscriberInfo;\n"); writer.write("import org.greenrobot.eventbus.meta.SubscriberInfoIndex;\n\n"); writer.write("import org.greenrobot.eventbus.ThreadMode;\n\n"); writer.write("import java.util.HashMap;\n"); writer.write("import java.util.Map;\n\n"); writer.write("/** This class is generated by EventBus, do not edit. */\n"); writer.write("public class " + clazz + " implements SubscriberInfoIndex {\n"); writer.write(" private static final Map<Class<?>, SubscriberInfo> SUBSCRIBER_INDEX;\n\n"); writer.write(" static {\n"); writer.write(" SUBSCRIBER_INDEX = new HashMap<Class<?>, SubscriberInfo>();\n\n"); writeIndexLines(writer, myPackage); writer.write(" }\n\n"); writer.write(" private static void putIndex(SubscriberInfo info) {\n"); writer.write(" SUBSCRIBER_INDEX.put(info.getSubscriberClass(), info);\n"); writer.write(" }\n\n"); writer.write(" @Override\n"); writer.write(" public SubscriberInfo getSubscriberInfo(Class<?> subscriberClass) {\n"); writer.write(" SubscriberInfo info = SUBSCRIBER_INDEX.get(subscriberClass);\n"); writer.write(" if (info != null) {\n"); writer.write(" return info;\n"); writer.write(" } else {\n"); writer.write(" return null;\n"); writer.write(" }\n"); writer.write(" }\n"); writer.write("}\n"); } catch (IOException e) { throw new RuntimeException("Could not write source for " + index, e); } finally { if (writer != null) { try { writer.close(); } catch (IOException e) { //Silent } } } } private void writeIndexLines(BufferedWriter writer, String myPackage) throws IOException { for (TypeElement subscriberTypeElement : methodsByClass.keySet()) { if (classesToSkip.contains(subscriberTypeElement)) { continue; } String subscriberClass = getClassString(subscriberTypeElement, myPackage); if (isVisible(myPackage, subscriberTypeElement)) { writeLine(writer, 2, "putIndex(new SimpleSubscriberInfo(" + subscriberClass + ".class,", "true,", "new SubscriberMethodInfo[] {"); List<ExecutableElement> methods = methodsByClass.get(subscriberTypeElement); writeCreateSubscriberMethods(writer, methods, "new SubscriberMethodInfo", myPackage); writer.write(" }));\n\n"); } else { writer.write(" // Subscriber not visible to index: " + subscriberClass + "\n"); } } }
有了这两个方法以后咱们就知道,日常的index就是经过这种方式拼接出来的。
/** Posts the given event to the event bus. */ public void post(Object event) { PostingThreadState postingState = currentPostingThreadState.get(); List<Object> eventQueue = postingState.eventQueue; eventQueue.add(event); if (!postingState.isPosting) { postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper(); postingState.isPosting = true; if (postingState.canceled) { throw new EventBusException("Internal error. Abort state was not reset"); } try { while (!eventQueue.isEmpty()) { postSingleEvent(eventQueue.remove(0), postingState); } } finally { postingState.isPosting = false; postingState.isMainThread = false; } } }
PostingThreadState是一个存储在ThreadLocal中的对象,包含有如下各类内容,线程信息,是不是主线程,是否取消,还有一个相应的事件队列。
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error { Class<?> eventClass = event.getClass(); boolean subscriptionFound = false; if (eventInheritance) { /** Looks up all Class objects including super classes and interfaces. Should also work for interfaces. */ List<Class<?>> eventTypes = lookupAllEventTypes(eventClass); int countTypes = eventTypes.size(); // 对全部的订阅函数,都调用发送数据 for (int h = 0; h < countTypes; h++) { // 全部的订阅类 Class<?> clazz = eventTypes.get(h); subscriptionFound |= postSingleEventForEventType(event, postingState, clazz); } } else { // 只发送一次 subscriptionFound = postSingleEventForEventType(event, postingState, eventClass); } if (!subscriptionFound) { if (logNoSubscriberMessages) { Log.d(TAG, "No subscribers registered for event " + eventClass); } if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class && eventClass != SubscriberExceptionEvent.class) { // 无订阅者的处理 post(new NoSubscriberEvent(this, event)); } } }
以后:
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) { CopyOnWriteArrayList<Subscription> subscriptions; synchronized (this) { subscriptions = subscriptionsByEventType.get(eventClass); } if (subscriptions != null && !subscriptions.isEmpty()) { for (Subscription subscription : subscriptions) { postingState.event = event; postingState.subscription = subscription; boolean aborted = false; try { postToSubscription(subscription, event, postingState.isMainThread); aborted = postingState.canceled; } finally { postingState.event = null; postingState.subscription = null; postingState.canceled = false; } if (aborted) { break; } } return true; } return false; }
以后对全部的订阅类的全部订阅者都发送一次数据,发送数据方法和上文相同。
发送粘性数据就是拿锁而后保存到队列中去,这样就能够在从新发送:
public void postSticky(Object event) { synchronized (stickyEvents) { stickyEvents.put(event.getClass(), event); } // Should be posted after it is putted, in case the subscriber wants to remove immediately post(event); }
由于咱们没法肯定何时粘性事件应该中止继续传播,这取决于咱们应用的须要,因此咱们应当手动remove掉Sticky Event :
// 系统提供了以下方法 public <T> T removeStickyEvent(Class<T> eventType) { synchronized (stickyEvents) { return eventType.cast(stickyEvents.remove(eventType)); } } public boolean removeStickyEvent(Object event) { synchronized (stickyEvents) { Class<?> eventType = event.getClass(); Object existingEvent = stickyEvents.get(eventType); if (event.equals(existingEvent)) { stickyEvents.remove(eventType); return true; } else { return false; } } } public void removeAllStickyEvents() { synchronized (stickyEvents) { stickyEvents.clear(); } }
至此咱们就分析完了EventBus的基本上全部的代码(处理util包下的错误日志),EventBus自己的实现并不复杂,使用运行时的反射技巧也很简单,单纯的使用注解类可能会拖慢速度,可是经过apt生成的静态表把速降提高到了一个新的高度,apt的生成你们也看到了并非很复杂,几乎就是类型检查和拼接字串,不过想法决定了EventBus仍然是一个优秀的开源库,但愿咱们在使用的同时,仍能对实现原理有所了解。