发布消息的业务方没有限制,任何人,能够在任何地方,任什么时候间推送一条消息(或者说触发一个自定义事件)java
代码一览异步
/** Posts the given event to the event bus. */ public void post(Object event) { PostingThreadState postingState = currentPostingThreadState.get(); List<Object> eventQueue = postingState.eventQueue; eventQueue.add(event); if (!postingState.isPosting) { // 标记,发送中时,就拒绝掉再次发的请求 postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper(); postingState.isPosting = true; if (postingState.canceled) { throw new EventBusException("Internal error. Abort state was not reset"); } try { while (!eventQueue.isEmpty()) { postSingleEvent(eventQueue.remove(0), postingState); } } finally { postingState.isPosting = false; postingState.isMainThread = false; } } }
依赖的消息推送代码以下async
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error { Class<?> eventClass = event.getClass(); boolean subscriptionFound = false; if (eventInheritance) { List<Class<?>> eventTypes = lookupAllEventTypes(eventClass); int countTypes = eventTypes.size(); for (int h = 0; h < countTypes; h++) { Class<?> clazz = eventTypes.get(h); subscriptionFound |= postSingleEventForEventType(event, postingState, clazz); } } else { subscriptionFound = postSingleEventForEventType(event, postingState, eventClass); } if (!subscriptionFound) { if (logNoSubscriberMessages) { Log.d(TAG, "No subscribers registered for event " + eventClass); } if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class && eventClass != SubscriberExceptionEvent.class) { post(new NoSubscriberEvent(this, event)); } } }
这个表示订阅者监听的事件类(即订阅者方法的参数类型)与事件类彻底一致时才会接受请求ide
彻底匹配事件类,而后执行下面的逻辑oop
NoSubscriberEvent
事件出来Method.invoke()
便可private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) { CopyOnWriteArrayList<Subscription> subscriptions; synchronized (this) { // 获取事件类对应的全部订阅者信息 subscriptions = subscriptionsByEventType.get(eventClass); } if (subscriptions != null && !subscriptions.isEmpty()) { for (Subscription subscription : subscriptions) { postingState.event = event; postingState.subscription = subscription; boolean aborted = false; try { postToSubscription(subscription, event, postingState.isMainThread); aborted = postingState.canceled; } finally { postingState.event = null; postingState.subscription = null; postingState.canceled = false; } if (aborted) { break; } } return true; } return false; } private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) { // xxx invokeSubscriber(subscription, event); // xxx } void invokeSubscriber(Subscription subscription, Object event) { try { subscription.subscriberMethod.method.invoke(subscription.subscriber, event); } catch (InvocationTargetException e) { handleSubscriberException(subscription, event, e.getCause()); } catch (IllegalAccessException e) { throw new IllegalStateException("Unexpected exception", e); } }
看到上面的执行,同步执行回调,没有采用Executor
,没有使用线程池post
这个表示,订阅者监听的事件类只要是发送事件的超类or该类,就能够接受请求,如你监听一个Object的事件,则全部的消息推送,这种场景下你均可以接收学习
相比上面,多了一步就是获取Event的全部超类丢入集合,遍历这个集合,获取全部类对应的订阅者信息,执行回调方法便可测试
获取超类的方法, 再第三篇小结中,咱们也说到了如何获取超类,Guava是使用内部封装的TypeToken.of(concreteClass).getTypes().rawTypes());
, 下面的使用则是以前提到的 clazz.getSuperclass()
, clazz.getInterfaces()
, 后面这个也是咱们最多见,也最容易想到的方法this
/** Looks up all Class objects including super classes and interfaces. Should also work for interfaces. */ private static List<Class<?>> lookupAllEventTypes(Class<?> eventClass) { synchronized (eventTypesCache) { List<Class<?>> eventTypes = eventTypesCache.get(eventClass); if (eventTypes == null) { eventTypes = new ArrayList<>(); Class<?> clazz = eventClass; while (clazz != null) { eventTypes.add(clazz); addInterfaces(eventTypes, clazz.getInterfaces()); clazz = clazz.getSuperclass(); } eventTypesCache.put(eventClass, eventTypes); } return eventTypes; } }
上面贴出的代码是实际的执行者,但在具体的执行者以前,是有一个方法,内部选择不一样的使用姿式来发消息以下线程
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) { switch (subscription.subscriberMethod.threadMode) { case POSTING: invokeSubscriber(subscription, event); break; case MAIN: if (isMainThread) { invokeSubscriber(subscription, event); } else { mainThreadPoster.enqueue(subscription, event); } break; case BACKGROUND: if (isMainThread) { backgroundPoster.enqueue(subscription, event); } else { invokeSubscriber(subscription, event); } break; case ASYNC: asyncPoster.enqueue(subscription, event); break; default: throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode); } }
以 backgroundPoster.enqueue
做为测试研究目标,其余几个设计思路没什么两样, 这个方法内部是
private final PendingPostQueue queue; public void enqueue(Subscription subscription, Object event) { PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event); synchronized (this) { queue.enqueue(pendingPost); if (!executorRunning) { executorRunning = true; eventBus.getExecutorService().execute(this); } } } @Override public void run() { try { try { while (true) { PendingPost pendingPost = queue.poll(1000); if (pendingPost == null) { synchronized (this) { // Check again, this time in synchronized pendingPost = queue.poll(); if (pendingPost == null) { executorRunning = false; return; } } } eventBus.invokeSubscriber(pendingPost); } } catch (InterruptedException e) { Log.w("Event", Thread.currentThread().getName() + " was interruppted", e); } } finally { executorRunning = false; } }
首先是获取 PendingPost
对象, 这个就是表示准备发布的消息,塞入队列,让后把本类丢入 EventBus
的线程池来执行
上面的设计思路,一个是不一样的类型,选择不一样消息发送机制,这个和简单工程模式特别类似,你制定一些发送消息的规则,根据你的须要来选择具体的规则来执行;