事件驱动模型也就是咱们常说的观察者,或者发布-订阅模型;理解它的几个关键点:java
好比说订单状态变化的时候,可以通知到邮件服务,短信服务,积分变化等等; 若是你是个新手,想象一下你去实现这个业务的代码怎么去实现?确定是一个OrderService里面引入积分Service,短信Service,邮件Service,还有不少不少Service,可能还要调用第三方接口。是否是发现问题所在了,Service耦合严重,又是还会出现循环引用的问题,代码超长,以致于不方便维护。spring
从如上例子能够看出,应该使用一个观察者来解耦这些Service之间的依赖关系,如图:
设计模式
图中增长了一个Listener来解耦OrderService和其余Service,即注册成功后,只须要通知相关的监听器,不须要关心它们如何处理,处理起来很是容易。这就是一个典型的事件处理模型-观察者模式,解耦目标对象和它的依赖对象,目标只须要通知它的依赖对象,具体怎么处理,依赖对象本身决定。好比是异步仍是同步,延迟仍是非延迟等。
其实上边其实也使用了DIP(依赖倒置原则),依赖于抽象,而不是具体。
仍是就是使用了IOC思想,即之前主动去建立它依赖的Service,如今只是被动等待别人注册进来。
主要目的是:松散耦合对象间的一对多的依赖关系。并发
设计模式里面的观察者模式app
JDK观察者模式异步
JavaBean事件驱动ide
spring事件驱动post
......测试
JavaBean规范提供了一种监听属性变化的事件驱动模型,提供操做JavaBean属性的类PropertyChangeSupport,PropertyEditorSupport以及PropertyChangeListener支持,PropertyEditorSupport就是目标,而PropertyChangeListener就是监听器。ui
具体表明者是ApplicationEvent,其下有一个ApplicationContextEvent,表示Spring容器事件,且其又有以下实现:
注:org.springframework.context.support.AbstractApplicationContext抽象类实现了LifeCycle的start和stop回调并发布ContextStartedEvent和ContextStoppedEvent事件。
事件发布具体表明者是:ApplicationEventPublisher及ApplicationEventMulticaster。
一、ApplicationContext接口继承了ApplicationEventPublisher,并在AbstractApplicationContext实现了具体代码,实际执行是委托给ApplicationEventMulticaster(能够认为是多播),咱们经常使用的ApplicationContext都继承自AbstractApplicationContext,如ClassPathXmlApplicationContext、XmlWebApplicationContext等,因此自动拥有这个功能。
二、ApplicationContext自动到本地容器里找一个名字为”applicationEventMulticaster“的ApplicationEventMulticaster实现,若是没有本身new一个SimpleApplicationEventMulticaster,
监听器具体表明者是:ApplicationListener。其继承自JDK的EventListener,JDK要求全部监听器将继承它。
1. 它只提供了onApplicationEvent方法,咱们须要在该方法实现内部判断事件类型来处理,或者指定某一个事件类型(泛型,实现ApplicationListener)。
2. 它没有提供按顺序触发监听器的语义,因此Spring提供了另外一个接口SmartApplicationListener,该接口支持判断的事件类型、目标类型,及执行顺序。
因为在上一篇文章:使用事件驱动进行代码解耦-Spring篇,已经介绍了基于spring事件方式进行代码解耦的示例,这里主要介绍基于google guava来进行示例展现。
在guava中,事件处理器被称为事件总线EventBus,具体表明有EventBus类和AsyncEventBus类(异步);事件监听者被称为订阅者Subscriber。
1. 注册订阅者:新建一个事件总线EventBus/AsyncEventBus,就能够向其中注册订阅者,订阅者其实就是一个被标注了注解@com.google.common.eventbus.Subscribe的方法,向事件总线EventBus/AsyncEventBus注册订阅者的代码逻辑若下:
void register(Object listener) { Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener); for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) { Class<?> eventType = entry.getKey(); Collection<Subscriber> eventMethodsInListener = entry.getValue(); CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType); if (eventSubscribers == null) { CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>(); eventSubscribers = MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet); } eventSubscribers.addAll(eventMethodsInListener); } } private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) { Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create(); Class<?> clazz = listener.getClass(); for (Method method : getAnnotatedMethods(clazz)) { Class<?>[] parameterTypes = method.getParameterTypes(); Class<?> eventType = parameterTypes[0]; methodsInListener.put(eventType, Subscriber.create(bus, listener, method)); } return methodsInListener; } private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) { Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes(); Map<SubscriberRegistry.MethodIdentifier, Method> identifiers = Maps.newHashMap(); for (Class<?> supertype : supertypes) { for (Method method : supertype.getDeclaredMethods()) { if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) { // TODO(cgdecker): Should check for a generic parameter type and error out Class<?>[] parameterTypes = method.getParameterTypes(); checkArgument( parameterTypes.length == 1, "Method %s has @Subscribe annotation but has %s parameters." + "Subscriber methods must have exactly 1 parameter.", method, parameterTypes.length); SubscriberRegistry.MethodIdentifier ident = new SubscriberRegistry.MethodIdentifier(method); if (!identifiers.containsKey(ident)) { identifiers.put(ident, method); } } } } return ImmutableList.copyOf(identifiers.values()); }
从“if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic())”这一句代码能够明显的知道,一个bean向事件总线EventBus/AsyncEventBus进行注册,注册的并非bean自己,而是该bean中被全部标注了注解@Subscribe的方法,一个被标注了注解@Subscribe的方法就是一个订阅者。当有事件发布到事件总线中,事件总线会遍历全部的订阅者进行事件处理。
2. 向事件总线EventBus和AsyncEventBus发布事件,直接调用事件总线的post方法便可
public void post(Object event) { Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event); if (eventSubscribers.hasNext()) { dispatcher.dispatch(event, eventSubscribers); } else if (!(event instanceof DeadEvent)) { // the event had no subscribers and was not itself a DeadEvent post(new DeadEvent(this, event)); } }
本示例模拟订单生成与更新时会发送短信与站内信简单业务场景demo
1. 自定义一些接口以及POJO
/** * 业务事件发布者 * @author dongsilin * @version 1.0 * @date 2019/1/24 */ public interface BizEventPublisher { /** * 发布同步事件 * @param eventData */ void publishEvent(Object eventData); /** * 发布异步事件 * @param eventData */ void publishEventAsync(Object eventData); } /** * 业务事件发布者 * @author dongsilin * @version 1.0 * @date 2019/1/24 */ public interface BizEventPublisher { /** * 发布同步事件 * @param eventData */ void publishEvent(Object eventData); /** * 发布异步事件 * @param eventData */ void publishEventAsync(Object eventData); } /** * 业务事件类型 * @author dongsilin * @version 1.0 * @date 2019/1/24 */ public enum BizEventType { ORDER_CREATE("订单-建立"), ORDER_UPDATE("订单-修改"), ; String describe; BizEventType(String describe) { this.describe = describe; } } /** * @author dongsilin * @version 1.0 * @date 2019/1/24 * 测试订单类 */ @Data public class Order { private long orderId; private long userId; public Order(long orderId, long userId) { this.setOrderId(orderId); this.setUserId(userId); } }
2. 自定义业务数据,包含事件通用数据属性
/** * @author dongsilin * @version 1.0 * @date 2019/1/24 * 业务事件数据 */ @Data @AllArgsConstructor public class BizEventData<S> implements EventExecutor<S> { private BizEventType eventType; private S data; @Override public void executeEvent(Consumer<S> executor) { executor.accept(data); } public static<S> BizEventData of(BizEventType eventType, S data) { return new BizEventData(eventType, data); } }
3. 业务事件发布配置管理
/** * @author dongsilin * @version 1.0 * @date 2019/1/24 * 业务事件发布配置管理 */ @Slf4j @Component public class BizEventPublisherConfiguration implements BizEventPublisher { /** 同步事件总线 */ private EventBus eventBus = new EventBus(); /** 异步事件总线 */ private AsyncEventBus eventBusAsync = new AsyncEventBus( new ThreadPoolExecutor( Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors() * 2, 5L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ThreadFactoryBuilder().setNameFormat("guava-event-executor-pool-%d").build() ), (Throwable e, SubscriberExceptionContext exceptionContext) -> { log.error("", e); } ); @Override public void publishEvent(Object eventData) { eventBus.post(eventData); } @Override public void publishEventAsync(Object eventData) { eventBusAsync.post(eventData); } /** * 构造器,注册监听者 * @param beanFactory */ public BizEventPublisherConfiguration (ListableBeanFactory beanFactory) { // 获取全部带有 @BizEventListener 的 bean,将他们注册为监听者 beanFactory.getBeansWithAnnotation(BizEventListener.class) .forEach((beanName, listener) -> { eventBusAsync.register(listener); eventBus.register(listener); }); } }
4. 业务事件监听配置管理
/** * @author dongsilin * @version 1.0 * @date 2019/1/24 * 业务事件监听配置管理 */ @Slf4j @Component @BizEventListener public class BizEventListenerConfiguration { @Autowired private TestMsgService msgService; @Subscribe public void executeEvent(BizEventData bizEventData) { switch (bizEventData.getEventType()) { // 订单建立,发送短信,....... case ORDER_CREATE: bizEventData.executeEvent((data) -> { msgService.sendPhoneMsg((Order) data); });break; // 订单修改,站内信提醒,....... case ORDER_UPDATE: bizEventData.executeEvent((data) -> { msgService.sendWebMsg((Order) data); });break; default: bizEventData.executeEvent((data) -> { log.info("executeEvent bizEventData = {}", data); }); } } }
5. 定义两个测试service,TestOrderService 和 TestMsgService
/** * @author dongsilin * @version 1.0 * @date 2019/1/24 */ @Slf4j @Service public class TestOrderService { @Autowired private BizEventPublisher bizEventPublisher; public void create(Order order) { ...... log.info("TestOrderService create order = {}", order); bizEventPublisher.publishEvent(BizEvent.of(BizEventType.ORDER_CREATE, order)); } public void update(Order order) { ...... log.info("TestOrderService update order = {}", order); bizEventPublisher.publishEventAsync(BizEvent.of(BizEventType.ORDER_UPDATE, order)); } } /** * @author dongsilin * @version 1.0 * @date 2019/1/24 */ @Slf4j @Service public class TestMsgService { public void sendPhoneMsg(Order order) { ...... log.info("TestMsgService sendPhoneMsg order = {}", order); } public void sendWebMsg(Order order) { ...... log.info("TestMsgService sendWebMsg order = {}", order); } }
此处TestOrderService 中并无强制依赖注入TestMsgService,而是经过发布事件的方式将事件数据发布到事件管理中心,由事件管理者来统一管理事件处理方式,解决了各个业务service严重耦合的场景,实现软件开发中的“高内聚-低耦合”原则。
经过如上,大致了解了google guava的事件机制,可使用该机制很是简单的完成事件流程。