在上文中,咱们讨论了事件处理器中对象生命周期的问题,在进入新的讨论以前,首先让咱们总结一下,咱们已经实现了哪些内容。下面的类图描述了咱们已经实现的组件及其之间的关系,貌似系统已经变得愈来愈复杂了。html
其中绿色的部分就是上文中新实现的部分,包括一个简单的Event Store,一个事件处理器执行上下文的接口,以及一个基于ASP.NET Core依赖注入框架的执行上下文的实现。接下来,咱们打算淘汰PassThroughEventBus,而后基于RabbitMQ实现一套新的事件总线。git
根据前面的结论,事件总线的执行须要依赖于事件处理器执行上下文,也就是上面类图中PassThroughEventBus对于IEventHandlerExecutionContext的引用。更具体些,是在事件总线订阅某种类型的事件时,须要将事件处理器注册到IEventHandlerExecutionContext中。那么在实现RabbitMQ时,也会有着相似的设计需求,即RabbitMQEventBus也须要依赖IEventHandlerExecutionContext接口,以保证事件处理器生命周期的合理性。github
为此,咱们新建一个基类:BaseEventBus,并将这部分公共的代码提取出来,须要注意如下几点:sql
BaseEventBus的代码以下:shell
public abstract class BaseEventBus : IEventBus { protected readonly IEventHandlerExecutionContext eventHandlerExecutionContext; protected BaseEventBus(IEventHandlerExecutionContext eventHandlerExecutionContext) { this.eventHandlerExecutionContext = eventHandlerExecutionContext; } public abstract Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default) where TEvent : IEvent; public abstract void Subscribe<TEvent, TEventHandler>() where TEvent : IEvent where TEventHandler : IEventHandler<TEvent>; // Disposable接口实现代码省略 }
在上面的代码中,PublishAsync和Subscribe方法是抽象方法,以便子类根据不一样的须要来实现。数据库
接下来就是调整PassThroughEventBus,使其继承于BaseEventBus:json
public sealed class PassThroughEventBus : BaseEventBus { private readonly EventQueue eventQueue = new EventQueue(); private readonly ILogger logger; public PassThroughEventBus(IEventHandlerExecutionContext context, ILogger<PassThroughEventBus> logger) : base(context) { this.logger = logger; logger.LogInformation($"PassThroughEventBus构造函数调用完成。Hash Code:{this.GetHashCode()}."); eventQueue.EventPushed += EventQueue_EventPushed; } private async void EventQueue_EventPushed(object sender, EventProcessedEventArgs e) => await this.eventHandlerExecutionContext.HandleEventAsync(e.Event); public override Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default) { return Task.Factory.StartNew(() => eventQueue.Push(@event)); } public override void Subscribe<TEvent, TEventHandler>() { if (!this.eventHandlerExecutionContext.HandlerRegistered<TEvent, TEventHandler>()) { this.eventHandlerExecutionContext.RegisterHandler<TEvent, TEventHandler>(); } } // Disposable接口实现代码省略 }
代码都很简单,也就很少作说明了,接下来,咱们开始实现RabbitMQEventBus。架构
首先须要新建一个.NET Standard 2.0的项目,使用.NET Standard 2.0的项目模板所建立的项目,能够同时被.NET Framework 4.6.1或者.NET Core 2.0的应用程序所引用。建立新的类库项目的目的,是由于RabbitMQEventBus的实现须要依赖RabbitMQ C#开发库这个外部引用。所以,为了保证框架核心的纯净和稳定,须要在新的类库项目中实现RabbitMQEventBus。app
Note:对于RabbitMQ及其C#库的介绍,本文就再也不涉及了,网上有不少资料和文档,博客园有不少朋友在这方面都有使用经验分享,RabbitMQ官方文档也写得很是详细,固然是英文版的,若是英语比较好的话,建议参考官方文档。框架
如下就是在EdaSample案例中,RabbitMQEventBus的实现,咱们先读一读代码,再对这部分代码作些分析。
public class RabbitMQEventBus : BaseEventBus { private readonly IConnectionFactory connectionFactory; private readonly IConnection connection; private readonly IModel channel; private readonly string exchangeName; private readonly string exchangeType; private readonly string queueName; private readonly bool autoAck; private readonly ILogger logger; private bool disposed; public RabbitMQEventBus(IConnectionFactory connectionFactory, ILogger<RabbitMQEventBus> logger, IEventHandlerExecutionContext context, string exchangeName, string exchangeType = ExchangeType.Fanout, string queueName = null, bool autoAck = false) : base(context) { this.connectionFactory = connectionFactory; this.logger = logger; this.connection = this.connectionFactory.CreateConnection(); this.channel = this.connection.CreateModel(); this.exchangeType = exchangeType; this.exchangeName = exchangeName; this.autoAck = autoAck; this.channel.ExchangeDeclare(this.exchangeName, this.exchangeType); this.queueName = this.InitializeEventConsumer(queueName); logger.LogInformation($"RabbitMQEventBus构造函数调用完成。Hash Code:{this.GetHashCode()}."); } public override Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default(CancellationToken)) { var json = JsonConvert.SerializeObject(@event, new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All }); var eventBody = Encoding.UTF8.GetBytes(json); channel.BasicPublish(this.exchangeName, @event.GetType().FullName, null, eventBody); return Task.CompletedTask; } public override void Subscribe<TEvent, TEventHandler>() { if (!this.eventHandlerExecutionContext.HandlerRegistered<TEvent, TEventHandler>()) { this.eventHandlerExecutionContext.RegisterHandler<TEvent, TEventHandler>(); this.channel.QueueBind(this.queueName, this.exchangeName, typeof(TEvent).FullName); } } protected override void Dispose(bool disposing) { if (!disposed) { if (disposing) { this.channel.Dispose(); this.connection.Dispose(); logger.LogInformation($"RabbitMQEventBus已经被Dispose。Hash Code:{this.GetHashCode()}."); } disposed = true; base.Dispose(disposing); } } private string InitializeEventConsumer(string queue) { var localQueueName = queue; if (string.IsNullOrEmpty(localQueueName)) { localQueueName = this.channel.QueueDeclare().QueueName; } else { this.channel.QueueDeclare(localQueueName, true, false, false, null); } var consumer = new EventingBasicConsumer(this.channel); consumer.Received += async (model, eventArgument) => { var eventBody = eventArgument.Body; var json = Encoding.UTF8.GetString(eventBody); var @event = (IEvent)JsonConvert.DeserializeObject(json, new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All }); await this.eventHandlerExecutionContext.HandleEventAsync(@event); if (!autoAck) { channel.BasicAck(eventArgument.DeliveryTag, false); } }; this.channel.BasicConsume(localQueueName, autoAck: this.autoAck, consumer: consumer); return localQueueName; } }
阅读上面的代码,须要注意如下几点:
在Customer服务中,使用RabbitMQEventBus就很是简单了,只须要引用RabbitMQEventBus的程序集,而后在Startup.cs文件的ConfigureServices方法中,替换PassThroughEventBus的使用便可:
public void ConfigureServices(IServiceCollection services) { this.logger.LogInformation("正在对服务进行配置..."); services.AddMvc(); services.AddTransient<IEventStore>(serviceProvider => new DapperEventStore(Configuration["mssql:connectionString"], serviceProvider.GetRequiredService<ILogger<DapperEventStore>>())); var eventHandlerExecutionContext = new EventHandlerExecutionContext(services, sc => sc.BuildServiceProvider()); services.AddSingleton<IEventHandlerExecutionContext>(eventHandlerExecutionContext); // services.AddSingleton<IEventBus, PassThroughEventBus>(); var connectionFactory = new ConnectionFactory { HostName = "localhost" }; services.AddSingleton<IEventBus>(sp => new RabbitMQEventBus(connectionFactory, sp.GetRequiredService<ILogger<RabbitMQEventBus>>(), sp.GetRequiredService<IEventHandlerExecutionContext>(), RMQ_EXCHANGE, queueName: RMQ_QUEUE)); this.logger.LogInformation("服务配置完成,已注册到IoC容器!"); }
Note:一种更好的作法是经过配置文件来配置IoC容器,在曾经的Microsoft Patterns and Practices Enterprise Library Unity Container中,使用配置文件是很方便的。这样只须要Customer服务可以经过配置文件来配置IoC容器,同时只须要让Customer服务依赖(注意,不是程序集引用)于不一样的事件总线的实现便可,无需对Customer服务从新编译。
下面来验证一下效果。首先确保RabbitMQ已经配置并启动稳当,我是安装在本地机器上,使用默认安装。首先启动ASP.NET Core Web API,而后经过Powershell发起两次建立Customer的请求:
查看一下数据库是否更新正常:
并检查一下日志信息:
RabbitMQ中Exchange的信息:
本文提供了一种RabbitMQEventBus的实现,目前来讲是够用的,并且这种实现是可使用在实际项目当中的。在实际使用中,或许也会碰到一些与RabbitMQ自己有关的问题,这就须要具体问题具体分析了。此外,本文没有涉及事件消息丢失、重发而后保证最终一致性的问题,这些内容会在后面讨论。从下文开始,咱们着手逐步实现CQRS架构的领域事件和事件存储部分。
本系列文章的源代码在https://github.com/daxnet/edasample这个Github Repo里,经过不一样的release tag来区分针对不一样章节的源代码。本文的源代码请参考chapter_3这个tag,以下:
欢迎访问个人博客新站:http://sunnycoding.net。