上一篇讲述了
EventBus
的整个执行流程, 本片则从细节处出发,探讨下设计的精妙java
看代码时,能够看到不少地方都用到了缓存,如再注册时, 根据class获取全部带注解的方法; 推送消息时,根据事件类型,获取全部的超类集合缓存
如注册时,一条完整的调用链安全
com.google.common.eventbus.SubscriberRegistry#register -> com.google.common.eventbus.SubscriberRegistry#findAllSubscribers -> com.google.common.eventbus.SubscriberRegistry#getAnnotatedMethods -> subscriberMethodsCache.getUnchecked(clazz) -> com.google.common.eventbus.SubscriberRegistry#getAnnotatedMethodsNotCached
TypeToken.of(concreteClass).getTypes().rawTypes()); // 咱们本身的实现, 一直到返回null为止 clz.getSuperClass().getSuperClass(); // 获取接口 clz.getInterfaces()
异步推送处理Event和同步处理主要的区别点是使用的 Dispatcher不一样, 同步是使用 PerThreadQueuedDispatcher
, 异步是 LegacyAsyncDispatcher
异步
异步的消息分发ide
/** * Global event queue. */ private final ConcurrentLinkedQueue<EventWithSubscriber> queue = Queues.newConcurrentLinkedQueue(); @Override void dispatch(Object event, Iterator<Subscriber> subscribers) { checkNotNull(event); while (subscribers.hasNext()) { queue.add(new EventWithSubscriber(event, subscribers.next())); } EventWithSubscriber e; while ((e = queue.poll()) != null) { e.subscriber.dispatchEvent(e.event); } } private static final class EventWithSubscriber { private final Object event; private final Subscriber subscriber; private EventWithSubscriber(Object event, Subscriber subscriber) { this.event = event; this.subscriber = subscriber; } } }
同步的消息推送学习
/** * Per-thread queue of events to dispatch. */ private final ThreadLocal<Queue<Event>> queue = new ThreadLocal<Queue<Event>>() { @Override protected Queue<Event> initialValue() { return Queues.newArrayDeque(); } }; @Override void dispatch(Object event, Iterator<Subscriber> subscribers) { checkNotNull(event); checkNotNull(subscribers); Queue<Event> queueForThread = queue.get(); queueForThread.offer(new Event(event, subscribers)); if (!dispatching.get()) { dispatching.set(true); try { Event nextEvent; while ((nextEvent = queueForThread.poll()) != null) { while (nextEvent.subscribers.hasNext()) { nextEvent.subscribers.next().dispatchEvent(nextEvent.event); } } } finally { dispatching.remove(); queue.remove(); } } } private static final class Event { private final Object event; private final Iterator<Subscriber> subscribers; private Event(Object event, Iterator<Subscriber> subscribers) { this.event = event; this.subscribers = subscribers; } } }
执行时, 在 AsyncEventBus
是在线程池中执行; 而 EventBus
则是直接执行, 实质上的执行器this
public static Executor directExecutor() { return DirectExecutor.INSTANCE; } /** See {@link #directExecutor} for behavioral notes. */ private enum DirectExecutor implements Executor { INSTANCE; @Override public void execute(Runnable command) { command.run(); } }
没有订阅者时, 抛一个 DeadEvent
google
订阅者接收消息后的,执行异常时 (订阅者之间的隔离)spa
final void dispatchEvent(final Object event) { executor.execute(new Runnable() { @Override public void run() { try { invokeSubscriberMethod(event); } catch (InvocationTargetException e) { bus.handleSubscriberException(e.getCause(), context(event)); } } }); }
Guava的EventBus不支持定义订阅者的顺序,更谈不上截断线程