何为事件监听者模式 ?java
第一就是为啥我强调事件二字 ,由于他是目标 . 在咱们开发中绝对见到过一堆后缀是
Listener
的类, 这个就是监听者模式, 监听者模式是一种CS开发架构
,很好的作了一种设计的解耦,监听者注册到一个邮局中,订阅某种事件(提早说好了), 邮局会按需求发布消息, 监听者会及时收到消息来处理 . 其中整个Java开发环境中 , JDK已经帮咱们定义好了接口 , Spring就是基于JDK接口下实现的, Guava则是另外一种实现方式, 各有千秋 , 你们看看吧 , 究竟是回调好仍是阻塞, 仍是Guava这种方式呢, 他和观察者模式有何区别呢 ? 咱们有机会在讲spring
事件对象 , 他须要一个事件源 , 用构造函数传递的设计模式
public class EventObject implements java.io.Serializable {
protected transient Object source;
public EventObject(Object source) {
if (source == null)
throw new IllegalArgumentException("null source");
this.source = source;
}
........... 其余省略
}
复制代码
事件监听者,他是负责监听事件的 , JAVA提供的是一个空接口, 让咱们根绝需求写安全
public interface EventListener {
}
复制代码
咱们发现 java 提供的只提供了一个事件对象 ,和一个事件监听器 ,因此须要咱们遵照这个规范去开发多线程
通常状况下 都会设置成一个 Object 类型的 , 不须要咱们去设计一个,为了体现设计模式的角色,咱们就设计了一个架构
@ToString
@Setter
@Getter
public class EventSource {
private String name;
private String info;
}
复制代码
这里咱们继承了 EventObject , 只是简单的实现了一下 , 并无作过多的包装app
public class CoreEventObject extends EventObject {
public CoreEventObject(EventSource source) {
super(source);
}
}
复制代码
这里咱们真正的监听者 , 通常状况下都须要设计成一个 函数式接口 , 我这个是和Spring框架学习的 , 由于函数式接口才能体现回调 ,框架
@FunctionalInterface
public interface CoreEventListener<E extends CoreEventObject> extends EventListener {
void onEventObject(E event);
}
复制代码
事件发布者 ,由于没有发布的事件对象, 哪来的监听异步
public class EventPublisher<E extends CoreEventObject> {
private CoreEventListener<E> listener;
public EventPublisher(CoreEventListener<E> listener) {
this.listener = listener;
}
public void publish(E object){
System.out.println("发布事件 : " + object);
// 传给 CoreEventListener
listener.onEventObject(object);
}
}
复制代码
public class TestDemo {
public static void main(String[] args) {
// 1. 建立一个事件发布者
EventPublisher<CoreEventObject> publisher = new EventPublisher<>(new CoreEventListener<CoreEventObject>() {
@Override
public void onEventObject(CoreEventObject event) {
System.out.println("接收到事件源 : " + event.getSource() + " , 当前线程 : " + Thread.currentThread().getName());
}
});
// 2. 发布一个事件对象
publisher.publish(getCoreEventObject());
}
private static CoreEventObject getCoreEventObject(){
..... 此处省略
return eventObject;
}
}
复制代码
输出结果 :ide
发布事件 : com.example.listener_design_pattern.CoreEventObject[source=EventSource(name=事件源, info=Sat Nov 09 14:34:50 CST 2019)]
接收到事件源 : EventSource(name=事件源, info=Sat Nov 09 14:34:50 CST 2019) , 当前线程 : main
复制代码
咱们发现咱们成功的接收到了事件对象 和 事件源 , 这个就是钩子函数的魅力 . 其实你只是作了一个事件发布你无意观察其余的东西 , 只须要一个监听者就能够作到监听了 , 这样你的事件发布 和 监听 彻底就解耦了 .其实底层就是一个地址引用 .
不少场景下,咱们的发布事件和监听事件彻底在两个线程中,那么咱们如何拿到事件对象呢 ?
若是咱们简单使用一下 , 会这么写 ?
public class TestEventListener implements CoreEventListener<CoreEventObject> {
private CoreEventObject object;
@Override
public void onEventObject(CoreEventObject object) {
// 赋值给成员变量
this.object = object;
}
// 获取成员变量
public CoreEventObject getObject() {
return object;
}
}
复制代码
测试一下 :
public class TestDemo {
public static void main(String[] args) {
TestEventListener listener = new TestEventListener();
CoreEventObject object = listener.getObject();
// 先去拿 ,后去发布
System.out.println(object.getSource());
EventPublisher<CoreEventObject> publisher = new EventPublisher<>(listener);
publisher.publish(getCoreEventObject());
}
private static CoreEventObject getCoreEventObject(){
....
return eventObject;
}
}
复制代码
输出结果
Exception in thread "main" java.lang.NullPointerException
at com.example.listener_design_pattern.TestDemo.main(TestDemo.java:28)
复制代码
有些人就会说 , 你这不对哇 ,你固然拿不到了 ,由于人家还没发布了 ,可是在多线程 ,在解耦的状况下 ,你哪知道对面什么时候发布结束了 , 你再去拿呢 ? 那就须要java的多线程知识了 ,Future 给咱们带来了提醒 , 就是阻塞的思想 , 只有监听者真正的收到对象 , 咱们才能去拿 .
了解过我前面提到的那一节 FutureTask
是如何实现的 ,我以为问题就迎刃而解了 .
public class TestEventListener implements CoreEventListener<CoreEventObject> {
private CoreEventObject object;
/** * 当 X = 0 ,表明 obj尚未初始化了 * 当 x = 1 , 表明 obj 以及初始化了 , 已经接收到了 */
private static volatile int x = 0;
@Override
public void onEventObject(CoreEventObject object) {
this.object = object;
// 收到改为 1
x = 1;
}
public CoreEventObject getObject() {
while (true) {
if (x == 1) {
break;
}
}
// 拿到对象,再设置为1
x = 0;
return object;
}
}
复制代码
因为这个解决方案,会使得执行getObject()
的线程一直的阻塞下去,就是死循环下去,咱们必须一个线程去执行这个方法 ,
public class TestDemo {
public static void main(String[] args) {
TestEventListener listener = new TestEventListener();
// 新建一个线程去接收
Thread thread = new Thread(() -> {
System.out.println("我开始接收对象 : " + System.currentTimeMillis());
CoreEventObject object = listener.getObject();
System.out.println("成功接收对象 : "+object.getSource());
});
thread.start();
// 新建一个线程去发布
EventPublisher<CoreEventObject> publisher = new EventPublisher<>(listener);
new Thread(()->{
publisher.publish(getCoreEventObject());
}).start();
}
private static CoreEventObject getCoreEventObject(){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
EventSource source = new EventSource();
source.setName("事件源");
source.setInfo("" + System.currentTimeMillis());
return new CoreEventObject(source);
}
}
复制代码
输出结果 :
我开始接收对象 : 1573282924555
发布事件 : com.example.listener_design_pattern.CoreEventObject[source=EventSource(name=事件源, info=1573282925590)]
成功接收对象 : EventSource(name=事件源, info=1573282925590)
复制代码
咱们咱们接收的时候是在1573282924555的时间搓 , 而真正拿到的对象确实在1573282925590发布的 , 这个就彻底在俩时间轴上,因此咱们成功的解决了问题 .
Class to be extended by all application events. Abstract as it doesn't make sense for generic events to be published directly.
此类被全部的
application events
所继承 。抽象的缘由是由于直接发布这个ApplicationEvent
是没有意义的。
Interface to be implemented by application event listeners. Based on the standard java.util.EventListener interface for the Observer design pattern.
这个接口被全部的
application event listeners.
所实现 , 基于Java的java.util.EventListener
接口规范
咱们有一个需求就是 ,咱们有一个服务会从远程不断的去拉去配置信息 ,一旦有改变就会发布配置信息 .
@ToString
@Setter
@Getter
public class Config {
private String namespace;
private Map<String, Object> info;
}
复制代码
// 这个注解,咱们是根据Spring源码看到的 , 因此一致性,我就加了
@SuppressWarnings("serial")
public class ConfigEvent extends ApplicationEvent {
public ConfigEvent(Config source) {
super(source);
}
}
复制代码
@Component
public class ConfigEventListener implements ApplicationListener<ConfigEvent> , Ordered, InitializingBean {
@Override
public void onApplicationEvent(ConfigEvent event) {
System.out.println("接收到更新信息 : " + event.getSource()+" , 当前线程 : "+Thread.currentThread().getName());
}
// 保证执行顺序 , 多个 ConfigEventListener就须要实现这个接口
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE;
}
// 初始化之后要作什么 ?
@Override
public void afterPropertiesSet() throws Exception {
System.out.println("初始化当前ConfigEventListener");
}
}
复制代码
@Service
public class ConfigServer {
// 注入applicationContext,由于只有他才能够执行发布事件
@Autowired
private ApplicationContext applicationContext;
// 这个是开启异步 ,后面会说到
// @Async
public void publishConfig(){
// 须要发布 --- > 改变的事件
System.out.println("发布事件成功 , 当前线程 : "+Thread.currentThread().getName());
applicationContext.publishEvent(getChange());
}
public ConfigEvent getChange(){
Config config = new Config();
config.setNamespace("application");
HashMap<String, Object> conf = new HashMap<>();
conf.put("server.port", 8088);
config.setInfo(conf);
return new ConfigEvent(config);
}
}
复制代码
@SpringBootApplication
public class SpringListenerApplication implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(SpringListenerApplication.class, args);
}
@Autowired
private ConfigServer server;
@Override
public void run(String... args) throws Exception {
server.publishConfig();
}
}
复制代码
输出结果 :
.....
初始化当前ConfigEventListener
....
发布事件成功 , 当前线程 : main
接收到更新信息 : Config(namespace=application, info={server.port=8088}) , 当前线程 : main
复制代码
因此一个Spring-Boot
的事件监听仍是很简单的 ,类比到 Spring
一个道理,相信懂得人都知道 . 可是又一个问题是咱们的 发布和监听都是 main线程 ,很差吧 ,玩意有不少事件了 ?
须要两个注解 @EnableAsync
启动Async功能 , 和 @Async
某个方法使用异步执行
发布事件成功 , 当前线程 : SimpleAsyncTaskExecutor-1
接收到更新信息 : Config(namespace=application, info={server.port=8088}) , 当前线程 : SimpleAsyncTaskExecutor-1
复制代码
咱们发现就出现了线程池执行 , 这个理的线程池 ,是能够进行配置的 , 只须要咱们显式的注入下面这个SimpleAsyncTaskExecutor
Bean 就能够了
@Bean
public SimpleAsyncTaskExecutor simpleAsyncTaskExecutor() {
SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
// 须要传入一个 ThreadFactory实现类 , 因此看过我前面写的文章应该会写这个,好比 JUC- Executor那节
executor.setThreadFactory(new MyThreadFactory("anthony"));
return executor;
}
复制代码
输出结果 :
发布事件成功 , 当前线程 : anthony-1
接收到更新信息 : Config(namespace=application, info={server.port=8088}) , 当前线程 : anthony-1
复制代码
你能够跟我同样选择实现 ApplicationListener
和Ordered
,或者你能够直接实现 SmartApplicationListener
都同样的哈,没有哪一个好哪一个很差
监听器 一 :
@Component
public class ConfigEventListenerStart implements ApplicationListener<ConfigEvent> , Ordered, InitializingBean {
@Override
public void onApplicationEvent(ConfigEvent event) {
System.out.println("ConfigEventListenerStart 接收到更新信息 : " + event.getSource()+" , 当前线程 : "+Thread.currentThread().getName());
}
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE;
}
@Override
public void afterPropertiesSet() throws Exception {
System.out.println("初始化当前监听器 : " + this.toString());
}
}
复制代码
监听器 二 :
@Component
public class ConfigEventListenerEnd implements ApplicationListener<ConfigEvent> , Ordered, InitializingBean {
@Override
public void onApplicationEvent(ConfigEvent event) {
System.out.println("ConfigEventListenerEnd 接收到更新信息 : " + event.getSource()+" , 当前线程 : "+Thread.currentThread().getName());
}
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE-1;
}
@Override
public void afterPropertiesSet() throws Exception {
System.out.println("初始化当前监听器 : " + this.toString());
}
}
复制代码
输出结果 :
初始化当前监听器 : com.example.springlistener.listener.ConfigEventListenerEnd@6b54655f
初始化当前监听器 : com.example.springlistener.listener.ConfigEventListenerStart@665e9289
.....
发布事件成功 , 当前线程 : anthony-1
ConfigEventListenerStart 接收到更新信息 : Config(namespace=application, info={server.port=8088}) , 当前线程 : anthony-1
ConfigEventListenerEnd 接收到更新信息 : Config(namespace=application, info={server.port=8088}) , 当前线程 : anthony-1
复制代码
Guava的EventBus 就是一个很好的事件注册发布的管理工具 , 他属于一种推送的模式 , 跟spring的很类似,
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>28.0-jre</version>
</dependency>
复制代码
主要的对象就是 ,
EventBus
事件总线, 他是管理全部监听者(或者叫作订阅者) ,经过EventBus#register
或者EventBus#unRegister
来管理的 , 同时监听者要有监听的事件 , 这里是基于方法级别的 (注意方法只能有一个参数,就是监听的事件), 须要在方法上加上@Subscribe
注解来表示监听 ,EventBus
能够经过EventBus#post
方法来发布事件 , 对应类型的监听者就会收到 . 同时EventBus
能够处理异常
public class QuicklyStart {
public static void main(String[] args) {
// 建立一个 事件总线
EventBus bus = new EventBus(new SubscriberExceptionHandler() {
@Override
public void handleException(Throwable exception, SubscriberExceptionContext context) {
// 处理订阅者异常信息
System.out.println("异常信息 : "+exception.getMessage() + ", 异常事件 : " + context.getEvent());
}
});
// 注册你的监听器 , 其实更加准确来讲是订阅者 , 他属于一种发布订阅模式
bus.register(new EventListener());
// 事件总线发布事件
bus.post("sb");
// 事件总线发布事件
bus.post(new Event("hello Guava"));
}
}
/** * 事件源 */
class Event {
String msg;
public Event(String msg) {
this.msg = msg;
}
@Override
public String toString() {
return "Event{" + "msg='" + msg + '\'' + '}';
}
}
/** * 监听器 */
class EventListener {
/** * {@link Subscribe} 一个这个表明一个订阅者,EventBus会将符合的事件发布到对应的订阅者上 , 可是不支持java的基本数据类型, int 之类的 * * @param event */
@Subscribe
public void onEvent(Event event) {
System.out.println("当前线程 : " + Thread.currentThread().getName() + ", 接收到事件 : " + event);
}
@Subscribe
public void onStringEvent(String event) {
error(); // 模拟异常
System.out.println("当前线程 : " + Thread.currentThread().getName() + ", 接收到事件 : " + event);
}
private void error() {
int i = 1 / 0;
}
}
复制代码
输出 :
异常信息 : / by zero, 异常事件 : sb
当前线程 : main, 接收到事件 : Event{msg='hello Guava'}
复制代码
其实很简单 , 第一注册的时候 :
/** Registers all subscriber methods on the given listener object. */
void register(Object listener) {
// 其实就是注解扫描 , 而后把元信息都给整出来
Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
Class<?> eventType = entry.getKey();
Collection<Subscriber> eventMethodsInListener = entry.getValue();
// 查找有没有
CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
// 没有建立一个对象
if (eventSubscribers == null) {
CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
eventSubscribers =
MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
}
// 添加进去 ,实际上是 Subscriber对象 , 把method信息, 对象信息所有封装进去了
eventSubscribers.addAll(eventMethodsInListener);
}
}
复制代码
第二就是 发布
public void post(Object event) {
// 根据事件获取对应的 Subscriber
Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
if (eventSubscribers.hasNext()) {
// 发布出去
dispatcher.dispatch(event, eventSubscribers);
} else if (!(event instanceof DeadEvent)) {
// the event had no subscribers and was not itself a DeadEvent
post(new DeadEvent(this, event));
}
}
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
checkNotNull(subscribers);
// 获取当前线程的队列 ,用ThreadLocal维护的线程安全, 实际上是为了安全
Queue<Event> queueForThread = queue.get();
// 建立一个事件对象
queueForThread.offer(new Event(event, subscribers));
if (!dispatching.get()) {
dispatching.set(true);
try {
Event nextEvent;
while ((nextEvent = queueForThread.poll()) != null) {
while (nextEvent.subscribers.hasNext()) {
// 处理
nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
}
}
} finally {
dispatching.remove();
queue.remove();
}
}
}
/** Dispatches {@code event} to this subscriber using the proper executor. */
final void dispatchEvent(final Object event) {
executor.execute(
new Runnable() {
@Override
public void run() {
try {
// 关键点 ,就是这
invokeSubscriberMethod(event);
} catch (InvocationTargetException e) {
// 出现异常直接就 ... 调用异常回调方法了
bus.handleSubscriberException(e.getCause(), context(event));
}
}
});
}
@VisibleForTesting
void invokeSubscriberMethod(Object event) throws InvocationTargetException {
try {
// 原来如此 , .........method.invok 真.... 因此能够抓取异常
method.invoke(target, checkNotNull(event));
} catch (IllegalArgumentException e) {
throw new Error("Method rejected target/argument: " + event, e);
} catch (IllegalAccessException e) {
throw new Error("Method became inaccessible: " + event, e);
} catch (InvocationTargetException e) {
if (e.getCause() instanceof Error) {
throw (Error) e.getCause();
}
throw e;
}
}
复制代码
因此这个玩意很简单 , 原理一看就分析出来了
咱们发现咱们的本身实现的监听者和 Spring
和Guava
这俩种实现有啥区别 , 无非就是咱们本身实现的监听者模式, 对于 listener 的管理,没有作 , 咱们只是一个 Publisher 一个 Listener,一对一的关系 , 这样子就很很差, 100个监听者就须要100个发布者 , 不符合设计模式的原则 , 因此参考Guava
,咱们发现他无非作的就是一个对于Listener 的管理 , 可是有一个细节但愿你们知道, 对于监听者模式 , 万一事件发布失败了 , 咱们如何知道, 因此Guava
至少帮咱们作了 , 他不是基于回调机制的, 而是使用了Java
的 Method#invoke
,看需求而定吧 , 只不过回调更加轻量级,