说在前面java
本文转自“天河聊技术”微信公众号spring
事件驱动模型设计是一种优雅的程序设计方式,实现有不少,原理都是发布与订阅,观察者设计模式实现,java自带的实现、spring ioc的事件驱动模型,还有guava的实现,今天介绍guava eventbus的源码实现,看过这篇文章你本身也能够实现实现一套了。数据库
guava event源码解析设计模式
先上一个demo实现,了解车的原理以前先上去感觉下缓存
/** * 事件 * weifeng.jiang 2018-06-11 19:06 */ public class HelloEvent { private String message; public HelloEvent(String message) { this.message = message; } public String getMessage() { return message; } }
/** * 订阅者 * weifeng.jiang 2018-06-11 19:11 */ public class EventListener { @Subscribe public void listen(HelloEvent helloEvent){ System.out.println(helloEvent.getMessage()); } }
/** * 客户端 * weifeng.jiang 2018-06-11 19:12 */ public class Main { public static void main(String[] args) { // 建立事件总线 EventBus eventBus = new EventBus("test"); // 建立订阅者 EventListener listener = new EventListener(); // 注册订阅者 eventBus.register(listener); // 发布事件 eventBus.post(new HelloEvent("asdasd")); eventBus.post(new HelloEvent("asdasdasdas")); eventBus.post(new HelloEvent("asdasdasdasd")); } }
实现原理架构图服务器
怎么成为一个订阅者接受事件呢微信
接受事件的对象应有一个public方法,用@Subscribe注解标记这个方法,将接受事件对象传递给EventBus实例的register(Object)方法,参考图一和图三架构
怎么发布事件呢并发
只须要调用EventBus实例的post方法,参考图三异步
guava eventbus事件总线有两种,同步的实现EventBus,异步的实现AsyncEventBus,若是订阅者在接收事件后进行长时间的逻辑处理,好比和数据库交互,这时候就须要用异步事件了,若是是简单处理,同步实现就能够。
这里以EventBus事件总线同步实现为例进行源码解析。
成为订阅者的源码实现
和@Subscribe注解配合使用的还有一个@AllowConcurrentEvents注解,这个注解是能够容许事件并发的执行,看下建立订阅者对象的源码实现,以下
/** Creates a {@code Subscriber} for {@code method} on {@code listener}. 为监听器上的方法建立一个订阅服务器。*/ static Subscriber create(EventBus bus, Object listener, Method method) { return isDeclaredThreadSafe(method) ? new Subscriber(bus, listener, method) : new SynchronizedSubscriber(bus, listener, method); }
能够容许并发事件,在这个类中
@VisibleForTesting static final class SynchronizedSubscriber extends Subscriber { private SynchronizedSubscriber(EventBus bus, Object target, Method method) { super(bus, target, method); } @Override void invokeSubscriberMethod(Object event) throws InvocationTargetException { synchronized (this) { super.invokeSubscriberMethod(event); } } } }
执行事件的时候是同步实现。
事件总线订阅源码实现
com.google.common.eventbus.SubscriberRegistry#register
void register(Object listener) { // 查找全部订阅者,维护了一个key是事件类型,value是定订阅这个事件类型的订阅者集合的一个map Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener); for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) { // 获取事件类型 Class<?> eventType = entry.getKey(); // 获取这个事件类型的订阅者集合 Collection<Subscriber> eventMethodsInListener = entry.getValue(); // 从缓存中按事件类型查找订阅者集合 CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType); if (eventSubscribers == null) { // 从缓存中取不到,更新缓存 CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>(); eventSubscribers = MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet); } eventSubscribers.addAll(eventMethodsInListener); } }
事件和订阅事件的订阅者集合是在com.google.common.eventbus.SubscriberRegistry这里维护的
private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers = Maps.newConcurrentMap();
到这里,订阅者已经准备好了,准备接受事件了。
发布事件源码实现
com.google.common.eventbus.EventBus#post
public void post(Object event) { // 获取事件的订阅者集合 Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event); if (eventSubscribers.hasNext()) { // 转发事件 dispatcher.dispatch(event, eventSubscribers); // 若是不是死亡事件,从新包装成死亡事件从新发布 } else if (!(event instanceof DeadEvent)) { // the event had no subscribers and was not itself a DeadEvent post(new DeadEvent(this, event)); } }
Iterator<Subscriber> getSubscribers(Object event) { // 获取事件类型类的超类集合 ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass()); List<Iterator<Subscriber>> subscriberIterators = Lists.newArrayListWithCapacity(eventTypes.size()); for (Class<?> eventType : eventTypes) { // 获取事件类型的订阅者集合 CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType); if (eventSubscribers != null) { // eager no-copy snapshot subscriberIterators.add(eventSubscribers.iterator()); } } return Iterators.concat(subscriberIterators.iterator()); }
事件转发器有三种实现
第一种是当即转发,实时性比较高,其余两种都是队列实现。
执行订阅方法都是异步实现
final void dispatchEvent(final Object event) { executor.execute( new Runnable() { @Override public void run() { try { invokeSubscriberMethod(event); } catch (InvocationTargetException e) { bus.handleSubscriberException(e.getCause(), context(event)); } } }); }
说到最后
本次源码解析到这里,仅供参考。