最近在看微软eShopOnContainers 项目,看到事件总线以为不错,和你们分享一下异步
发布订阅模式可让应用程序组件之间解耦,这是咱们使用这种模式最重要的理由之一,若是你彻底不知道这个东西,建议你先经过搜索引擎了解一下这种模式,网上的资料不少这里就再也不赘述了。async
eShop中的EventBus就是基于这种模式的发布/订阅。
发布订阅模式核心概念有三个:发布者、订阅者、调度中心,这些概念在消息队列中就是生产者、消费者、MQ实例。ide
在eShop中有两个EventBus的实现:函数
EventBusRabbitMQ
EventBusServiceBus
。IEventBus
开始先来看一看,全部EventBus的接口IEventBus
ui
public interface IEventBus { void Publish(IntegrationEvent @event); void Subscribe<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T>; void SubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler; void UnsubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler; void Unsubscribe<T, TH>() where TH : IIntegrationEventHandler<T> where T : IntegrationEvent; }
嗯,乍一看看是有点眼晕的,仔细看它的核心功能只有三个:搜索引擎
这对应着发布订阅模式的基本概念,不过对于事件总线的接口添加了许多约束:线程
IntegrationEvent
及其子类IIntegrationEventHandler
的实现类Ok,看到这里先不要管Dynamic
相关的方法,而后记住这个两个关键点:设计
IntegrationEvent
IIntegrationEventHandler<T>
且T
是IntegrationEvent
子类另外,看下 IntegrationEvent
有什么日志
public class IntegrationEvent { public IntegrationEvent() { Id = Guid.NewGuid(); CreationDate = DateTime.UtcNow; } public Guid Id { get; } public DateTime CreationDate { get; } }
public interface IEventBusSubscriptionsManager { bool IsEmpty { get; } event EventHandler<string> OnEventRemoved; void AddDynamicSubscription<TH>(string eventName) where TH : IDynamicIntegrationEventHandler; void AddSubscription<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T>; void RemoveSubscription<T, TH>() where TH : IIntegrationEventHandler<T> where T : IntegrationEvent; void RemoveDynamicSubscription<TH>(string eventName) where TH : IDynamicIntegrationEventHandler; bool HasSubscriptionsForEvent<T>() where T : IntegrationEvent; bool HasSubscriptionsForEvent(string eventName); Type GetEventTypeByName(string eventName); void Clear(); IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>() where T : IntegrationEvent; IEnumerable<SubscriptionInfo> GetHandlersForEvent(string eventName); string GetEventKey<T>(); }
这个接口看起来稍显复杂些,咱们来简化下看看:code
public interface IEventBusSubscriptionsManager { void AddSubscription<T, TH>() void RemoveSubscription<T, TH>() IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>() }
最终,这三个方法就是咱们要关注的,添加订阅、移除订阅、获取指定事件的订阅信息。
SubscriptionInfo
是什么?public bool IsDynamic { get; } public Type HandlerType{ get; }
SubscriptionInfo
中只有两个信息,这是否是一个Dynamic类型的Event以及这个Event所对应的处理器的类型。
这是你可能会有另外一个疑问:
IEventBus
有什么关系?IEventBusSubscriptionsManager
含有更多功能:查看是否有订阅,获取事件的Type,获取事件的处理器等等IEventBusSubscriptionsManager
由IEventBus
使用,在RabbitMq和ServiceBus的实现中,都使用Manager去存储事件的信息,例以下面的代码:
public void Subscribe<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T> { // 查询事件的全名 var eventName = _subsManager.GetEventKey<T>(); //向mq添加注册 DoInternalSubscription(eventName); // 向manager添加订阅 _subsManager.AddSubscription<T, TH>(); } private void DoInternalSubscription(string eventName) { var containsKey = _subsManager.HasSubscriptionsForEvent(eventName); if (!containsKey) { if (!_persistentConnection.IsConnected) { _persistentConnection.TryConnect(); } using (var channel = _persistentConnection.CreateModel()) { channel.QueueBind(queue: _queueName, exchange: BROKER_NAME, routingKey: eventName); } } }
查询事件的名字是manager作的,订阅的时候是先向mq添加订阅,以后又加到manager中,manager管理着订阅的基本信息。
另一个重要功能是获取事件的处理器信息,在rabbit mq的实现中,ProcessEvent方法中用manager获取了事件的处理器,再用依赖注入得到处理器的实例,反射调用Handle
方法处理事件信息:
private async Task ProcessEvent(string eventName, string message) { // 从manager查询信息 if (_subsManager.HasSubscriptionsForEvent(eventName)) { using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)) { // 从manager获取处理器 var subscriptions = _subsManager.GetHandlersForEvent(eventName); foreach (var subscription in subscriptions) { // Di + 反射调用,处理事件(两个都是,只是针对是不是dynamic作了不一样的处理) if (subscription.IsDynamic) { var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler; dynamic eventData = JObject.Parse(message); await handler.Handle(eventData); } else { var eventType = _subsManager.GetEventTypeByName(eventName); var integrationEvent = JsonConvert.DeserializeObject(message, eventType); var handler = scope.ResolveOptional(subscription.HandlerType); var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent }); } } } } }
在eShop中只有一个实现就是InMemoryEventBusSubscriptionsManager
类
这个类中有两个重要的字段
private readonly Dictionary<string, List<SubscriptionInfo>> _handlers; private readonly List<Type> _eventTypes;
他们分别存储了事件列表和事件处理器信息词典
接下来就是实现一个
了
咱们要作什么呢?IEventBusSubscriptionsManager 已经有了InMemory的实现了,咱们能够直接拿来用,因此咱们只须要本身实现一个EventBus就行了
先贴出最终代码:
public class InMemoryEventBus : IEventBus { private readonly IServiceProvider _provider; private readonly ILogger<InMemoryEventBus> _logger; private readonly ISubscriptionsManager _manager; private readonly IList<IntegrationEvent> _events; public InMemoryEventBus( IServiceProvider provider, ILogger<InMemoryEventBus> logger, ISubscriptionsManager manager) { _provider = provider; _logger = logger; _manager = manager; } public void Publish(IntegrationEvent e) { var eventType = e.GetType(); var handlers = _manager.GetHandlersForEvent(eventType.FullName); foreach (var handlerInfo in handlers) { var handler = _provider.GetService(handlerInfo.HandlerType); var method = handlerInfo.HandlerType.GetMethod("Handle"); method.Invoke(handler, new object[] { e }); } } public void Subscribe<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T> { _manager.AddSubscription<T, TH>(); } public void SubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler { throw new NotImplementedException(); } public void Unsubscribe<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T> { _manager.RemoveSubscription<T, TH>(); } public void UnsubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler { throw new NotImplementedException(); } }
首先构造函数中声明咱们要使用的东西:
public InMemoryEventBus( IServiceProvider provider, ILogger<InMemoryEventBus> logger, ISubscriptionsManager manager) { _provider = provider; _logger = logger; _manager = manager; }
这里要注意的就是IServiceProvider provider
这是 DI容器,当咱们在切实处理事件的时候咱们选择从DI获取处理器的实例,而不是反射建立,这要作的好处在于,处理器能够依赖于其它东西,而且能够是单例的
public void Subscribe<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T> { _manager.AddSubscription<T, TH>(); } public void Unsubscribe<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T> { _manager.RemoveSubscription<T, TH>(); }
订阅和取消订阅很简单,由于咱们是InMemory的因此只调用了manager的方法。
接下来就是最重要的Publish方法,实现Publish有两种方式:
为了简单起见,咱们先写个简单易懂的同步的
public void Publish(IntegrationEvent e) { // 首先要拿到集成事件的Type信息 var eventType = e.GetType(); // 获取属于这个事件的处理器列表,可能有不少,注意得到的是SubscriptionInfo var handlers = _manager.GetHandlersForEvent(eventType.FullName); // 不解释循环 foreach (var handlerInfo in handlers) { // 从DI中获取类型的实例 var handler = _provider.GetService(handlerInfo.HandlerType); // 拿到Handle方法 var method = handlerInfo.HandlerType.GetMethod("Handle"); // 调用方法 method.Invoke(handler, new object[] { e }); } }
OK,咱们的InMemoryEventBus就写好了!
要实践这个InMemoryEventBus,那么还须要一个IntegrationEvent
的子类,和一个IIntegrationEventHandler<T>
的实现类,这些都不难,例如咱们作一个添加用户的事件,A在添加用户后,发起一个事件并将新用户的名字做为事件数据,B去订阅事件,并在本身的处理器中处理名字信息。
思路是这样的:
AddUserEvent:IntegrationEvent
,里面有一个UserId和一个UserName
。AddUserEventHandler:IIntegrationEventHandler<AddUserEvent>
,在Handle
方法中输出UserId和Name到日志。注册DI,你要注册下面这些服务:
IEventBus=>InMemoryEventBus ISubscriptionsManager=>InMemorySubscriptionsManager AddUserEventHandler=>AddUserEventHandler
写一个Api接口或是什么,调用IEventBus的Publish方法,new 一个新的AddUserEvent
做为参数传进去。
OK!到这里一个切实可用的InMemoryEventBus就可使用了。