设计模式之发布订阅模式(3) 深刻Spring Events事件驱动模型

以前文章中咱们讲解了 发布订阅模式的核心概念 ,并经过 Redis的 Pub/Sub 命令 演示了其分布式场景下的实现。相比后面要讲到的 Guava EventBus,能够说 Spring Events 的使用更加广泛,其功能也更增强大。java

事件(Events)是框架中常常被忽略的、重要的功能,也是发布/订阅模式的一种常见实现。Spring框架自己就是事件驱动的。git

下面咱们就一块儿看一下Spring容器中的事件驱动模型,而后一块儿快速实现一个自定义的事件发布和监听,接着再分别探讨一下同步和异步的事件监听如何实现,再接着介绍一下如何定义监听器的顺序,最后提供一个基于SpEL(Spring Expression Language )实现的条件化的事件监听机制。github

学完本节课程,将你会发现:spring

  1. 经过事件机制将代码解耦,会让本身的代码很是干净,且扩展性极强。
  2. 异步的事件处理机制,能够大大提升程序的响应速度,且内部的线程池会大大提升程序的并发效率。
  3. 条件化和泛型化的监听器可让你减小不少显式的逻辑判断,从而让每一个事件监听的原子性更强。

🚜 本文源码Github地址 安全

Spring自己的事件驱动模型

Spring 容器与事件模型

Spring的事件机制主要提供了以下几个接口和类:springboot

  • ApplicationContextEvent

Spring提供的事件抽象类,你能够继承它来实现自定义的事件。bash

  • ApplicationEventMulticaster

ApplicationEventMulticaster是一个事件广播器, 它的做用是把Applicationcontext发布的Event广播给全部的监听器。并发

  • ApplicationListener

ApplicationListener继承自EventListener, 全部的监听器都要实现这个接口。app

这个接口只有一个onApplicationEvent()方法, 该方法接受一个ApplicationEvent或其子类对象做为参数, 在方法体中,能够经过不一样对Event类的判断来进行相应的处理。框架

当事件触发时全部的监听器都会收到消息, 若是你须要对监听器的接收顺序有要求,但是实现该接口的一个实现SmartApplicationListener, 经过这个接口能够指定监听器接收事件的顺序。

  • ApplicationContext

实现事件机制须要三个部分:事件源、事件和事件监听器。 上面介绍的ApplicationEvent至关于事件, ApplicationListener至关于事件监听器, 这里的事件源说的就是ApplicationContext

ApplicationContext是Spring中的全局容器, 也叫"应用上下文", 它负责读取bean的配置, 管理bean的加载, 维护bean之间的依赖关系, 也就是负责管理bean的整个生命周期。

ApplicationContext就是咱们平时所说的IOC容器。

  • ApplicationContextAware

当一个类实现了ApplicationContextAware接口以后,Aware接口的Bean在被初始以后,能够取得一些相对应的资源,这个类能够直接获取 spring 配置文件中全部注入的bean对象。

Spring提供了不少以Aware结尾的接口,经过实现这些接口,你就得到了获取Spring容器内资源的能力。

Spring自己实现了以下4个Event:

  • ContextStartedEvent (容器启动)
  • ContextStoppedEvent (容器中止)
  • ContextClosedEvent (容器关闭)
  • ContextRefreshedEvent (容器刷新)

自定义 Spring Events 实现

自定义Spring的事件模型须要三个角色:事件(Event)、发布者(Publisher)、监听者(Listerner)。

自定义Event

下面自定义了一个注册事件,这个Event的构造函数提供了两个参数,一个是发布源(source),一个是发布的消息(message)。

package net.ijiangtao.tech.designpattern.pubsub.spring.common;

import lombok.Getter;
import org.springframework.context.ApplicationEvent;

/** * 注册事件 * @author ijiangtao * @create 2019-05-02 12:59 **/
@Getter
public class RegisterEvent extends ApplicationEvent {

    private String message;

    public RegisterEvent(Object source, String message) {
        super(source);
        this.message = message;
    }

}
复制代码

自定义Publisher

下面提供了一个发布自定义事件的发布器,咱们经过ApplicationEventPublisher来把事件发布出去。

package net.ijiangtao.tech.designpattern.pubsub.spring.common;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;

import java.time.LocalTime;

/** * 注册事件发布器 * @author ijiangtao * @create 2019-05-02 13:01 **/
@Component
@Slf4j
public class RegisterEventPublisher {

    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;

    public void publish(final String message) {
        log.info("publis a RegisterEvent,message:{}", message + " time: " + LocalTime.now());
        RegisterEvent registerEvent = new RegisterEvent(this, message);
        applicationEventPublisher.publishEvent(registerEvent);
    }
}

复制代码

自定义Listener

下面提供几个自定义事件的监听器,它们都实现了ApplicationListener<RegisterEvent>接口,同时为了模拟处理事件的过程,这里让当前线程休眠了3秒。由于实现过程相似,这里仅提供一个实现。

package net.ijiangtao.tech.designpattern.pubsub.spring.common;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

import java.time.LocalTime;

/** * 发送注册成功邮件提醒 * * @author ijiangtao * @create 2019-05-02 13:07 **/
@Component
@Slf4j
public class SendRegisterEmailListener implements ApplicationListener<RegisterEvent> {
    @Override
    public void onApplicationEvent(RegisterEvent event) {
        try {
            Thread.sleep(3 * 1000);
        } catch (Exception e) {
            log.error("{}", e);
        }
        log.info("SendRegisterEmailListener message: " + event.getMessage()+" time: "+ LocalTime.now());
    }

}
复制代码

测试自定义事件机制

下面经过一个单元测试发布了自定义事件。经过观察log输出,发现事件发布之后,每一个监听器都依次输出了监听日志。

package net.ijiangtao.tech.designpattern.pubsub.spring;

import lombok.extern.slf4j.Slf4j;
import net.ijiangtao.tech.designpattern.pubsub.spring.common.RegisterEventPublisher;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/** * Spring Events * * @author ijiangtao * @create 2019-05-02 12:53 **/
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class SpringEventsCommonTests {

    @Autowired
    private RegisterEventPublisher registerEventPublisher;

    @Test
    public void test1(){
        registerEventPublisher.publish(" Danny is here.");
        try {
            Thread.sleep(10 * 1000);
        } catch (Exception e) {
            log.error("{}", e);
        }
    }

}
复制代码

这样,一个基于Spring Events的事件监听器就实现了。

实现异步的 Spring Events

经过观察日志中打印的时间你会发现,上面注册的全部监听器,都是依次执行的,也就是Spring Events的事件处理默认是同步的。同步的事件监听耗时比较长,须要等待上一个监听处理结束,下一个监听器才能执行。

那么能不能改为异步监听呢?答案是确定的。下面介绍两种实现方式。

配置

经过JDK提供的SimpleApplicationEventMulticaster将事件广播出去,就能够实现异步并发地让多个监听器同时执行事件监听动做。

package net.ijiangtao.tech.designpattern.pubsub.spring.async.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ApplicationEventMulticaster;
import org.springframework.context.event.SimpleApplicationEventMulticaster;
import org.springframework.core.task.SimpleAsyncTaskExecutor;

/** * 异步事件监听配置 * * @author ijiangtao * @create 2019-05-02 13:23 **/
@Configuration
public class AsynchronousSpringEventsConfig {

    @Bean(name = "applicationEventMulticaster")
    public ApplicationEventMulticaster simpleApplicationEventMulticaster() {
        SimpleApplicationEventMulticaster eventMulticaster  = new SimpleApplicationEventMulticaster();
        eventMulticaster.setTaskExecutor(new SimpleAsyncTaskExecutor());
        return eventMulticaster;
    }
}
复制代码

经过SimpleApplicationEventMulticaster的源码能够看到它的multicastEvent方法会经过线程池并发执行事件发布动做。

public void multicastEvent(ApplicationEvent event, @Nullable ResolvableType eventType) {
     ResolvableType type = eventType != null ? eventType : this.resolveDefaultEventType(event);
     Iterator var4 = this.getApplicationListeners(event, type).iterator();

     while(var4.hasNext()) {
         ApplicationListener<?> listener = (ApplicationListener)var4.next();
         Executor executor = this.getTaskExecutor();
         if (executor != null) {
             executor.execute(() -> {
                 this.invokeListener(listener, event);
             });
         } else {
             this.invokeListener(listener, event);
         }
     }

 }
复制代码

注解

经过注解的方式发布事件,只须要在Listener上加上@Async,而且在发布事件的地方加上@EnableAsync注解便可。

package net.ijiangtao.tech.designpattern.pubsub.spring.async.annotation;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.time.LocalTime;

/** * 发送优惠券 * * @author ijiangtao * @create 2019-05-02 13:07 **/
@Component
@Slf4j
public class UserActionListenerAsyncAnnotation implements ApplicationListener<RegisterEvent> {

    @Async
    @Override
    public void onApplicationEvent(RegisterEvent event) {
        try {
            Thread.sleep(3 * 1000);
        } catch (Exception e) {
            log.error("{}", e);
        }
        log.info("UserActionListener message: " + event.getMessage()+" time: "+ LocalTime.now());
    }

}
复制代码
package net.ijiangtao.tech.designpattern.pubsub.spring;

import lombok.extern.slf4j.Slf4j;
import net.ijiangtao.tech.designpattern.pubsub.spring.async.annotation.RegisterEventPublisherAsyncAnnotation;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.test.context.junit4.SpringRunner;

/** * Spring Events * * @author ijiangtao * @create 2019-05-02 12:53 **/
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
@EnableAsync
public class SpringEventsAsyncAnnotationTests {

    @Autowired
    private RegisterEventPublisherAsyncAnnotation registerEventPublisherAsyncAnnotation;

    @Test
    public void test2() {
        registerEventPublisherAsyncAnnotation.publish(" Danny is here (Async).");
        try {
            Thread.sleep(10 * 1000);
        } catch (Exception e) {
            log.error("{}", e);
        }
    }

}
复制代码

实现 Smart Listener

经过实现SmartApplicationListener接口,能够自定义监听器的执行顺序、支持的事件类型等。

package net.ijiangtao.tech.designpattern.pubsub.spring.smart;

import lombok.Getter;
import org.springframework.context.ApplicationEvent;

/** * event * * @author ijiangtao * @create 2019-05-02 15:33 **/
@Getter
public class SmartEvent extends ApplicationEvent {

    private String message;

    public SmartEvent(Object source, String message) {
        super(source);
    }

}
复制代码
package net.ijiangtao.tech.designpattern.pubsub.spring.smart;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.event.SmartApplicationListener;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;

/** * SmartApplicationListener * * @author ijiangtao * @create 2019-05-02 15:32 **/
@Component
@Slf4j
public class CustomSmartApplicationListener1 implements SmartApplicationListener {


    /** * 自定义支持的事件类型 * @param eventType * @return */
    @Override
    public boolean supportsEventType(Class<? extends ApplicationEvent> eventType) {
        return eventType == SmartEvent.class;
    }

    /** * 定义支持的事件源类型 * @param sourceType * @return */
    @Override
    public boolean supportsSourceType(Class<?> sourceType) {
        return sourceType == String.class;
    }

    /** * 自定义优先级别 * @return */
    @Override
    public int getOrder() {
        return Ordered.LOWEST_PRECEDENCE;
    }

    @Override
    public void onApplicationEvent(ApplicationEvent applicationEvent) {
        log.info("CustomSmartApplicationListener {}",applicationEvent.getSource());
    }

}
复制代码

条件化的事件监听

有时候咱们但愿一个监听器但愿监听多个事件,例如一个系统安全监听器(SecurityEventListener),能够监听各类系统安全问题(NetWorkSecurityEvent、SQLSecurityEvent、AuthorizationSecurityEvent,等等),这个是时候你可让监听器监听这些Event的父类SecurityEvent,这样你的监听器就能够监听到全部该Event的子类型。

有时候咱们须要根据同一个事件抛出的消息的某个值来决定用哪一个监听器来处理。例如SecurityEvent有个安全级别level属性,你定义了5个level,每一个level都有不一样的处理机制。按照传统的实现方式须要经过条件判断(if/else或者switch/case等)来实现,代码的封装性很差。这种状况下,你能够在你的Listener的监听方法上增长@EventListener注解,并经过condition参数来指定过滤条件。例如 condition = "#event.success eq false")就是经过SpEL表示:当方法的参数event变量的success属性等于false的时候才执行监听方法。

下面咱们就演示一下实现过程。

提供泛型的Event基类:

package net.ijiangtao.tech.designpattern.pubsub.spring.generic;

import lombok.Getter;

/** * GenericSpringEvent * * @author ijiangtao * @create 2019-05-02 13:47 **/
@Getter
public class GenericSpringEvent<T> {

    private T what;

    protected boolean success;

    public GenericSpringEvent(T what, boolean success) {
        this.what = what;
        this.success = success;
    }

}
复制代码

基于Event基类自定义Event实现

package net.ijiangtao.tech.designpattern.pubsub.spring.generic.checkout;

import lombok.Getter;
import net.ijiangtao.tech.designpattern.pubsub.spring.generic.GenericSpringEvent;

/**
 * GenericSpringEventCheckout
 *
 * @author ijiangtao
 * @create 2019-05-02 13:58
 **/
@Getter
public class GenericSpringEventCheckout extends GenericSpringEvent<Long> {

    private Long userId;

    public GenericSpringEventCheckout(Long userId, boolean success) {
        super(userId, success);
    }

}
复制代码

提供条件化监听器

监听器监听基类Event的全部字类,而且经过@EventListener注解和SpEL定义监听的Event的过滤条件。

package net.ijiangtao.tech.designpattern.pubsub.spring.generic;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

/** * @author ijiangtao * @create 2019-05-02 13:52 **/
@Component
@Slf4j
public class GenericSpringEventSuccessListenerLong {

    @EventListener(condition = "#event.success")
    public void handle(GenericSpringEvent<Long> event) {
        log.info("Handling generic event Success (conditional). {}",event.getWhat());
    }

}
复制代码
package net.ijiangtao.tech.designpattern.pubsub.spring.generic;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

/** * @author ijiangtao * @create 2019-05-02 13:52 **/
@Component
@Slf4j
public class GenericSpringEventFailListenerLong {

    @EventListener(condition = "#event.success eq false")
    public void handle(GenericSpringEvent<Long> event) {
        log.info("Handling generic event Fail (conditional). {}",event.getWhat());
    }

}
复制代码

自定义事件发布器

package net.ijiangtao.tech.designpattern.pubsub.spring.generic.checkout;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;

import java.time.LocalTime;

/** * GenericSpringEventPublisher * * @author ijiangtao * @create 2019-05-02 13:55 **/
@Component
@Slf4j
public class GenericSpringEventPublisherCheckout {

    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;

    public void publish(final Long userId, boolean success) {

        log.info("publis a GenericSpringEventPublisher, userId:{}", userId + " time: " + LocalTime.now());

        GenericSpringEventCheckout eventCheckout = new GenericSpringEventCheckout(userId, success);

        applicationEventPublisher.publishEvent(eventCheckout);
    }

}
复制代码

单元测试

下面提供了一个测试方法,经过观察日志发现,不一样的条件,触发了不一样的监听器。

package net.ijiangtao.tech.designpattern.pubsub.spring;

import lombok.extern.slf4j.Slf4j;
import net.ijiangtao.tech.designpattern.pubsub.spring.generic.checkout.GenericSpringEventPublisherCheckout;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.ApplicationEvent;
import org.springframework.test.context.junit4.SpringRunner;

/** * Spring Events * * @author ijiangtao * @create 2019-05-02 12:53 **/
@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class SpringEventsGenericTests {

    @Autowired
    private GenericSpringEventPublisherCheckout checkoutPubliser;


    @Test
    public void test1() {

        ApplicationEvent applicationEvent;
        checkoutPubliser.publish(101L, true);

        checkoutPubliser.publish(202L, false);
    }

}

复制代码

Wechat-westcall

相关连接

相关文章
相关标签/搜索