开始研究源码的设计思路,从
Listener
注册出发,EventBus
如何维护监听者信息,到Publisher
发送消息,消息以怎样的渠道分发给全部的Listener
, 顺序如何保证,传递性如何保证,出现异常如何处理,找不到监听者怎么处理等等html
EventBus
这个类至关于一个中转站,
Publisher
调用它的post(Object)
来推送事件;而后将事件一次推送给注册的Listener
java
在初始化s时, EventBus
对象会维护一个 private final SubscriberRegistry subscribers = new SubscriberRegistry(this);
实例, 这个就是维护订阅关系的核心类web
注册方法以下spring
/** * Registers all subscriber methods on {@code object} to receive events. * * @param object object whose subscriber methods should be registered. */ public void register(Object object) { subscribers.register(object); }
接着咱们看下这个类的具体实现缓存
SubscriberRegistry.java
数据结构
/** * All registered subscribers, indexed by event type. * * <p>The {@link CopyOnWriteArraySet} values make it easy and relatively lightweight to get an * immutable snapshot of all current subscribers to an event without any locking. */ private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers = Maps.newConcurrentMap(); /** * Registers all subscriber methods on the given listener object. */ void register(Object listener) { Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener); for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) { Class<?> eventType = entry.getKey(); Collection<Subscriber> eventMethodsInListener = entry.getValue(); CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType); if (eventSubscribers == null) { CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>(); eventSubscribers = MoreObjects.firstNonNull( subscribers.putIfAbsent(eventType, newSet), newSet); } eventSubscribers.addAll(eventMethodsInListener); } }
subscribers : - 对象初始化时建立 - 维护的是EventType
-> Listener
的映射关系,value为一个集合,说明一个事件能够推送给多个Listener
- 监听者,能够有能够监听多个不一样类型的事件app
注册流程: - 根据注册的对象,将其中全部的回调方法都捞出来 - 将上步的结果塞入 subscribers
集合中; key为 Listener
的类名less
注册目的就是发布消息后,
EventBus
能够将这个Event
传递”Listener
(即订阅方)ide
为了实现上面的目的,若是要咱们本身实现,会怎么作?post
@Subscribe
注解的方法捞出来Event
, 由于注册的目的是为了实现回调, 因此封装一个类,包含这个Listener
对象的引用 + 要执行的方法上面注册的实际实现和上面的步骤差很少
获取全部包含注解的方法
实际的代码以下
private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) { Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes(); Map<MethodIdentifier, Method> identifiers = Maps.newHashMap(); for (Class<?> supertype : supertypes) { for (Method method : supertype.getDeclaredMethods()) { if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) { // TODO(cgdecker): Should check for a generic parameter type and error out Class<?>[] parameterTypes = method.getParameterTypes(); checkArgument(parameterTypes.length == 1, "Method %s has @Subscribe annotation but has %s parameters." + "Subscriber methods must have exactly 1 parameter.", method, parameterTypes.length); MethodIdentifier ident = new MethodIdentifier(method); if (!identifiers.containsKey(ident)) { identifiers.put(ident, method); } } } } return ImmutableList.copyOf(identifiers.values()); }
看下上面的实现,很是有意思的是,不只将改对象中的全部@Subscribe
注解的方法捞出来,连父类中的也不放过;就是这个 TypeToken.of(clazz).getTypes().rawTypes();
从上面的限定,也能够看出,对于回调方法是有要求的: 有且只能有一个参数, checkArgument(parameterTypes.length == 1,xxx)
过滤重载的回调方法(这点比较有意思,搞了个Map, key由方法名+方法参数确认(MethodIdentifier
的equals方法重写了), 而不是直接用集合的contains
方法, 请注意其中的区别)
method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()
这个判断条件的后一个能够参考http://www.xue163.com/2122/1/21224778.html
将上面的方法转换为Map, 看这个 Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
Event.class
; value为一个包含 Listener
, Method
, EventBus
实例的对象 Subscriber
将上面的map塞入subscribers
集合
subscribers
集合包含的是全部的 (事件 -> 监听者回调集合)Event
-> Set<Listener.Method>
subscribers
的数据结构,其实能够看到,一个Listener
对象,若是注册屡次,最终的效果实际上是同样的,这个监听者,并不会被调用屡次; 若是一个Lisntener
类,有多个对象,则注册后,每一个对象的回调都会执行到;
Subscriber
对象Subscriber
的 hashcode & equals 方法没有重写到此, 注册完毕;注销的方法和上面差很少,惟一的区别是最后一个是向 subscribers
塞数据,一个是从其中删数据而已
题外话
若是咱们想获取工程中全部包含某个注解的类能够怎么办? - 若是是用spring的话, 能够考虑 `ApplicationContext.getBeansWithAnnotation()` 获取工程中,全部包含某个注解的方法,除了上面的主动注册,有什么其余的方法?
###2. 推送事件
发布方,调用
EventBus.post(Object)
方法实现消息的推送
正式开始以前,咱们能够先预测一下,当发布方调用了这个方法以后,会执行那些action
subscribers
中获取出全部的监听者,以及对应的回调方法, 放在一个集合中上面是正向的操做流程,接着一些异常状况和边界也须要考虑下
带着上面的臆测,来实际看下EventBus
本身是怎么玩的
/** * 将事件推送给全部的监听者,无论监听者是否正常处理,都是正确返回 * Posts an event to all registered subscribers. This method will return * successfully after the event has been posted to all subscribers, and * regardless of any exceptions thrown by subscribers. * * 若是一个事件没有监听者,且该事件不是 DeadEvent, 则转为 DeadEvent并从新推送 * <p>If no subscribers have been subscribed for {@code event}'s class, and * {@code event} is not already a {@link DeadEvent}, it will be wrapped in a * DeadEvent and reposted. * * @param event event to post. */ public void post(Object event) { Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event); if (eventSubscribers.hasNext()) { dispatcher.dispatch(event, eventSubscribers); } else if (!(event instanceof DeadEvent)) { // the event had no subscribers and was not itself a DeadEvent post(new DeadEvent(this, event)); } }
上面的解释比较清楚, 基本上核心的推送就是 dispatcher.dispatch(event, eventSubscribers);
实际的使用的是 PerThreadQueuedDispatcher
推送代码以下,逻辑比较清晰,将Event
塞入队列, 而后将队列中的全部消息依次推送给全部的订阅者
/** * Per-thread queue of events to dispatch. */ private final ThreadLocal<Queue<Event>> queue = new ThreadLocal<Queue<Event>>() { @Override protected Queue<Event> initialValue() { return Queues.newArrayDeque(); } }; /** * Per-thread dispatch state, used to avoid reentrant event dispatching. */ private final ThreadLocal<Boolean> dispatching = new ThreadLocal<Boolean>() { @Override protected Boolean initialValue() { return false; } }; @Override void dispatch(Object event, Iterator<Subscriber> subscribers) { checkNotNull(event); checkNotNull(subscribers); Queue<Event> queueForThread = queue.get(); queueForThread.offer(new Event(event, subscribers)); if (!dispatching.get()) { dispatching.set(true); try { Event nextEvent; while ((nextEvent = queueForThread.poll()) != null) { while (nextEvent.subscribers.hasNext()) { nextEvent.subscribers.next().dispatchEvent(nextEvent.event); } } } finally { dispatching.remove(); queue.remove(); } } }
最终真正执行推送Event
的是这个方法 com.google.common.eventbus.Subscriber#dispatchEvent
/** * Dispatches {@code event} to this subscriber using the proper executor. */ final void dispatchEvent(final Object event) { executor.execute(new Runnable() { @Override public void run() { try { invokeSubscriberMethod(event); } catch (InvocationTargetException e) { bus.handleSubscriberException(e.getCause(), context(event)); } } }); } /** * Invokes the subscriber method. This method can be overridden to make the invocation * synchronized. */ @VisibleForTesting void invokeSubscriberMethod(Object event) throws InvocationTargetException { try { method.invoke(target, checkNotNull(event)); } catch (IllegalArgumentException e) { throw new Error("Method rejected target/argument: " + event, e); } catch (IllegalAccessException e) { throw new Error("Method became inaccessible: " + event, e); } catch (InvocationTargetException e) { if (e.getCause() instanceof Error) { throw (Error) e.getCause(); } throw e; } }
上面从源码的角度,对整个流程顺了一遍,下面的图对几个主要的类结构进行了抽取,并对上面的几个方法进行了简要的说明
图一, 将上面说明的几个类属性 + 方法进行了说明
图二, 对逻辑进行列举
1.根据class,获取全部超类集合 (EventBus的实际使用中,Event的超类集合都塞入了缓存,加快查询速度)
TypeToken.of(concreteClass).getTypes().rawTypes());
2.获取类中标有注解的方法
private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz, Class annotationClz) { Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes(); Map<MethodIdentifier, Method> identifiers = Maps.newHashMap(); for (Class<?> supertype : supertypes) { for (Method method : supertype.getDeclaredMethods()) { if (method.isAnnotationPresent(annotationClz) && !method.isSynthetic()) { // TODO(cgdecker): Should check for a generic parameter type and error out Class<?>[] parameterTypes = method.getParameterTypes(); MethodIdentifier ident = new MethodIdentifier(method); if (!identifiers.containsKey(ident)) { identifiers.put(ident, method); } } } } return ImmutableList.copyOf(identifiers.values()); } private static final class MethodIdentifier { private final String name; private final List<Class<?>> parameterTypes; MethodIdentifier(Method method) { this.name = method.getName(); this.parameterTypes = Arrays.asList(method.getParameterTypes()); } @Override public int hashCode() { return Objects.hashCode(name, parameterTypes); } @Override public boolean equals(@Nullable Object o) { if (o instanceof MethodIdentifier) { MethodIdentifier ident = (MethodIdentifier) o; return name.equals(ident.name) && parameterTypes.equals(ident.parameterTypes); } return false; } }