最近看Elastic-Job源码,看到它里面实现的任务运行轨迹的持久化,使用的是Guava的AsyncEventBus,一个内存级别的异步事件总线服务,实现了简单的生产-消费者模式,从而在不影响任务执行效率的基础上,将任务执行和任务轨迹记录解耦,大大提升了EJ的性能。java
EventBus的使用方法不难,具体能够参考EJ里面几个相关的类:JobEventListener、JobEventBus和LiteJobFacade。主要的流程以下:缓存
@Override public void postJobExecutionEvent(final JobExecutionEvent jobExecutionEvent) { jobEventBus.post(jobExecutionEvent); } @Override public void postJobStatusTraceEvent(final String taskId, final State state, final String message) { TaskContext taskContext = TaskContext.from(taskId); jobEventBus.post(new JobStatusTraceEvent(taskContext.getMetaInfo().getJobName(), taskContext.getId(), taskContext.getSlaveId(), Source.LITE_EXECUTOR, taskContext.getType(), taskContext.getMetaInfo().getShardingItems().toString(), state, message)); if (!Strings.isNullOrEmpty(message)) { log.trace(message); } }
言归正传,咱们来看看EventBus究竟是如何实现观察者模式的。他的主要实现类都在com.google.common.eventbus这个包下面。安全
咱们首先来看一下里面比较重要的几个类,同时理解一些概念。多线程
这个类有几个属性:并发
private final String identifier;//惟一标识,默认为default private final Executor executor;//多线程处理器,默认MoreExecutors.directExecutor() private final SubscriberExceptionHandler exceptionHandler;//异常处理器 private final SubscriberRegistry subscribers = new SubscriberRegistry(this);//订阅注册表 private final Dispatcher dispatcher;//消息分发器,默认为Dispatcher.perThreadDispatchQueue(),单线程消息分发队列
其中,identifier表示,同一个应用中,能够根据identifier来区分不一样的事件总线,只不过默认为default而已。异步
EventBus主要定义了几个方法:async
public void register(Object object) { subscribers.register(object); }
注册的是本身定义的监听器,也就是listener。ide
public void unregister(Object object) { subscribers.unregister(object); }
相似于注册。源码分析
public void post(Object event) { Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event); if (eventSubscribers.hasNext()) { dispatcher.dispatch(event, eventSubscribers); } else if (!(event instanceof DeadEvent)) { // the event had no subscribers and was not itself a DeadEvent post(new DeadEvent(this, event)); } }
这块主要是根据event事件类型,来获取事件的订阅者,而后进行事件消息的分发。固然,若是没有订阅者,也就是event的类型是DeadEvent,也会进行对应的处理。post
继承自EventBus,主要区别在于分发器,使用的是Dispatcher.legacyAsync()。这个后续我们再分析。
乍看这个类,就是订阅者,其实咱们看源码就能理解,当一个订阅类的多个方法用@Subscribe注解时,每一个被注解的方法对应的是一个订阅者。
这个类只是package内可见,没有定义为public,能够经过静态方法create来建立它。
static Subscriber create(EventBus bus, Object listener, Method method) { return isDeclaredThreadSafe(method) ? new Subscriber(bus, listener, method) : new SynchronizedSubscriber(bus, listener, method); }
这里传入的method就是使用了@Subscribe注解的方法,这块会先判断这个方法是否线程安全,便是否使用@AllowConcurrentEvent来进行注解,来建立不一样的Subscriber。惟一的差异是SynchronizedSubscriber中一个方法使用了synchronized来修饰。
final void dispatchEvent(final Object event) { executor.execute( new Runnable() { @Override public void run() { try { invokeSubscriberMethod(event); } catch (InvocationTargetException e) { bus.handleSubscriberException(e.getCause(), context(event)); } } }); }
调用多线程来处理event。
@VisibleForTesting void invokeSubscriberMethod(Object event) throws InvocationTargetException { try { method.invoke(target, checkNotNull(event)); } catch (IllegalArgumentException e) { throw new Error("Method rejected target/argument: " + event, e); } catch (IllegalAccessException e) { throw new Error("Method became inaccessible: " + event, e); } catch (InvocationTargetException e) { if (e.getCause() instanceof Error) { throw (Error) e.getCause(); } throw e; } }
调用订阅者的方法。
咱们以前在讲到EventBus时,里面有两个方法register和unregister,调用的就是这个类的方法。这个类的做用也讲到,是存储event和对应的订阅者的关系的。咱们来看一下这个类的设计。
private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers = Maps.newConcurrentMap(); @Weak private final EventBus bus;
这个类有两个属性。
注册监听器。
void register(Object listener) { Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener); for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) { Class<?> eventType = entry.getKey(); Collection<Subscriber> eventMethodsInListener = entry.getValue(); CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType); if (eventSubscribers == null) { CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>(); eventSubscribers = MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet); } eventSubscribers.addAll(eventMethodsInListener); } }
主要的逻辑是:
实现与register相似,先根据listener找到subscriber,找到须要监听的方法,而后根据事件类型去移除subscriber。
获取监听器中全部的监听方法。
private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) { Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create(); Class<?> clazz = listener.getClass(); for (Method method : getAnnotatedMethods(clazz)) { Class<?>[] parameterTypes = method.getParameterTypes(); Class<?> eventType = parameterTypes[0]; methodsInListener.put(eventType, Subscriber.create(bus, listener, method)); } return methodsInListener; }
findAllSubscribers用于查找事件类型以及事件处理器的对应关系。查找注解须要涉及到反射,经过反射来获取标注在方法上的注解。由于Guava针对EventBus的注册采起的是“隐式契约”而非接口这种“显式契约”。而类与接口是存在继承关系的,全部颇有可能某个订阅者其父类(或者父类实现的某个接口)也订阅了某个事件。所以这里的查找须要顺着继承链向上查找父类的方法是否也被注解标注。
获取event的订阅者。
Iterator<Subscriber> getSubscribers(Object event) { ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass()); List<Iterator<Subscriber>> subscriberIterators = Lists.newArrayListWithCapacity(eventTypes.size()); for (Class<?> eventType : eventTypes) { CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType); if (eventSubscribers != null) { // eager no-copy snapshot subscriberIterators.add(eventSubscribers.iterator()); } } return Iterators.concat(subscriberIterators.iterator()); }
分发器,用于将event分发给subscriber。它内部实现了三种不一样类型的分发器,用于不一样的状况下事件的顺序性。它的核心方法是:
abstract void dispatch(Object event, Iterator<Subscriber> subscribers);
它的三种实现:
EventBus默认使用的分发器。它的实现是经过ThreadLocal来实现一个事件队列,每一个线程包含一个这样的内部队列。
它的分发代码以下:
void dispatch(Object event, Iterator<Subscriber> subscribers) { checkNotNull(event); checkNotNull(subscribers); Queue<Event> queueForThread = queue.get(); queueForThread.offer(new Event(event, subscribers)); if (!dispatching.get()) { dispatching.set(true); try { Event nextEvent; while ((nextEvent = queueForThread.poll()) != null) { while (nextEvent.subscribers.hasNext()) { nextEvent.subscribers.next().dispatchEvent(nextEvent.event); } } } finally { dispatching.remove(); queue.remove(); } } }
嵌套两层循环,第一层事件不为空,第二层该事件下的订阅者不为空,则分发事件下去。
AsyncEventBus使用的分发器。它在内部经过一个ConcurrentLinkedQueue
void dispatch(Object event, Iterator<Subscriber> subscribers) { checkNotNull(event); while (subscribers.hasNext()) { queue.add(new EventWithSubscriber(event, subscribers.next())); } EventWithSubscriber e; while ((e = queue.poll()) != null) { e.subscriber.dispatchEvent(e.event); } }
是一前一后两个循环。前面一个是遍历事件订阅处理器,并构建一个事件实体对象存入队列。后一个循环是遍历该事件实体对象队列,取出事件实体对象中的事件进行分发。
同步分发器。
void dispatch(Object event, Iterator<Subscriber> subscribers) { checkNotNull(event); while (subscribers.hasNext()) { subscribers.next().dispatchEvent(event); } }
Elastic-Job使用的EventBus,能够说很好的对任务的运行和轨迹记录进行了解耦,借鉴了Guava的思想,将代码优雅发挥到了新的境界。固然,Guava对EventBus的设计思想是咱们须要进行学习和使用的。