1、问题背景html
最近离职来到了一家新的公司,原先是在乙方工做,这回到了甲方,在这一个月中,发现目前的业务很大一部分是靠轮询实现的,例如:经过轮询判断数据处于B状态了,则轮询到数据后执行某种动做,这个实际上是很是浪费的,而且对于数据的实时性也会不怎么友好,基于以上的状况,在某天开车堵车时候,想到了以前偶然了解过的事件总线(EventBus),对比了公司当前的场景后,以为事件总线应该是能够知足需求的(PS:只是我以为这个有问题,不少人不以为有问题),那既然想到了,那就想本身是否能够作个事件总线的轮子git
2、什么是事件总线数据库
咱们知道事件是由一个Publisher跟一个或多个的Subsriber组成,可是在实际的使用过程当中,咱们会发现,Subsriber必须知道Publisher是谁才能够注册事件,进而达到目的,那这其实就是一种耦合,为了解决这个问题,就出现了事件总线的模式,事件总线容许不一样的模块之间进行彼此通讯而又不须要相互依赖,以下图所示,经过EventBus,让Publisher以及Subsriber都只须要对事件源(EventData)进行关注,不用管Publisher是谁,那么EventBus主要是作了一些什么事呢?异步
3、EventBus作了什么事?async
一、EventBus实现了对于事件的注册以及取消注册的管理函数
二、EventBus内部维护了一份事件源与事件处理程序的对应关系,而且经过这个对应关系在事件发布的时候能够找到对应的处理程序去执行微服务
三、EventBus应该要支持默认就注册事件源与处理程序的关系,而不须要开发人员手动去注册(这里可让开发人员去控制自动仍是手动)测试
4、具体实现思路spa
首先在事件总线中,存在注册、取消注册以及触发事件这三种行为,因此咱们能够将这三种行为抽象一个接口出来,最终的接口代码以下:orm
using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; namespace MEventBus.Core { public interface IEventBus { #region 接口注册 void Register<TEventData>(Type handlerType) where TEventData : IEventData; void Register(Type eventType, Type handlerType); void Register(string eventType, Type handlerType); #endregion #region 接口取消注册 void Unregister<TEventData>(Type handler) where TEventData : IEventData; void Unregister(Type eventType, Type handlerType); void Unregister(string eventType, Type handlerType); #endregion void Trigger(string pubKey, IEventData eventData); Task TriggerAsync(string pubKey, IEventData eventData); Task TriggerAsync<TEventData>(TEventData eventData) where TEventData : IEventData; void Trigger<TEventData>(TEventData eventData) where TEventData : IEventData; } }
在以上代码中发现有些方法是有IEventData约束的,这边IEventData就是约束入参行为,原则上规定,每次触发的EventData都须要继承IEventData,而注册的行为也是直接跟入参类型相关,具体代码以下:
using System; using System.Collections.Generic; using System.Text; namespace MEventBus.Core { public interface IEventData { string Id { get; set; } DateTime EventTime { get; set; } object EventSource { get; set; } } }
接下来咱们看下具体的实现代码
using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; using System.Linq; using System.Reflection; using System.Text; using System.Threading.Tasks; namespace MEventBus.Core { public class EventBus : IEventBus { private static ConcurrentDictionary<string, List<Type>> dicEvent = new ConcurrentDictionary<string, List<Type>>(); private IResolve _iresolve { get; set; } public EventBus(IResolve resolve) { _iresolve = resolve; InitRegister(); } public void InitRegister() { if (dicEvent.Count > 0) { return; } //_iresolve = ioc_container; dicEvent = new ConcurrentDictionary<string, List<Type>>(); //自动扫描类型而且注册 foreach (var file in Directory.GetFiles(AppDomain.CurrentDomain.BaseDirectory, "*.dll")) { var ass = Assembly.LoadFrom(file); foreach (var item in ass.GetTypes().Where(p => p.GetInterfaces().Contains(typeof(IEventHandler)))) { if (item.IsClass) { foreach (var item1 in item.GetInterfaces()) { foreach (var item2 in item1.GetGenericArguments()) { if (item2.GetInterfaces().Contains(typeof(IEventData))) { Register(item2, item); } } } } } } } //注册以及取消注册的时候须要加锁处理 private static readonly object obj = new object(); #region 注册事件 public void Register<TEventData>(Type handlerType) where TEventData : IEventData { //将数据存储到mapDic var dataType = typeof(TEventData).FullName; Register(dataType, handlerType); } public void Register(Type eventType, Type handlerType) { var dataType = eventType.FullName; Register(dataType, handlerType); } public void Register(string pubKey, Type handlerType) { lock (obj) { //将数据存储到mapDic if (dicEvent.Keys.Contains(pubKey) == false) { dicEvent[pubKey] = new List<Type>(); } if (dicEvent[pubKey].Exists(p => p.GetType() == handlerType) == false) { //IEventHandler obj = Activator.CreateInstance(handlerType) as IEventHandler; dicEvent[pubKey].Add(handlerType); } } } #endregion #region 取消事件注册 public void Unregister<TEventData>(Type handler) where TEventData : IEventData { var dataType = typeof(TEventData); Unregister(dataType, handler); } public void Unregister(Type eventType, Type handlerType) { string _key = eventType.FullName; Unregister(_key, handlerType); } public void Unregister(string eventType, Type handlerType) { lock (obj) { if (dicEvent.Keys.Contains(eventType)) { if (dicEvent[eventType].Exists(p => p.GetType() == handlerType)) { dicEvent[eventType].Remove(dicEvent[eventType].Find(p => p.GetType() == handlerType)); } } } } #endregion #region Trigger触发 //trigger时候须要记录到数据库 public void Trigger<TEventData>(TEventData eventData) where TEventData : IEventData { var dataType = eventData.GetType().FullName; //获取当前的EventData绑定的全部Handler Notify(dataType, eventData); } public void Trigger(string pubKey, IEventData eventData) { //获取当前的EventData绑定的全部Handler Notify(pubKey, eventData); } public async Task TriggerAsync<TEventData>(TEventData eventData) where TEventData : IEventData { await Task.Factory.StartNew(new Action(()=> { var dataType = eventData.GetType().FullName; Notify(dataType, eventData); })); } public async Task TriggerAsync(string pubKey, IEventData eventData) { await Task.Factory.StartNew(new Action(() => { var dataType = eventData.GetType().FullName; Notify(pubKey, eventData); })); } //通知每成功执行一个就须要记录到数据库 private void Notify<TEventData>(string eventType, TEventData eventData) where TEventData : IEventData { //获取当前的EventData绑定的全部Handler var handlerTypes = dicEvent[eventType]; foreach (var handlerType in handlerTypes) { var resolveObj = _iresolve.Resolve(handlerType); IEventHandler<TEventData> handler = resolveObj as IEventHandler<TEventData>; handler.Handle(eventData); } } #endregion } }
代码说明:
一、如上的EventBus是继承了IEventBus后的具体实现,小伙伴可能看到在构造函数里,有一个接口参数IResolve,这个主要是为了将解析的过程进行解耦,因为在一些WebApi的项目中,更加多的是使用IOC的机制进行对象的建立,那基于IResolve就能够实现不一样的对象建立方式(内置的是经过反射实现)
二、InitRegister方法经过遍历当前目录下的dll文件,去寻找全部实现了IEventHandler<IEventData>接口的信息,而且自动注册到EventBus中,因此在实际使用过程当中,应该是没有机会去适用register注册的
三、触发机制实现了同步以及异步的调用,这个从方法命名中就能够看出来
5、程序Demo
TestHandler2(继承IEventHandler)
using System; using System.Collections.Generic; using System.Text; using System.Threading; using System.Windows.Forms; using MEventBus.Core; namespace MEventBusHandler.Test { public class TestHandler2 : IEventHandler<TestEventData> { public void Handle(TestEventData eventData) { Thread.Sleep(2000); MessageBox.Show(eventData.EventTime.ToString()); } } }
TestEventData(继承EventData,EventData是继承了IEventData的代码)
using MEventBus.Core; using System; using System.Collections.Generic; using System.Text; namespace MEventBusHandler.Test { public class TestEventData : EventData { } }
调用代码
using MEventBus.Core; using MEventBusHandler.Test; using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Windows.Forms; namespace MEventBus.Test { public partial class Form1 : Form { public Form1() { InitializeComponent(); TestHandler.OnOut += TestHandler_OnOut; } private void TestHandler_OnOut(object sender, EventArgs e) { MessageBox.Show("Hello World"); } private void button1_Click(object sender, EventArgs e) { var task = new MEventBus.Core.EventBus(new ReflectResolve()).TriggerAsync(new TestEventData()); task.ContinueWith((obj) => { MessageBox.Show("事情所有作完"); }); } private void button2_Click(object sender, EventArgs e) { new EventBus(new ReflectResolve()).Trigger(new TestEventData()); } } }
执行结果
我在真正的Demo中,实际上是注册了2个handler,能够在后续公布的项目地址里看到
6、总结
从有这个想法开始,到最终实现这个事件总线,大概总共花了2,3天的时间(PS:晚上回家独自默默干活),目前只能说是有一个初步可使用的版本,而且还存在着一些问题:
一、在.NetFrameWork下(目前公司还不想升级到.NetCore,吐血。。),若是使用AutoFac建立EventBus(单例模式下),若是Handler也使用AutoFac进行建立,会出现要么对象建立失败,要么handler里的对象与调用方的对象不是同一个实例,为了解决这个问题,我让EventBus再也不是单例模式,将dicEvent变成了静态,暂时表面解决
二、未考虑跨进程的实现(感受用savorboard大佬的CAP就能够了)
三、目前这个东西在一个小的新项目里使用,暂时在测试环境还算没啥问题,各位小伙伴若是有相似需求,能够作个参考
因为我的缘由,在测试上可能会有所不够,若是有什么bug的话,还请站内信告知,感谢(ps:文字表达弱鸡,技术渣渣,各位多多包涵)
最后:附上项目地址:https://gitee.com/OneMango/MEventBus
做者: Mango
出处: http://www.cnblogs.com/OMango/
关于本身:专一.Net桌面开发以及Web后台开发,对.NetCore、微服务、DevOps,K8S等感兴趣,最近到了个甲方公司准备休养一段时间
本文版权归做者和博客园共有,欢迎转载,但未经做者赞成必须保留此段声明,且在文章页面明显位置给出原文连接,若有问题, 可站内信告知.