在Spring中使用事件机制

1. 事件机制

事件机制的底层设计模式是观察者模式,观察者设计模式定义了对象间的一种一对多的组合关系,以便一个对象的状态发生变化时,全部依赖于它的对象都获得通知并自动刷新,它可以将观察者和被观察者之间进行解耦。本文不具体介绍观察者模式,读者能够从其余文章进行查阅。java

使用事件机制可以应对不少场景,它可以对系统业务逻辑之间的解耦。例如Swing中抽象出不少事件例如鼠标点击、键盘键入等,它经过事件派发线程不断从事件队列中获取事件并调用事件监听器的事件处理方法来处理事件。spring

再举一个实际业务中会存在的场景,咱们有一个方法TradeService进行用户交易操做,此时有一个用户支付订单成功的方法paySuccess。当用户支付成功时,会有不少操做,好比向库存发送通知,以下述代码(咱们对其进行简化,仅传入orderId):设计模式

public class TradeServiceImpl implements TradeService {

	/**
	 * 支付成功相关操做
	 * @param orderId 订单号
	 */
	@Override
	public void paySuccess(Long orderId) {
		Preconditions.checkNotNull(orderId);
		
		// 1.修改订单状态为支付成功

		// 2.告知仓库准备发货

	}

}


复制代码

可是此时产品经理须要咱们在支付成功以后可以经过短信告诉用户能够进行抽奖,那咱们仍须要修改该类。每次添加新功能的时候都须要修改原有的类,难以维护,这违反了设计模式的单一职责原则开闭原则。咱们可使用事件机制,将支付操做和其余支付以后的相关操做进行分离,经过事件来解耦。下文将围绕该业务经过事件进行改造。 (注:实际上该应用场景由于一般是分布式场景因此咱们须要经过消息中间件例如Kafka进行异步处理,本文因为介绍Spring内置事件机制因此利用本地事件进行简化)bash

2. Spring原生事件驱动模型

Spring提供了一些内置的事件供开发者使用:app

事件名称 概述
ContextRefreshedEvent 该事件会在ApplicationContext被初始化或者更新时发布。也能够在调用ConfigurableApplicationContext 接口中的refresh()方法时被触发。
ContextStartedEvent 当容器调用ConfigurableApplicationContext的Start()方法开始/从新开始容器时触发该事件。
ContextStoppedEvent 当容器调用ConfigurableApplicationContext的Stop()方法中止容器时触发该事件。
ContextClosedEvent 当ApplicationContext被关闭时触发该事件。容器被关闭时,其管理的全部单例Bean都被销毁。
RequestHandledEvent 在Web应用中,当一个http请求(request)结束触发该事件。

2.1 同步方式

  • 建立事件

首先咱们先建立相关的事件,本场景下咱们须要围绕支付成功进行相关操做,因此咱们须要建立支付成功事件PaySuccessEvent,相关代码以下所示:异步

@Data
public class PaySuccessEvent extends ApplicationEvent {

	/**
	 * 订单ID
	 */
	Long orderId;

	public PaySuccessEvent(Object source, Long orderId) {
		super(source);
		this.orderId = orderId;
	}
}

复制代码
  • 发布事件

发布事件咱们能够经过ApplicationContext或者ApplicationEventPublisher进行发布,可是若是咱们只须要进行发布事件,咱们只须要建立一个类实现ApplicationEventPublisherAware接口,经过回调方法,使其可以得到ApplicationEventPublish,该接口的publishEvent方法可以对事件进行发布:async

public interface ApplicationEventPublisherAware extends Aware {

	/**
	 * Set the ApplicationEventPublisher that this object runs in.
	 * <p>Invoked after population of normal bean properties but before an init
	 * callback like InitializingBean's afterPropertiesSet or a custom init-method. * Invoked before ApplicationContextAware's setApplicationContext.
	 * @param applicationEventPublisher event publisher to be used by this object
	 */
	void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher);

}
复制代码

咱们经常使用的ApplicationContext也是继承自该接口:分布式

public interface ApplicationContext extends EnvironmentCapable, ListableBeanFactory, HierarchicalBeanFactory,
		MessageSource, ApplicationEventPublisher, ResourcePatternResolver
复制代码

若须要获取ApplicationContext咱们只须要实现ApplicationContextAware接口经过其回调方法便可设置上下文环境。ide

咱们建立一个事件发布的类:ui

public class EventPublisher implements ApplicationEventPublisherAware {

	public static ApplicationEventPublisher applicationEventPublisher;

	@Override
	public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
		EventPublisher.applicationEventPublisher = applicationEventPublisher;
	}

	public static void publishEvent(ApplicationEvent applicationEvent) {
		applicationEventPublisher.publishEvent(applicationEvent);
	}
}

复制代码

而后咱们对支付成功事件进行发布,以下所示:

public class TradeServiceImpl implements TradeService {

	/**
	 * 支付成功相关操做
	 * @param orderId 订单号
	 */
	@Override
	public void paySuccess(Long orderId) {
		Preconditions.checkNotNull(orderId);
		EventPublisher.publishEvent(new PaySuccessEvent(this, orderId));
	}

}
复制代码
  • 事件监听

咱们可使用EventListener注解或者实现ApplicationListener接口来对事件进行监听。

  • 1. 使用EventListener注解

@Component
public class PaySuccessEventListener {

	/**
	 * 修改订单状态
	 * @param paySuccessEvent 支付成功事件
	 */
	@EventListener
	public void modifyOrderStatus(PaySuccessEvent paySuccessEvent)
	{
		System.out.println("修改订单状态成功!orderId:" + paySuccessEvent.getOrderId());
	}

	/**
	 * 发送短信告知用户进行抽奖
	 * @param paySuccessEvent 支付成功事件
	 */
	@EventListener
	public void sendSMS(PaySuccessEvent paySuccessEvent)
	{
		System.out.println("发送短信成功!orderId:" + paySuccessEvent.getOrderId());
	}


}

复制代码
  • 2. 实现ApplicationListener接口

public class PaySuccessListener implements ApplicationListener<PaySuccessEvent> {

	@Override
	public void onApplicationEvent(PaySuccessEvent event) {
		System.out.println("告知仓库准备发货成功!" + event.getOrderId());
	}

}
复制代码

2.2 异步方式

Spring经过ApplicationEventMulticaster提供异步侦听事件的方式,可是注册 ApplicationEventMulticaster Bean 后全部的事件侦听处理都会变成的异步形式,若是须要针对特定的事件侦听采用异步方式的话:可使用@EnableAsync和@Async组合来实现。 咱们能够经过@EnableAsync注解开启异步方式,以后咱们在须要异步监听的方法上加上@Async注解,这样可以使得发布事件时,发布方能异步调用监听器,主方法不会阻塞。如下是EnableAsync的部分注释:

* <p>By default, Spring will be searching for an associated thread pool definition:
 * either a unique {@link org.springframework.core.task.TaskExecutor} bean in the context,
 * or an {@link java.util.concurrent.Executor} bean named "taskExecutor" otherwise. If
 * neither of the two is resolvable, a {@link org.springframework.core.task.SimpleAsyncTaskExecutor}
 * will be used to process async method invocations. Besides, annotated methods having a
 * {@code void} return type cannot transmit any exception back to the caller. By default,
 * such uncaught exceptions are only logged.
 
  * <p>To customize all this, implement {@link AsyncConfigurer} and provide:
 * <ul>
 * <li>your own {@link java.util.concurrent.Executor Executor} through the
 * {@link AsyncConfigurer#getAsyncExecutor getAsyncExecutor()} method, and</li>
 * <li>your own {@link org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler
 * AsyncUncaughtExceptionHandler} through the {@link AsyncConfigurer#getAsyncUncaughtExceptionHandler
 * getAsyncUncaughtExceptionHandler()}
 * method.</li>
 * </ul>
复制代码

简而言之就是Spring默认状况下会先搜索TaskExecutor或者名称为taskExecutor的Executor类型的bean,若都不存在那么Spring会用SimpleAsyncTaskExecutor去执行异步方法。此外咱们还能够经过实现AsyncConfigurer接口去自定义异步配置。

咱们新建一个异步配置类AsyncConfig,使其具有异步能力:

@Configuration
@EnableAsync
@Slf4j
public class AsyncConfig implements AsyncConfigurer {

	@Override
	@Bean(name = "taskExecutor")
	public Executor getAsyncExecutor() {
		ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
		taskExecutor.setCorePoolSize(5);
		taskExecutor.setMaxPoolSize(10);
		taskExecutor.setQueueCapacity(25);
		taskExecutor.initialize();
		return taskExecutor;
	}

	@Override
	public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
		return new MyAsyncExceptionHandler();
	}

	/**
	 * 自定义异常处理类
	 */
	class MyAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {

		//手动处理捕获的异常
		@Override
		public void handleUncaughtException(Throwable throwable, Method method, Object... obj) {
			System.out.println("------catched exception!------");
			log.info("Exception message - " + throwable.getMessage());
			log.info("Method name - " + method.getName());
			for (Object param : obj) {
				log.info("Parameter value - " + param);
			}
		}
	}
}
复制代码

在开启异步化以后,咱们只须要在监听方法上添加@Async注解就能够实现事件的异步调用。

相关文章
相关标签/搜索