注解,能够标注哪一个方法能够被注册和通知。它要求被注解的方法有且只有一个参数,而且该参数就是要注册监听的事件,例如:java
class EventBusChangeRecorder {
@Subscribe
public void recordCustomerChange(ChangeEvent e) {
recordChange(e.getChange());
}
}
复制代码
注册时,无须明确指定事件git
eventBus.register(new EventBusChangeRecorder());
复制代码
被@Subscribe注解的方法,能够被调用执行,至关于事件处理器github
private EventBus bus;
final Object target;
private final Method method;
private final Executor executor;
final void dispatchEvent(final Object event) {
this.executor.execute(new Runnable() {
public void run() {
try {
Subscriber.this.invokeSubscriberMethod(event);
} catch (InvocationTargetException var2) {
Subscriber.this.bus.handleSubscriberException(var2.getCause(), Subscriber.this.context(event));
}
}
});
}
复制代码
target,事件注册的实例编程
method,Java反射中的方法实例安全
dispatchEvent,对外暴露的调用(事件分发)方法并发
事件的注册表类,主要提供了注册方法,取消注册方法框架
void register(Object listener) {
Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
Class<?> eventType = entry.getKey();
Collection<Subscriber> eventMethodsInListener = entry.getValue();
//引入了CopyOnWriteArraySet解决集合的线程安全问题
CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
if (eventSubscribers == null) {
CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
}
eventSubscribers.addAll(eventMethodsInListener);
}
}
复制代码
void unregister(Object listener) {
Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
Class<?> eventType = entry.getKey();
Collection<Subscriber> listenerMethodsForType = entry.getValue();
CopyOnWriteArraySet<Subscriber> currentSubscribers = subscribers.get(eventType);
if (currentSubscribers == null || !currentSubscribers.removeAll(listenerMethodsForType)) {
throw new IllegalArgumentException();
}
}
}
复制代码
从代码中咱们能够看到,这两个方法都实现了线程安全,经过使用CopyOnWriteArraySet巧妙解决了注册/取消注册时的线程安全问题。ide
CopyOnWriteArraySet,在写入数据的时候,会建立一个新的 set,而且将原始数据 clone 到新的 set 中,在新的 set 中写入数据完成以后,再用新的 set 替换老的 set。这样就能保证在写入数据的时候,不影响数据的读取操做,以此来解决读写并发问题。post
后续有时间解析一下CopyOnWriteArraySet的源码this
事件总线的组合类,组合了事件分发器(后面要提的Dispatcher)、事件注册表等,统一对外提供注册、分发等功能
private final String identifier;
private final Executor executor;
private final SubscriberExceptionHandler exceptionHandler;
private final SubscriberRegistry subscribers = new SubscriberRegistry(this);
private final Dispatcher dispatcher;
public void register(Object object) {
subscribers.register(object);
}
public void unregister(Object object) {
subscribers.unregister(object);
}
public void post(Object event) {
Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
if (eventSubscribers.hasNext()) {
dispatcher.dispatch(event, eventSubscribers);
} else if (!(event instanceof DeadEvent)) {
// the event had no subscribers and was not itself a DeadEvent
post(new DeadEvent(this, event));
}
}
复制代码
总结一下,以上几个模块构成了EventBus的主体框架,基于这个框架解决了咱们上一章节提到的
线程安全问题:经过引入线程安全集合CopyOnWriteArraySet
显式注册问题:经过定义注解类Subscribe,标记能够被注册的方法,而且将要监听的Event做为方法的惟一参数,再利用Java反射的特性,实现了隐式注册
EventBus提供专门的事件分发器,而且为事件的分发提供了两种策略,一种是广度优先,一种是深度优先。
private static final class PerThreadQueuedDispatcher extends Dispatcher {
private final ThreadLocal<Queue<Event>> queue;
private final ThreadLocal<Boolean> dispatching;
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
Queue<Event> queueForThread = queue.get();
queueForThread.offer(new Event(event, subscribers));
if (!dispatching.get()) {
dispatching.set(true);
try {
Event nextEvent;
while ((nextEvent = queueForThread.poll()) != null) {
while (nextEvent.subscribers.hasNext()) {
nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
}
}
} finally {
dispatching.remove();
queue.remove();
}
}
}
}
复制代码
上面的分发器,在被同一个线程分发(同一个线程调用post)时,可以保证事件分发的有序性。同时也引入了Queue实现了广度优先,下面咱们看一下另一个深度优先的实现
private static final class ImmediateDispatcher extends Dispatcher {
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
while (subscribers.hasNext()) {
subscribers.next().dispatchEvent(event);
}
}
}
复制代码
上章节中咱们一样提到了调用时异常处理的问题,guava一样给处理解决办法。guava定义了一个接口
public interface SubscriberExceptionHandler {
/** Handles exceptions thrown by subscribers. */
void handleException(Throwable exception, SubscriberExceptionContext context);
}
复制代码
在调用出现异常时,回调这个接口(完整代码请参考Subscriber的dispatchEvent方法)
try {
Subscriber.this.invokeSubscriberMethod(event);
} catch (InvocationTargetException var2) {
//出现异常,调用自定义异常处理
Subscriber.this.bus.handleSubscriberException(var2.getCause(), Subscriber.this.context(event));
}
复制代码
同时在实例化EventBus时,可传入自定义异常处理
public EventBus(SubscriberExceptionHandler exceptionHandler) {
this("default", MoreExecutors.directExecutor(), Dispatcher.perThreadDispatchQueue(), exceptionHandler);
}
复制代码
以上,结合上章节提出的问题,对guava的EventBus作了分析,咱们能看到Google实现的Eventbus代码很优雅,程序也很健壮,他们在设计的时候会考虑到不少方面,这对咱们本身编程以及代码框架会有很多启发。
有关EventBus的整个系列都写完了,在写做的过程当中,我不断回看guava的源码,收获甚多。建议你们也去读一读guava源码,了解一下世界上顶级的Java开发者是如何写代码的。