若是熟悉C#语言的小伙伴们通常都会知道委托、事件的好处,只需在某个类中提早定义好公开的委托或事件(委托的特殊表现形式)变量,而后在其它类中就能够很随意的订阅该委托或事件,当委托或事件被触发执行时,会自动通知全部的订阅者进行消费处理。(观察者模式用委托来实现是最好不过了,DDD所提倡的事件驱动其根本理念也是如此),固然我这里想到的是不须要在每一个类中进行定义委托或事件,而是由一个统一的中介者(即EventPublishSubscribeUtils)来提供事件的订阅及发布操做,这样各模块之间无需直接依赖,只需经过中介者完成发布通知与订阅回调便可,何乐而不为呢?java
这里我先借助C#语言独有的委托类型快速实现了一个简易的EventPublishSubscribeUtils,代码以下:json
/// <summary> /// 自定义事件发布订阅回调工具类(业务解藕、关注点分离,避免互相依赖)--演示版 /// EventBus简化版,观察者模式 /// author:zuowenjun /// </summary> public static class EventPublishSubscribeUtils { private static ConcurrentDictionary<Type, EventHandler<object>> EventHandlers { get; } = new ConcurrentDictionary<Type, EventHandler<object>>(); private static void removeRegisters(ref EventHandler<object> srcEvents, EventHandler<object> removeTargetEvents) { var evtTypes = removeTargetEvents.GetInvocationList().Select(d => d.GetType()); var registeredEventHandlers = Delegate.Combine(srcEvents.GetInvocationList().Where(ei => evtTypes.Contains(ei.GetType())).ToArray()); srcEvents -= (EventHandler<object>)registeredEventHandlers; } public static void Register<T>(EventHandler<object> eventHandlers) { EventHandlers.AddOrUpdate(typeof(T), eventHandlers, (t, e) => { //先根据订阅委托类型匹匹配过滤掉以前已有的相同订阅,而后再从新订阅,防止重复订阅,屡次执行的状况。 removeRegisters(ref e, eventHandlers); e += eventHandlers; return e; }); } public static void UnRegister<T>(EventHandler<object> eventHandlers = null) { Type eventMsgType = typeof(T); if (eventHandlers == null) { EventHandlers.TryRemove(eventMsgType, out eventHandlers); return; } var e = EventHandlers[eventMsgType]; removeRegisters(ref e, eventHandlers); } public static void PublishEvent<T>(T eventMsg, object sender) { Type eventMsgType = eventMsg.GetType(); if (EventHandlers.ContainsKey(eventMsgType)) { EventHandlers[eventMsgType].Invoke(sender, eventMsg); } } }
而后使用就比较简单了,咱们只需经过EventPublishSubscribeUtils.Register注册订阅事件消息,经过EventPublishSubscribeUtils.PublishEvent发布事件通知,这样就可让两个甚至多个不相关的模块(类)可以经过消息类型实现1对多的通信与协同处理。使用示例代码以下:c#
class EventMessage { public string Name { get; set; } public string Msg { get; set; } public DateTime CreatedDate { get; set; } } class DemoA { public DemoA() { EventHandler<object> eventHandlers = EventCallback1; eventHandlers += EventCallback2; EventPublishSubscribeUtils.Register<EventMessage>(eventHandlers); } private void EventCallback1(object sender, object e) { string json = JsonConvert.SerializeObject(e); System.Diagnostics.Debug.WriteLine($"EventCallback1=> sender:{sender},e:{json}"); } private void EventCallback2(object sender, object e) { string json = JsonConvert.SerializeObject(e); System.Diagnostics.Debug.WriteLine($"EventCallback2=> sender:{sender},e:{json}"); } } class DemoB { public void ShowMsg(string name, string msg) { System.Diagnostics.Debug.WriteLine($"ShowMsg=> name:{name},msg:{msg}"); var eventMsg = new EventMessage { Name = name, Msg = msg, CreatedDate = DateTime.Now }; EventPublishSubscribeUtils.PublishEvent(eventMsg, nameof(DemoB.ShowMsg)); } } //main方法中使用: var demoA = new DemoA(); var demoB = new DemoB(); demoB.ShowMsg("梦在旅途", "i love csharp and java!");
从上述示例代码中能够看出,DemoA与DemoB各为独立,互不依赖,它们都不知道有对方的存在,它们只关心业务的处理,经过执行demoB.ShowMsg方法进而触发回调demoA.EventCallback1,demoA.EventCallback2方法,是否是比起直接从DemoA中调DemoB更好呢?app
c#有委托类型(方法的引用),那若是是在java中该如何实现呢?异步
其实同理,咱们能够借助匿名内部类+匿名实现类的方式(如:函数式接口)实现与C#殊途同归的效果,一样能够实现相似的事件发布与订阅功能,以下即是采用java语言的实现EventPublishSubscribeUtils类的代码:async
这个因项目须要,我特地实现了两种模式,一种支持1对多的普通方式,另外一种支持1对1的订阅回调方式,有返回值。函数
/** * 自定义事件发布订阅回调工具类(业务解藕、关注点分离,避免互相依赖) * EventBus简化版,观察者模式 * <pre> * 支持两种模式 * 1.无返回值:订阅事件消费(register)+ 发布事件消息(publishEvent/publishEventAsync) * 2.有返回值:监听回调通知处理(listenCallback)+通知回调(notifyCallback),经过notifyMessageType+MessageChannel 便可标识惟一的一组通知回调与监听回调处理 * <pre> * @author zuowenjun * @date 20200310 */ public final class EventPublishSubscribeUtils { private static final Logger LOGGER = LoggerFactory.getLogger(EventPublishSubscribeUtils.class); private static final Map<Class<?>, LinkedList<Consumer<Object>>> eventConsumers = new ConcurrentHashMap<>(); private static final Map<Class<?>, ConcurrentHashMap<MessageChannel, Function<Object, Object>>> callbackFuncs = new ConcurrentHashMap<>(); private EventPublishSubscribeUtils() { } /** * 注册事件回调消费者 * 用法:EventSubscribeConsumeUtils.register(this::xxxx方法) 或lambda表达式 * 注意:若回调方法添加了事务注解,则应指派其代理对象的方法来完成回调,如: * EventSubscribeConsumeUtils.register((xxxService)SpringUtils.getBean(this.class)::xxxx方法) * * @param eventConsumer */ public static void register(Class<?> eventMessageType, Consumer<Object> eventConsumer) { if (eventConsumer == null) { return; } LinkedList<Consumer<Object>> eventConsumerItems = null; if (!eventConsumers.containsKey(eventMessageType)) { eventConsumers.putIfAbsent(eventMessageType, new LinkedList<>()); } eventConsumerItems = eventConsumers.get(eventMessageType); eventConsumerItems.add(eventConsumer); } /** * 取消订阅回调 * * @param eventMessageType * @param eventConsumer */ public static void unRegister(Class<?> eventMessageType, Consumer<Object> eventConsumer) { if (!eventConsumers.containsKey(eventMessageType)) { return; } LinkedList<Consumer<Object>> eventConsumerItems = eventConsumers.get(eventMessageType); int eventConsumerIndex = eventConsumerItems.indexOf(eventConsumer); if (eventConsumerIndex == -1) { return; } eventConsumerItems.remove(eventConsumerIndex); } /** * 发布事件,同步触发执行回调事件消费者方法(存在阻塞等待),即事件消息生产者 * 用法:在须要触发事件消息回调时调用,如:publishEvent(eventMessage); * * @param eventMessage */ public static <T> void publishEvent(T eventMessage) { Class<?> eventMessageType = eventMessage.getClass(); if (!eventConsumers.containsKey(eventMessageType)) { return; } LOGGER.info("事件已发布,正在执行通知消费:{}", JSONObject.toJSONString(eventMessage)); for (Consumer<Object> eventConsumer : eventConsumers.get(eventMessageType)) { try { eventConsumer.accept(eventMessage); } catch (Exception ex) { LOGGER.error("eventConsumer.accept error:{},eventMessageType:{},eventMessage:{}", ex, eventMessageType, JSONObject.toJSONString(eventMessage)); } } } /** * 发布事件,异步触发执行回调事件消费者方法(异步非阻塞),即事件消息生产者 * 用法:在须要触发事件消息回调时调用,如:publishEventAsync(eventMessage); * * @param eventMessage */ public static <T> void publishEventAsync(final T eventMessage) { Executor asyncTaskExecutor = (Executor) SpringUtils.getBean("asyncTaskExecutor"); asyncTaskExecutor.execute(() -> { publishEvent(eventMessage); }); } /** * 监听回调处理(须要有返回值),即有返回值的回调消费者 * * @param notifyMessageType * @param messageChannel * @param callbackFunc */ public static void listenCallback(Class<?> notifyMessageType, MessageChannel messageChannel, Function<Object, Object> callbackFunc) { if (!callbackFuncs.containsKey(notifyMessageType)) { callbackFuncs.putIfAbsent(notifyMessageType, new ConcurrentHashMap<>()); } Map<MessageChannel, Function<Object, Object>> functionMap = callbackFuncs.get(notifyMessageType); if (!functionMap.containsKey(messageChannel)) { functionMap.putIfAbsent(messageChannel, callbackFunc); } else { LOGGER.error("该通知消息类型:{}+消息通道:{},已被订阅监听,重复订阅监听无效!", notifyMessageType.getSimpleName(), messageChannel.getDescription()); } } /** * 通知回调(同步等待获取监听回调的处理结果),即生产者 * * @param notifyMessage * @param messageChannel * @param <R> * @return */ @SuppressWarnings("unchecked") public static <R> R notifyCallback(Object notifyMessage, MessageChannel messageChannel) { Class<?> notifyMessageType = notifyMessage.getClass(); Map<MessageChannel, Function<Object, Object>> functionMap = callbackFuncs.getOrDefault(notifyMessageType, null); if (functionMap != null) { Function<Object, Object> callbackFunction = functionMap.getOrDefault(messageChannel, null); if (callbackFunction != null) { LOGGER.info("通知回调消息已发布,正在执行回调处理:{},messageChannel:[{}]", JSONObject.toJSONString(notifyMessage), messageChannel.getDescription()); Object result = callbackFunction.apply(notifyMessage); try { return (R) result; } catch (ClassCastException castEx) { throw new ClassCastException(String.format("监听回调处理后返回值实际类型与发布通知回调待接收的值预期类型不一致,致使类型转换失败:%s," + "请确保notifyCallback与listenCallback针对通知消息类型:%s+消息通道:%s返回值类型必需一致。", castEx.getMessage(), notifyMessageType.getSimpleName(), messageChannel.getDescription())); } } } return null; } }
固然若是须要实现1对1的通信,除了指定消息类型外,还须要指定消息通信通道(即:惟一标识),目的是能够实现同一种消息类型,支持不一样的点对点的处理。工具
/** * 自定义消息通道 * 做用:用于识别同一个消息类型下不一样的监听回调者(notifyMessage+messageChannel 便可标识惟一的一组通知回调[生产者]与监听回调[消费者]) * @author zuowenjun * @date 2020-03-31 */ public enum MessageChannel { None("无效"), MSG_A("测试消息A"), ; private String description; MessageChannel(String description) { this.description=description; } public String getDescription() { return description; } }
使用方法示例代码以下:测试
@Service public class DemoAService { private static final Logger LOGGER = LoggerFactory.getLogger(DemoAService.class); public void showMsg(String name, String msg) { System.out.printf("【%1$tF %1$tT.%1$tL】hello!%s,DemoAService showMsg:%s %n", new Date(), name, msg); EventMessage eventMessage = new EventMessage(); eventMessage.setName("aaa"); eventMessage.setMsg("test"); eventMessage.setCreatedDate(new Date()); EventPublishSubscribeUtils.publishEvent(eventMessage); String msgJsonStr = EventPublishSubscribeUtils.notifyCallback(eventMessage, MessageChannel.MSG_A); System.out.printf("【%1$tF %1$tT.%1$tL】DemoAService showMsg notifyCallback json result:%2$s %n", new Date(), msgJsonStr); } } @Service public class DemoBService { @Autowired private DemoAService demoAService; @PostConstruct public void init(){ //订阅消费,无返回值,支持1对多,即:同一个消息类型可同时被多个消费者订阅 EventPublishSubscribeUtils.register(EventMessage.class,this::showFinishedMsg); //订阅监听回调,有返回值,只能1对1 EventPublishSubscribeUtils.listenCallback(EventMessage.class, MessageChannel.MSG_A,this::getMsgCallbak); } private void showFinishedMsg(Object eventMsg){ EventMessage eventMessage=(EventMessage)eventMsg; System.out.printf("【%1$tF %1$tT.%1$tL】%s,receive msg:%s doing...%n", eventMessage.getCreatedDate(),eventMessage.getName(),eventMessage.getMsg()); //模拟逻辑处理 try { Thread.sleep(500L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.printf("【%1$tF %1$tT.%1$tL】%s,do finished!!!%n",new Date(),eventMessage.getName()); } private String getMsgCallbak(Object eventMsg){ EventMessage eventMessage=(EventMessage)eventMsg; eventMessage.setMsg(eventMessage.getMsg()+"--callback added!"); eventMessage.setCreatedDate(new Date()); System.out.printf("【%1$tF %1$tT.%1$tL】%s,do msg callback!!!%n",new Date(),eventMessage.getName()); return JSONObject.toJSONString(eventMessage); } }
如上代码所示,咱们借助于EventPublishSubscribeUtils,解耦了两个Service Bean之间的依赖,避免了循环依赖的问题,去掉了以前为了解决循环依赖而使用@Lazy注解的方式,更易于扩展与更改。其实Spring底层也使用了相似的Event机制,说明这种方式仍是有合适的用武之地的。this
这里我经过简单的关系图来对比未引用EventPublishSubscribeUtils前与引用后的区别,你们能够感觉一下哪一种更方便:
以前:
以后:
最后,关于业务解耦,分清业务边界,我我的认为跨进程通信使用MQ,同进程跨多模块(类,或者说跨多业务边界)可以使用Event事件驱动思路来解决。你们以为如何呢?若是有更好的方案欢迎评论交流,谢谢。