guava eventbus源码解析

说在前面java

本文转自“天河聊技术”微信公众号spring

事件驱动模型设计是一种优雅的程序设计方式,实现有不少,原理都是发布与订阅,观察者设计模式实现,java自带的实现、spring ioc的事件驱动模型,还有guava的实现,今天介绍guava eventbus的源码实现,看过这篇文章你本身也能够实现实现一套了。数据库

 

guava event源码解析设计模式

先上一个demo实现,了解车的原理以前先上去感觉下缓存

/**
 * 事件
 * weifeng.jiang 2018-06-11 19:06
 */
public class HelloEvent {

    private String message;

    public HelloEvent(String message) {
        this.message = message;
    }

    public String getMessage() {
        return message;
    }
}
/**
 * 订阅者
 * weifeng.jiang 2018-06-11 19:11
 */
public class EventListener {

    @Subscribe
    public void listen(HelloEvent helloEvent){
        System.out.println(helloEvent.getMessage());
    }
}
/**
 * 客户端
 * weifeng.jiang 2018-06-11 19:12
 */
public class Main {

    public static void main(String[] args) {
//        建立事件总线
        EventBus eventBus = new EventBus("test");
//        建立订阅者
        EventListener listener = new EventListener();
//       注册订阅者
        eventBus.register(listener);
//        发布事件
        eventBus.post(new HelloEvent("asdasd"));
        eventBus.post(new HelloEvent("asdasdasdas"));
        eventBus.post(new HelloEvent("asdasdasdasd"));
    }
}

实现原理架构图服务器

怎么成为一个订阅者接受事件呢微信

接受事件的对象应有一个public方法,用@Subscribe注解标记这个方法,将接受事件对象传递给EventBus实例的register(Object)方法,参考图一和图三架构

 

怎么发布事件呢并发

只须要调用EventBus实例的post方法,参考图三异步

 

guava eventbus事件总线有两种,同步的实现EventBus,异步的实现AsyncEventBus,若是订阅者在接收事件后进行长时间的逻辑处理,好比和数据库交互,这时候就须要用异步事件了,若是是简单处理,同步实现就能够。

 

这里以EventBus事件总线同步实现为例进行源码解析。

 

成为订阅者的源码实现

和@Subscribe注解配合使用的还有一个@AllowConcurrentEvents注解,这个注解是能够容许事件并发的执行,看下建立订阅者对象的源码实现,以下

/** Creates a {@code Subscriber} for {@code method} on {@code listener}. 为监听器上的方法建立一个订阅服务器。*/
static Subscriber create(EventBus bus, Object listener, Method method) {
  return isDeclaredThreadSafe(method)
      ? new Subscriber(bus, listener, method)
      : new SynchronizedSubscriber(bus, listener, method);
}

能够容许并发事件,在这个类中

@VisibleForTesting
  static final class SynchronizedSubscriber extends Subscriber {

    private SynchronizedSubscriber(EventBus bus, Object target, Method method) {
      super(bus, target, method);
    }

    @Override
    void invokeSubscriberMethod(Object event) throws InvocationTargetException {
      synchronized (this) {
        super.invokeSubscriberMethod(event);
      }
    }
  }
}

执行事件的时候是同步实现。

 

事件总线订阅源码实现

com.google.common.eventbus.SubscriberRegistry#register

void register(Object listener) {
//    查找全部订阅者,维护了一个key是事件类型,value是定订阅这个事件类型的订阅者集合的一个map
    Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);

    for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
//      获取事件类型
      Class<?> eventType = entry.getKey();
//      获取这个事件类型的订阅者集合
      Collection<Subscriber> eventMethodsInListener = entry.getValue();

//      从缓存中按事件类型查找订阅者集合
      CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);

      if (eventSubscribers == null) {
//        从缓存中取不到,更新缓存
        CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
        eventSubscribers =
            MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
      }

      eventSubscribers.addAll(eventMethodsInListener);
    }
  }

事件和订阅事件的订阅者集合是在com.google.common.eventbus.SubscriberRegistry这里维护的

private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers =
    Maps.newConcurrentMap();

到这里,订阅者已经准备好了,准备接受事件了。

 

发布事件源码实现

com.google.common.eventbus.EventBus#post

public void post(Object event) {
//    获取事件的订阅者集合
    Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
    if (eventSubscribers.hasNext()) {
//      转发事件
      dispatcher.dispatch(event, eventSubscribers);
//      若是不是死亡事件,从新包装成死亡事件从新发布
    } else if (!(event instanceof DeadEvent)) {
      // the event had no subscribers and was not itself a DeadEvent
      post(new DeadEvent(this, event));
    }
  }
Iterator<Subscriber> getSubscribers(Object event) {
//    获取事件类型类的超类集合
    ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass());

    List<Iterator<Subscriber>> subscriberIterators =
        Lists.newArrayListWithCapacity(eventTypes.size());

    for (Class<?> eventType : eventTypes) {
//      获取事件类型的订阅者集合
      CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
      if (eventSubscribers != null) {
        // eager no-copy snapshot
        subscriberIterators.add(eventSubscribers.iterator());
      }
    }

    return Iterators.concat(subscriberIterators.iterator());
  }

事件转发器有三种实现

第一种是当即转发,实时性比较高,其余两种都是队列实现。

执行订阅方法都是异步实现

final void dispatchEvent(final Object event) {
  executor.execute(
      new Runnable() {
        @Override
        public void run() {
          try {
            invokeSubscriberMethod(event);
          } catch (InvocationTargetException e) {
            bus.handleSubscriberException(e.getCause(), context(event));
          }
        }
      });
}

 

说到最后

本次源码解析到这里,仅供参考。

相关文章
相关标签/搜索