事件总线这个概念对你来讲可能很陌生,但提到观察者(发布-订阅)模式,你也许就很熟悉。事件总线是对发布-订阅模式的一种实现。它是一种集中式事件处理机制,容许不一样的组件之间进行彼此通讯而又不须要相互依赖,达到一种解耦的目的。
从上图可知,核心就4个角色:html
实现事件总线的关键是:算法
以上源于我在事件总线知多少(1)中对于EventBus的分析和简单总结。基于以上的简单认知,咱们来梳理下eShopOnContainers中EventBus的实现机制·。数据库
咱们直接以上帝视角,来看下其实现机制,上类图。
json
咱们知道事件的本质是:事件源+事件处理。
针对事件源,其定义了IntegrationEvent
基类来处理。默认仅包含一个guid和一个建立日期,具体的事件能够经过继承该类,来完善事件的描述信息。服务器
这里有必要解释下Integration Event(集成事件)。由于在微服务中事件的消费再也不局限于当前领域内,而是多个微服务可能共享同一个事件,因此这里要和DDD中的领域事件区分开来。集成事件可用于跨多个微服务或外部系统同步领域状态,这是经过在微服务以外发布集成事件来实现的。app
针对事件处理,其本质是对事件的反应,一个事件可引发多个反应,因此,它们之间是一对多的关系。
eShopOnContainers中抽象了两个事件处理的接口:异步
两者都定义了一个Handle
方法用于响应事件。不一样之处在于方法参数的类型:
第一个接受的是一个强类型的IntegrationEvent
。第二个接收的是一个动态类型dynamic
。
为何要单独提供一个事件源为dynamic
类型的接口呢?
不是每个事件源都须要详细的事件信息,因此一个强类型的参数约束就没有必要,经过dynamic
能够简化事件源的构建,更趋于灵活。async
有了事件源和事件处理,接下来就是事件的注册和订阅了。为了方便进行订阅管理,系统提供了额外的一层抽象IEventBusSubscriptionsManager
,其用于维护事件的订阅和注销,以及订阅信息的持久化。其默认的实现InMemoryEventBusSubscriptionsManager
就是使用内存进行存储事件源和事件处理的映射字典。
从类图中看InMemoryEventBusSubscriptionsManager
中定义了一个内部类SubscriptionInfo
,其主要用于表示事件订阅方的订阅类型和事件处理的类型。分布式
咱们来近距离看下InMemoryEventBusSubscriptionsManager
的定义:函数
//InMemoryEventBusSubscriptionsManager.cs //定义的事件名称和事件订阅的字典映射(1:N) private readonly Dictionary<string, List<SubscriptionInfo>> _handlers; //保存全部的事件处理类型 private readonly List<Type> _eventTypes; //定义事件移除后事件 public event EventHandler<string> OnEventRemoved; //构造函数初始化 public InMemoryEventBusSubscriptionsManager() { _handlers = new Dictionary<string, List<SubscriptionInfo>>(); _eventTypes = new List<Type>(); } //添加动态类型事件订阅(须要手动指定事件名称) public void AddDynamicSubscription<TH>(string eventName) where TH : IDynamicIntegrationEventHandler { DoAddSubscription(typeof(TH), eventName, isDynamic: true); } //添增强类型事件订阅(事件名称为事件源类型) public void AddSubscription<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T> { var eventName = GetEventKey<T>(); DoAddSubscription(typeof(TH), eventName, isDynamic: false); if (!_eventTypes.Contains(typeof(T))) { _eventTypes.Add(typeof(T)); } } //移除动态类型事件订阅 public void RemoveDynamicSubscription<TH>(string eventName) where TH : IDynamicIntegrationEventHandler { var handlerToRemove = FindDynamicSubscriptionToRemove<TH>(eventName); DoRemoveHandler(eventName, handlerToRemove); } //移除强类型事件订阅 public void RemoveSubscription<T, TH>() where TH : IIntegrationEventHandler<T> where T : IntegrationEvent { var handlerToRemove = FindSubscriptionToRemove<T, TH>(); var eventName = GetEventKey<T>(); DoRemoveHandler(eventName, handlerToRemove); }
添加了这么一层抽象,即符合了单一职责原则,又完成了代码重用。IEventBus
的具体实现经过注入对IEventBusSubscriptionsManager
的依赖,便可完成订阅管理。
你这里可能会好奇,为何要暴露一个OnEventRemoved
事件?这里先按住不表,留给你们思考。
微服务的一大特色就是分布式。若须要作到动一发而牵全身,就须要一个持久化的集中式的EventBus。这就要求各个微服务内部虽然分别持有一个对EventBus的引用,但它们背后都必须链接着同一个用于持久化的数据源。
那你可能会说:那这个很好实现,使用同一个数据库就行了。为何非要用个什么RabbitMQ?问的好!这就要去探讨下RabbitMQ是为了解决什么问题了。
RabbitMQ提供了可靠的消息机制、跟踪机制和灵活的消息路由,支持消息集群和分布式部署。适用于排队算法、秒杀活动、消息分发、异步处理、数据同步、处理耗时任务、CQRS等应用场景。
而关于RabbitMQ的具体使用,这里再也不展开,可参考RabbitMQ知多少。
集成RabbitMQ的关键在于理解其对消息的处理机制:
基于以上的认知,咱们再与EventBusRabbitMQ
源码亲密接触。
public class EventBusRabbitMQ : IEventBus, IDisposable { const string BROKER_NAME = "eshop_event_bus"; private readonly IRabbitMQPersistentConnection _persistentConnection; private readonly ILogger<EventBusRabbitMQ> _logger; private readonly IEventBusSubscriptionsManager _subsManager; private readonly ILifetimeScope _autofac; private readonly string AUTOFAC_SCOPE_NAME = "eshop_event_bus"; private readonly int _retryCount; private IModel _consumerChannel; private string _queueName; public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger<EventBusRabbitMQ> logger, ILifetimeScope autofac, IEventBusSubscriptionsManager subsManager, string queueName = null, int retryCount = 5) { _persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager(); _queueName = queueName; _consumerChannel = CreateConsumerChannel(); _autofac = autofac; _retryCount = retryCount; _subsManager.OnEventRemoved += SubsManager_OnEventRemoved; } private void SubsManager_OnEventRemoved(object sender, string eventName) { if (!_persistentConnection.IsConnected) { _persistentConnection.TryConnect(); } using (var channel = _persistentConnection.CreateModel()) { channel.QueueUnbind(queue: _queueName, exchange: BROKER_NAME, routingKey: eventName); if (_subsManager.IsEmpty) { _queueName = string.Empty; _consumerChannel.Close(); } } } //.... }
构造函数主要作了如下几件事:
IRabbitMQPersistentConnection
以便链接到对应的Broke。IEventBusSubscriptionsManager
,进行订阅管理。OnEventRemoved
事件,取消队列的绑定。(这也就回答了上面遗留的问题)private void DoInternalSubscription(string eventName) { var containsKey = _subsManager.HasSubscriptionsForEvent(eventName); if (!containsKey) { if (!_persistentConnection.IsConnected) { _persistentConnection.TryConnect(); } using (var channel = _persistentConnection.CreateModel()) { channel.QueueBind(queue: _queueName, exchange: BROKER_NAME, routingKey: eventName); } } }
从上面咱们能够看到事件的订阅主要是进行rabbitmq队列的绑定。以eventName为routingKey进行路由。
public void Publish(IntegrationEvent @event) { if (!_persistentConnection.IsConnected) { _persistentConnection.TryConnect(); } var policy = RetryPolicy.Handle<BrokerUnreachableException>() .Or<SocketException>() .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) => { _logger.LogWarning(ex.ToString()); }); using (var channel = _persistentConnection.CreateModel()) { var eventName = @event.GetType() .Name; channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct"); var message = JsonConvert.SerializeObject(@event); var body = Encoding.UTF8.GetBytes(message); policy.Execute(() => { var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; // persistent channel.BasicPublish(exchange: BROKER_NAME, routingKey: eventName, mandatory:true, basicProperties: properties, body: body); }); } }
这里面有如下几个知识点:
DeliveryMode = 2
进行消息持久化mandatory: true
告知服务器当根据指定的routingKey和消息找不到对应的队列时,直接返回消息给生产者。private IModel CreateConsumerChannel() { if (!_persistentConnection.IsConnected) { _persistentConnection.TryConnect(); } var channel = _persistentConnection.CreateModel(); channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct"); channel.QueueDeclare(queue: _queueName, durable: true, exclusive: false,autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += async (model, ea) => { var eventName = ea.RoutingKey; var message = Encoding.UTF8.GetString(ea.Body); await ProcessEvent(eventName, message); channel.BasicAck(ea.DeliveryTag, multiple:false); }; channel.BasicConsume(queue: _queueName, autoAck: false, consumer: consumer); channel.CallbackException += (sender, ea) => { _consumerChannel.Dispose(); _consumerChannel = CreateConsumerChannel(); }; return channel; }
以上代码演示了如建立消费信道进行消息处理的步骤:
Received
事件委托处理消息接收事件channel.BasicConsume
启动监听private async Task ProcessEvent(string eventName, string message) { if (_subsManager.HasSubscriptionsForEvent(eventName)) { using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)) { var subscriptions = _subsManager.GetHandlersForEvent(eventName); foreach (var subscription in subscriptions) { if (subscription.IsDynamic) { var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler; dynamic eventData = JObject.Parse(message); await handler.Handle(eventData); } else { var eventType = _subsManager.GetEventTypeByName(eventName); var integrationEvent = JsonConvert.DeserializeObject(message, eventType); var handler = scope.ResolveOptional(subscription.HandlerType); var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent }); } } } } }
以上代码主要包括如下知识点:
以上介绍了EventBus的实现要点,那各个微服务是如何集成呢?
1. 注册IRabbitMQPersistentConnection
服务用于设置RabbitMQ链接
services.AddSingleton<IRabbitMQPersistentConnection>(sp => { var logger = sp.GetRequiredService<ILogger<DefaultRabbitMQPersistentConnection>>(); //... return new DefaultRabbitMQPersistentConnection(factory, logger, retryCount); });
2. 注册单例模式的IEventBusSubscriptionsManager
用于订阅管理
services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>();
3. 注册单例模式的EventBusRabbitMQ
services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp => { var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>(); var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>(); var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>(); var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>(); var retryCount = 5; if (!string.IsNullOrEmpty(Configuration["EventBusRetryCount"])) { retryCount = int.Parse(Configuration["EventBusRetryCount"]); } return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount); });
完成了以上集成,就能够在代码中使用事件总线,进行事件的发布和订阅。
4. 发布事件
若要发布事件,须要根据是否须要事件源(参数传递)来决定是否须要申明相应的集成事件,须要则继承自IntegrationEvent
进行申明。而后在须要发布事件的地方进行实例化,并经过调用IEventBus
的实例的Publish
方法进行发布。
//事件源的声明 public class ProductPriceChangedIntegrationEvent : IntegrationEvent { public int ProductId { get; private set; } public decimal NewPrice { get; private set; } public decimal OldPrice { get; private set; } public ProductPriceChangedIntegrationEvent(int productId, decimal newPrice, decimal oldPrice) { ProductId = productId; NewPrice = newPrice; OldPrice = oldPrice; } }
//声明事件源 var priceChangedEvent = new ProductPriceChangedIntegrationEvent(1001, 200.00, 169.00) //发布事件 _eventBus.Publish(priceChangedEvent)
5. 订阅事件
若要订阅事件,须要根据须要处理的事件类型,申明对应的事件处理类,继承自IIntegrationEventHandler
或IDynamicIntegrationEventHandler
,并注册到IOC容器。而后建立IEventBus
的实例调用Subscribe
方法进行显式订阅。
//定义事件处理 public class ProductPriceChangedIntegrationEventHandler : IIntegrationEventHandler<ProductPriceChangedIntegrationEvent> { public async Task Handle(ProductPriceChangedIntegrationEvent @event) { //do something } }
//事件订阅 var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>(); eventBus.Subscribe<ProductPriceChangedIntegrationEvent, ProductPriceChangedIntegrationEventHandler>();
6. 跨服务事件消费
在微服务中跨服务事件消费很广泛,这里有一点须要说明的是若是订阅的强类型事件非当前微服务中订阅的事件,须要复制定义订阅的事件类型。换句话说,好比在A服务发布的TestEvent
事件,B服务订阅该事件,一样须要在B服务复制定义一个TestEvent
。
这也是微服务的一个通病,重复代码。
经过一步一步的源码梳理,咱们发现eShopOnContainers中事件总线的整体实现思路与引言部分的介绍十分契合。因此对于事件总线,不要以为高深,明确参与的几个角色以及基本的实现步骤,那么不论是基于RabbitMQ实现也好仍是基于Azure Service Bus也好,万变不离其宗!