简言:java
相信你们都使用过EventBus,用起来真的是奥利给啊,使用简单,开销还小,这种异步框架真的是让人爱不释手啊,固然了,有一大部分人更加喜欢Rxjava这个框架,毕竟这个框架更NB, 可是今天得主角不是Rxjava,是EventBus,带你们看看他的内部实现是什么?固然在讲源码以前,仍是先介绍一下EventBus,我保证,只是介绍啊, 毕竟我也不想啰嗦。。。android
1.简介
EvenBus是一个android端优化的消息总线,简化了应用程序内各组件,组件与后台线程间的通讯,好比请求网络,等网络返回时经过Handler或者BroadCast通知UI,两个Fragement之间须要经过listener通讯,这些需求均可以经过Evenbus实现;
evenBus是一款针对 android 优化的发布/订阅事件总线。主要功能是代替Intent handler Broacaset 在fragement activity
service线程之间的传递消息,优势是开销小,代码更优雅,以及将发送者和接收者解耦.git
2.基本的使用:github
1,自定义一个类,能够是空类;
2.在要接收消息的页面注册
3.发送消息
4.接收消息的页面实现(共有四个函数,各功能不一样)
5.解除注册
网络
3.EventBus的流程 (如图:官网找的)app
publisher是一个发布器,而后将咱们的事件Event经过post()方法发送到EventBus的主线当中,而后在这个EventBus的主线中,会经过事件(Event)类型匹配相应的订阅者 Subscriber。框架
4.源码分析(EventBus)异步
@Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main2); findViewById(R.id.btn).setOnClickListener(new View.OnClickListener() { @Override public void onClick(View v) { //发送消息 EventBus.getDefault().post(new MyEvents("hello eventbus")); } }); } @Override protected void onStart() { super.onStart(); //注册EventBus EventBus.getDefault().register(this); } @Override protected void onStop() { super.onStop(); //注销EventBus EventBus.getDefault().unregister(this); } @Subscribe(threadMode = ThreadMode.MAIN) public void onMessageEvent(MyEvents events){ Toast.makeText(this,events.message,Toast.LENGTH_SHORT).show(); }
这个是Event的使用类,咱们看在使用EventBus时,不管注册仍是注销发送消息,都有getDefault()这个方法,因此咱们先看看这个getDefault方法实现了什么?async
/** Convenience singleton for apps using a process-wide EventBus instance. */ public static EventBus getDefault() { if (defaultInstance == null) { synchronized (EventBus.class) { if (defaultInstance == null) { defaultInstance = new EventBus(); } } } return defaultInstance; } public static EventBusBuilder builder() { return new EventBusBuilder(); } /** For unit test primarily. */ public static void clearCaches() { SubscriberMethodFinder.clearCaches(); eventTypesCache.clear(); } /** * Creates a new EventBus instance; each instance is a separate scope in which events are delivered. To use a * central bus, consider {@link #getDefault()}. */ public EventBus() { this(DEFAULT_BUILDER); }
这里没有什么可介绍的,这是一个单例模式,咱们关注一下这个EventBus的构造函数,DEFAULT_BUILDER,咱们跟进去看一下:ide
咱们能够看出,这个EventBus最终是构建者模式来建立对象的。咱们看看他是如何建立的,
EventBus(EventBusBuilder builder) { subscriptionsByEventType = new HashMap<>(); typesBySubscriber = new HashMap<>(); stickyEvents = new ConcurrentHashMap<>(); mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10); backgroundPoster = new BackgroundPoster(this); asyncPoster = new AsyncPoster(this); indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0; subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes, builder.strictMethodVerification, builder.ignoreGeneratedIndex); logSubscriberExceptions = builder.logSubscriberExceptions; logNoSubscriberMessages = builder.logNoSubscriberMessages; sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent; sendNoSubscriberEvent = builder.sendNoSubscriberEvent; throwSubscriberException = builder.throwSubscriberException; eventInheritance = builder.eventInheritance; executorService = builder.executorService; }
subscriptionsByEventType = new HashMap<>();
typesBySubscriber = new HashMap<>();
stickyEvents = new ConcurrentHashMap<>();
这三个HashMap,都是作什么?
第一个:它是以Event为key,subscript为value,当发送Event时,均可以经过这个HashMap找到对应订阅者。
第二个:它是以Subscriber为key,event为value,当咱们作反注册的时候都会操做这个hashMap
第三个:这个是维护的粘性事件,我以前讲过粘性事件的定义,这里就不追溯了。
mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);
backgroundPoster = new BackgroundPoster(this);
asyncPoster = new AsyncPoster(this);
咱们在看看这三个post(重要)
1)HandlerPoster:咱们看参数,传递的是主线程的Looper,是handler现实的,咱们追进去看看handerPoster作了什么?
final class HandlerPoster extends Handler { private final PendingPostQueue queue; private final int maxMillisInsideHandleMessage; private final EventBus eventBus; private boolean handlerActive; HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) { super(looper); this.eventBus = eventBus; this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage; queue = new PendingPostQueue(); } void enqueue(Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); synchronized (this) { queue.enqueue(pendingPost); if (!handlerActive) { handlerActive = true; if (!sendMessage(obtainMessage())) { throw new EventBusException("Could not send handler message"); } } } } @Override public void handleMessage(Message msg) { boolean rescheduled = false; try { long started = SystemClock.uptimeMillis(); while (true) { PendingPost pendingPost = queue.poll(); if (pendingPost == null) { synchronized (this) { // Check again, this time in synchronized pendingPost = queue.poll(); if (pendingPost == null) { handlerActive = false; return; } } } eventBus.invokeSubscriber(pendingPost); long timeInMethod = SystemClock.uptimeMillis() - started; if (timeInMethod >= maxMillisInsideHandleMessage) { if (!sendMessage(obtainMessage())) { throw new EventBusException("Could not send handler message"); } rescheduled = true; return; } } } finally { handlerActive = rescheduled; } } }
这个HandlerPoser继承Handler,
PendingPostQueue queue; 这是一个队列
int maxMillisInsideHandleMessage; post这个事件在hanlder中最大的时间值
EventBus eventBus;
handlerActive; 他标识的是是否运行起来了
咱们关注一下handleMessage方法作了什么?
经过do,while这个循环,从队列中获取数据,并调用eventBus,invokeSubscriber()分发事件,每分发完一次事件,就对比一下时间,判断这个时间,与上边定义的最大时间值,作比较,若是大于就跳出循环。
2)backgroundPoster (处理后台操做的)
final class BackgroundPoster implements Runnable { private final PendingPostQueue queue; private final EventBus eventBus; private volatile boolean executorRunning; BackgroundPoster(EventBus eventBus) { this.eventBus = eventBus; queue = new PendingPostQueue(); } public void enqueue(Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); synchronized (this) { queue.enqueue(pendingPost); if (!executorRunning) { executorRunning = true; eventBus.getExecutorService().execute(this); } } } @Override public void run() { try { try { while (true) { PendingPost pendingPost = queue.poll(1000); if (pendingPost == null) { synchronized (this) { // Check again, this time in synchronized pendingPost = queue.poll(); if (pendingPost == null) { executorRunning = false; return; } } } eventBus.invokeSubscriber(pendingPost); } } catch (InterruptedException e) { Log.w("Event", Thread.currentThread().getName() + " was interruppted", e); } } finally { executorRunning = false; } } }
他实现的是Runnalbe,咱们直接看他的run()方法:run()主要是不断的从咱们队列中获取消息,而后经过invokeSubscruber进行事件分发。直到取完为止。
3)asyncPoster
class AsyncPoster implements Runnable { private final PendingPostQueue queue; private final EventBus eventBus; AsyncPoster(EventBus eventBus) { this.eventBus = eventBus; queue = new PendingPostQueue(); } public void enqueue(Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); queue.enqueue(pendingPost); eventBus.getExecutorService().execute(this); } @Override public void run() { PendingPost pendingPost = queue.poll(); if(pendingPost == null) { throw new IllegalStateException("No pending post available"); } eventBus.invokeSubscriber(pendingPost); } }
它也是实现的Runnable,
他的run方法,是获取队列中的一个,进行事件分发,与上边的post是不一样的,
上边三个post是EventBus中最核心的类,
3.subscriberMethodFinder(这个也是重要的,他是注解的找寻器)
二,关于注解的分析
@Subscribe(threadMode = ThreadMode.MAIN) public void onMessageEvent(MyEvents events){ Toast.makeText(this,events.message,Toast.LENGTH_SHORT).show(); }
咱们仍是看一下@Subscribe的源码:
@Documented @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.METHOD}) public @interface Subscribe { ThreadMode threadMode() default ThreadMode.POSTING; /** * If true, delivers the most recent sticky event (posted with * {@link EventBus#postSticky(Object)}) to this subscriber (if event available). */ boolean sticky() default false; /** Subscriber priority to influence the order of event delivery. * Within the same delivery thread ({@link ThreadMode}), higher priority subscribers will receive events before * others with a lower priority. The default priority is 0. Note: the priority does *NOT* affect the order of * delivery among subscribers with different {@link ThreadMode}s! */ int priority() default 0; }
ThreadMode 线程模式。这个是很是重要的,咱们分析一下:
public enum ThreadMode { POSTING, MAIN, BACKGROUND, ASYNC }
1)POSTING:一种默认线程模式,表示在执行post事件操做的时候线程直接调订阅者的方法,不管该线程是否在主线程。
2)MAIN:表示在主线程中执行这个方法,
3)BACKGROUND:在后台线程中执行相应的方法。
4)ASYNC:不管发布的是否在主线程。它都会发布一个空线程进行处理。它线程独立,不会出现卡顿。
2.sticky: 这是定义的粘性事件,
3.priority: 优先级,默认状况下是POSTING.
三,register() 注册订阅
public void register(Object subscriber) { Class<?> subscriberClass = subscriber.getClass(); List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass); synchronized (this) { for (SubscriberMethod subscriberMethod : subscriberMethods) { subscribe(subscriber, subscriberMethod); } } }
第一行代码:经过反射获取到咱们的subscriberClass对象,
第二行代码:经过咱们获取到的对象找到对应的集合。subscriberMethodFinder这个找寻器进行寻找。
咱们看一下findSubscriberMethods这个方法源码:
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) { List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass); if (subscriberMethods != null) { return subscriberMethods; } if (ignoreGeneratedIndex) { subscriberMethods = findUsingReflection(subscriberClass); } else { subscriberMethods = findUsingInfo(subscriberClass); } if (subscriberMethods.isEmpty()) { throw new EventBusException("Subscriber " + subscriberClass + " and its super classes have no public methods with the @Subscribe annotation"); } else { METHOD_CACHE.put(subscriberClass, subscriberMethods); return subscriberMethods; } }
咱们看一下findUsingInfo这个方法的源码:
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) { FindState findState = prepareFindState(); findState.initForSubscriber(subscriberClass); while (findState.clazz != null) { findState.subscriberInfo = getSubscriberInfo(findState); if (findState.subscriberInfo != null) { SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods(); for (SubscriberMethod subscriberMethod : array) { if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) { findState.subscriberMethods.add(subscriberMethod); } } } else { findUsingReflectionInSingleClass(findState); } findState.moveToSuperclass(); } return getMethodsAndRelease(findState); }
while这个循环,经过getSubScriberInfo这个方法返回的不为null,
首先获取订阅方法的集合,经过for循序遍历咱们的订阅方法,并经过checkAdd这个方法进行过滤,将符合的添加到subscriberMethods这个集合中。
若是etSubScriberInfo这个方法返回的为null,会走findUsingReflectionInSingleClass这个方法:咱们经过命名就和以猜出,他是经过反射进行查询的。咱们看一下源码:
private void findUsingReflectionInSingleClass(FindState findState) { Method[] methods; try { // This is faster than getMethods, especially when subscribers are fat classes like Activities methods = findState.clazz.getDeclaredMethods(); } catch (Throwable th) { // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149 methods = findState.clazz.getMethods(); findState.skipSuperClasses = true; } for (Method method : methods) { int modifiers = method.getModifiers(); if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) { Class<?>[] parameterTypes = method.getParameterTypes(); if (parameterTypes.length == 1) { Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class); if (subscribeAnnotation != null) { Class<?> eventType = parameterTypes[0]; if (findState.checkAdd(method, eventType)) { ThreadMode threadMode = subscribeAnnotation.threadMode(); findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode, subscribeAnnotation.priority(), subscribeAnnotation.sticky())); } } } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) { String methodName = method.getDeclaringClass().getName() + "." + method.getName(); throw new EventBusException("@Subscribe method " + methodName + "must have exactly 1 parameter but has " + parameterTypes.length); } } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) { String methodName = method.getDeclaringClass().getName() + "." + method.getName(); throw new EventBusException(methodName + " is a illegal @Subscribe method: must be public, non-static, and non-abstract"); } } }
methods = findState.clazz.getDeclaredMethods(); 经过反射作的,咱们猜的没错,这里就获取到全部订阅者的方法,
而后对获取的方法进行了依次的遍历,经过getParameterTpyes方法获取到咱们的参数,经过getAnnotation这个方法获取到咱们的Subscribe对象,这里其实主要作的就是过滤出被Subscribe修饰过的方法,subscribeAnnotation.threadMode,获取线程模式,经过这个线程模式进行调度,
2.咱们继续分析register 中的subscribe
// Must be called in synchronized block private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) { Class<?> eventType = subscriberMethod.eventType; Subscription newSubscription = new Subscription(subscriber, subscriberMethod); CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType); if (subscriptions == null) { subscriptions = new CopyOnWriteArrayList<>(); subscriptionsByEventType.put(eventType, subscriptions); } else { if (subscriptions.contains(newSubscription)) { throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType); } } int size = subscriptions.size(); for (int i = 0; i <= size; i++) { if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) { subscriptions.add(i, newSubscription); break; } } List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber); if (subscribedEvents == null) { subscribedEvents = new ArrayList<>(); typesBySubscriber.put(subscriber, subscribedEvents); } subscribedEvents.add(eventType); if (subscriberMethod.sticky) { if (eventInheritance) { // Existing sticky events of all subclasses of eventType have to be considered. // Note: Iterating over all events may be inefficient with lots of sticky events, // thus data structure should be changed to allow a more efficient lookup // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>). Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet(); for (Map.Entry<Class<?>, Object> entry : entries) { Class<?> candidateEventType = entry.getKey(); if (eventType.isAssignableFrom(candidateEventType)) { Object stickyEvent = entry.getValue(); checkPostStickyEventToSubscription(newSubscription, stickyEvent); } } } else { Object stickyEvent = stickyEvents.get(eventType); checkPostStickyEventToSubscription(newSubscription, stickyEvent); } } }
Subscription 这个类是作什么的?咱们看一下:
final class Subscription { final Object subscriber; final SubscriberMethod subscriberMethod; /** * Becomes false as soon as {@link EventBus#unregister(Object)} is called, which is checked by queued event delivery * {@link EventBus#invokeSubscriber(PendingPost)} to prevent race conditions. */ volatile boolean active; Subscription(Object subscriber, SubscriberMethod subscriberMethod) { this.subscriber = subscriber; this.subscriberMethod = subscriberMethod; active = true; } @Override public boolean equals(Object other) { if (other instanceof Subscription) { Subscription otherSubscription = (Subscription) other; return subscriber == otherSubscription.subscriber && subscriberMethod.equals(otherSubscription.subscriberMethod); } else { return false; } } @Override public int hashCode() { return subscriber.hashCode() + subscriberMethod.methodString.hashCode(); } }
这里处理了订阅者,和封装的一些订阅方法,线程模式,等,
咱们回去继续看:
subscriptions这个为null.证实这个事件尚未注册过,咱们就新建立一个CopyOnWriteArrayList,并添加到SubScriptByEventType这个hashMap当中,
subscriptions为null的话就会抛出异常,证实这个时间已经注册过了。
subscriptions.size回去这个集合大小,
经过for循环遍历,经过优先级的条件添加到subscriptions中。
这里对粘性事件也作了处理,这里不讲粘性事件,
最后 checkPostStickyEventToSubscription(newSubscription, stickyEvent);完成了注册。
总结:subscribe
1.判断是否有注册过该事件
2.而后按照优先级加入到SubscriptionByEventType的value的list中
3.而后再添加到typesBySubscriber的value的List中。
4.分发事件