Spring Boot自带了消息机制,能够让咱们在一个地方发布消息,多个地方同时接收消息并处理消息,固然这是在同一个JVM内存下进行的,不一样的进程还须要使用MQ来实现。我以为该消息模式跟观察者模式有必定的区别,观察者模式通常观察的是一个对象内部属性发生变化的时候使用。而该消息机制能够在任意地方使用。app
消息事件自己是一个对象,继承于ApplicationEvent异步
@Data public class DemoEvent extends ApplicationEvent { private String type; private List<Map> msg; public DemoEvent(Object source,String type,List<Map> msg) { super(source); this.type = type; this.msg = msg; } }
还须要有一个消息事件发布者,将这个消息事件给发布出去async
@Component public class DemoPublisher { @Autowired ApplicationContext applicationContext; public void publish(DemoEvent demoEvent) { applicationContext.publishEvent(demoEvent); } }
而后就是咱们的侦听者,侦听者能够有任意个根据业务不一样作不一样的处理,他的写法分两种,一个是实现了ApplicationListener接口,一个是在方法上打上@EventListener标签ide
@Component @Slf4j public class DemoListener implements ApplicationListener<DemoEvent> { @Override @Async public void onApplicationEvent(DemoEvent demoEvent) { log.info("接收到publisher发送到消息,时间" + Time.getTime()); List<Map> msg = demoEvent.getMsg(); String type = demoEvent.getType(); try { Thread.sleep(3000); }catch (Exception e) { e.printStackTrace(); } log.info("类型" + type + ",消息内容:" + msg); } }
@Component @Slf4j public class DemoListener1 { @EventListener public void onDemoEvent(DemoEvent demoEvent) { log.info("listener1经过注解接收到了publisher发送的消息,时间" + Time.getTime()); String type = demoEvent.getType(); List<Map> msg = demoEvent.getMsg(); try { Thread.sleep(2000); }catch (Exception e) { e.printStackTrace(); } log.info("listener1:类型" + type + ",消息内容:" + msg); } }
可是咱们须要知道的是,多个消息监听是同步执行的,他们会发生阻塞,因此咱们须要进行异步监听,实现异步监听只须要在方法上打上@Async标签,同时在Springboot主程序中开启容许异步测试
@EnableAsync @SpringBootApplication public class LanmdaApplication { public static void main(String[] args) { SpringApplication.run(LanmdaApplication.class, args); } }
最后写一个测试的Controllerthis
@Slf4j @RestController public class TestController { @Autowired private DemoPublisher publisher; @GetMapping("/test") public String testListener() { List<Map> list = new ArrayList<>(); Map<String,String> m1 = new HashMap<>(); m1.put("1","2"); Map<String,String> m2 = new HashMap<>(); m2.put("3","4"); Map<String,String> m3 = new HashMap<>(); m3.put("5","6"); list.add(m1); list.add(m2); list.add(m3); log.info("开始发布消息:" + Time.getTime()); publisher.publish(new DemoEvent(this,"测试消息",list)); log.info("消息发布结束:" + Time.getTime()); return "消息发布成功"; } }
执行后,日志以下spa
2019-07-21 10:42:39.686 INFO 1756 --- [nio-8080-exec-1] c.g.lanmda.controller.TestController : 开始发布消息:10:42:39
2019-07-21 10:42:39.687 INFO 1756 --- [nio-8080-exec-1] com.guanjian.lanmda.event.DemoListener1 : listener1经过注解接收到了publisher发送的消息,时间10:42:39
2019-07-21 10:42:41.690 INFO 1756 --- [nio-8080-exec-1] com.guanjian.lanmda.event.DemoListener1 : listener1:类型测试消息,消息内容:[{1=2}, {3=4}, {5=6}]
2019-07-21 10:42:41.695 INFO 1756 --- [nio-8080-exec-1] .s.a.AnnotationAsyncExecutionInterceptor : No task executor bean found for async processing: no bean of type TaskExecutor and no bean named 'taskExecutor' either
2019-07-21 10:42:41.697 INFO 1756 --- [nio-8080-exec-1] c.g.lanmda.controller.TestController : 消息发布结束:10:42:41
2019-07-21 10:42:41.697 INFO 1756 --- [cTaskExecutor-1] com.guanjian.lanmda.event.DemoListener : 接收到publisher发送到消息,时间10:42:41
2019-07-21 10:42:44.701 INFO 1756 --- [cTaskExecutor-1] com.guanjian.lanmda.event.DemoListener : 类型测试消息,消息内容:[{1=2}, {3=4}, {5=6}].net