在前面两篇文章中,我详细介绍了基本事件系统的实现,包括事件派发和订阅、经过事件处理器执行上下文来解决对象生命周期问题,以及一个基于RabbitMQ的事件总线的实现。接下来对于事件驱动型架构的讨论,就须要结合一个实际的架构案例来进行分析。在领域驱动设计的讨论范畴,CQRS架构自己就是事件驱动的,所以,我打算首先介绍一下CQRS架构下相关部分的实现,而后再继续讨论事件驱动型架构实现的具体问题。html
固然,CQRS架构自己的实现也是根据实际状况的不一样,须要具体问题具体分析的,不只如此,CQRS架构的实现也是很是复杂的,毫不是一套文章一套案例可以解释清楚并涵盖所有的。因此,我不会把大部分篇幅放在CQRS架构实现的细节上,而是会着重介绍与咱们的主题相关的内容,并对无关的内容进行弱化。或许,在这个系列文章结束的时候,咱们会获得一个完整的、可以运行的CQRS架构系统,不过,这套系统极有可能仅供技术研讨和学习使用,没法直接用于生产环境。git
基于这样的前提,咱们今天首先看一下CQRS架构中聚合与聚合根的实现,或许你会以为目前讨论的内容与你本打算关心的事件驱动架构没什么关系,而事实是,CQRS架构中聚合与聚合根的实现是彻底面向事件驱动的,而这部份内容也会为咱们以后的讨论作下铺垫。不只如此,我还会在本文讨论一些基于.NET/C#的软件架构设计的思考与实践(请注意文章中我添加了Note字样而且字体加粗的句子),所以,我仍是会推荐你继续读完这篇文章。github
早在2010年,我针对CQRS架构总结过一篇文章,题目是:《EntityFramework之领域驱动设计实践【扩展阅读】:CQRS体系结构模式》,固然,这篇文章跟Entity Framework本没啥关系,只是延续了领域驱动设计这一话题进行的扩展讨论罢了。这篇文章介绍了CQRS架构模式所产生的背景、结构,以及相关的一些概念,好比:最近很是流行的词语:“事件溯源”、解决事件溯源性能问题的“快照”、用于存取事件数据的“事件存储(Event Store)”,还有从新认识了什么叫作“对象的状态”,等等。此外,在后续的博文中,我也常常对CQRS架构中的实现细节作些探讨,有兴趣的读者能够翻看我过去的博客文章。整体上讲,CQRS架构基本符合下图所描述的结构:sql
看上去是否是特别复杂?没错,特别复杂,并且每一个部分均可以使用不一样的工具、框架,以不一样的形式进行实现。整个架构甚至能够是语言、平台异构的,还能够跟外部系统进行整合,实现大数据分析、呈现等等,玩法可谓之五花八门,这些通通都不在咱们的讨论范围以内。咱们今天打算讨论的,就是上图右上部分“领域模型”框框里的主题:CQRS架构中的聚合与聚合根。数据库
说到聚合与聚合根,了解过领域驱动设计(DDD)的读者确定对这两个概念很是熟悉。一般状况下,具备相同生命周期,组合起来可以共同表述一种领域概念的一组模型对象,就能够组成一个聚合。在每一个聚合中,衔接各个领域模型对象,并向外提供统一访问聚合的对象,就是聚合根。聚合中的全部对象,离开聚合根,就不能完整地表述一个领域概念。好比:收货地址没法离开客户,订单详情没法离开订单,库存没法离开货品等等。因此从定义上来看,一个聚合大概就是这样:编程
好吧,对这些概念比较熟悉的读者来讲,我在此算是多啰嗦了几句。接下来,让咱们结合CQRS架构中命令处理器对领域模型的更改过程来看看,除了以上这些常规特征以外,聚合与聚合根还有哪些特殊之处。当命令处理器接到操做命令时,便开始对领域模型进行更改,步骤以下:设计模式
接下来在事件消息总线和事件处理器中将会发生的事情,咱们从此还会讨论,这里就很少说了。从这个过程,咱们不可贵出:缓存
听起来是否是很是复杂?确实如此。那咱们就先从领域事件入手,逐步实现CQRS中的聚合与聚合根。安全
领域事件,顾名思义,就是从领域模型中产生的事件消息。概念上很简单,好比,客户登陆网站,就会由客户登陆实体产生一个事件派发出去,例如CustomerLoggedOnEvent,表示客户登陆这件事已经发生了。虽然在DDD的实践中,领域事件更多地在CQRS架构中被讨论,其实即使是非事件驱动型架构,也能够经过领域模型来发布消息,达到系统解耦的目的。架构
延续以前的设计,咱们的领域事件继承了IEvent接口,并增长了三个属性/方法,此外,为了编程方便,咱们实现了领域事件的抽象类,UML类图以下:
图中的绿色部分就是在以前咱们的事件模型上新加的接口和类,用以表述领域事件的概念。其中:
好了,若是说咱们将发生在某聚合上的领域事件保存到关系型数据库,那么,当须要得到该聚合的全部领域事件时,只须要下面一句SQL就好了:
SELECT * FROM [Events] WHERE [AggregateRootId]=aggregateRootId AND [AggregateRootType]=aggregateRootType ORDER BY [Sequence] ASC
这就是最简单的事件存储数据库的实现了。不过,咱们暂时不介绍这些内容。
事实上,与标准的事件(IEvent接口)相比,除了上面三个主要的属性以外,领域事件还能够包含更多的属性和方法,这就要看具体的需求和设计了。不过目前为止,咱们定义这三个属性已经够用了,不要把问题搞得太复杂。
有了领域事件的基本模型,咱们开始设计CQRS下的聚合。
因为外界访问聚合都是经过聚合根来实现的,所以,针对聚合的操做都会被委托给聚合根来处理。好比,当用户地址发生变化时,服务层会调用Customer.ChangeAddress方法,这个方法就会产生一个领域事件,并经过内联的事件处理机制更改聚合中Address值对象中的状态。因而,从技术角度,聚合的设计也就是聚合根的实现。
首先须要设计的是与聚合相关的概念所表述的接口、类及其之间的关系。结合领域驱动设计中的概念,咱们获得下面的设计:
其中,实体(IEntity)、聚合根(IAggregateRoot)都是你们耳熟能详的领域驱动设计的概念。因为实体都是经过Id进行惟一标识,因此,IEntity会有一个id的属性,为了简单起见,咱们使用Guid做为它的类型。聚合根(IAggregateRoot)继承于IEntity接口,有趣的是,在咱们目前的场景中,IAggregateRoot并不包含任何成员,它仅仅是一个空接口,在整个框架代码中,它仅做为泛型的类型约束。Note:这种作法其实也是很是常见的一种框架设计模式。具备事件溯源能力的聚合根(IAggregateRootWithEventSourcing)又继承于IAggregateRoot接口,而且有以下三个成员:
此外,你还发现咱们还有两个神奇的接口:IPurgable和IPersistedVersionSetter。这两个接口的职责是:
Note:为何不将这两个接口中的方法直接放在IAggregateRootWithEventSourcing中呢?是由于单一职责原则。聚合自己不该该存在所谓之“清空缓存”或者“设置保存版本号”这样的概念,这样的概念对于技术人员来讲比较容易理解,但是若是将这些技术细节加入领域模型中,就会污染领域模型,形成领域专家没法理解领域模型,这是违背面向对象分析与设计的单一职责原则的,也违背了领域驱动设计的原则。那么,即便把这些方法经过额外的接口独立出去,实现了IAggregateRootWithEventSourcing接口的类型,不仍是要实现这两个接口中的方法吗?这样,聚合的访问者不仍是能够访问这两个额外的方法吗?的确如此,这些接口是须要被实现的,可是咱们可使用C#中接口的显式实现,这样的话,若是不将IAggregateRootWithEventSourcing强制转换成IPurgable或者IPersistedVersionSetter的话,是没法直接经过聚合根对象自己来访问这些方法的,这起到了很是好的保护做用。接口的显式实如今软件系统的框架设计中也是经常使用手段。
在上面的类图中,IAggregateRootWithEventSourcing最终由AggregateRootWithEventSourcing抽象类实现。不要抱怨类的名字太长,它有助于咱们理解这一类型在咱们的领域模型中的角色和功能。下面的代码列出了该抽象类的主要部分的实现:
public abstract class AggregateRootWithEventSourcing : IAggregateRootWithEventSourcing { private readonly Lazy<Dictionary<string, MethodInfo>> registeredHandlers; private readonly Queue<IDomainEvent> uncommittedEvents = new Queue<IDomainEvent>(); private Guid id; private long persistedVersion = 0; private object sync = new object(); protected AggregateRootWithEventSourcing() : this(Guid.NewGuid()) { } protected AggregateRootWithEventSourcing(Guid id) { registeredHandlers = new Lazy<Dictionary<string, MethodInfo>>(() => { var registry = new Dictionary<string, MethodInfo>(); var methodInfoList = from mi in this.GetType().GetMethods(BindingFlags.Public | BindingFlags.NonPublic | BindingFlags.Instance) let returnType = mi.ReturnType let parameters = mi.GetParameters() where mi.IsDefined(typeof(HandlesInlineAttribute), false) && returnType == typeof(void) && parameters.Length == 1 && typeof(IDomainEvent).IsAssignableFrom(parameters[0].ParameterType) select new { EventName = parameters[0].ParameterType.FullName, MethodInfo = mi }; foreach (var methodInfo in methodInfoList) { registry.Add(methodInfo.EventName, methodInfo.MethodInfo); } return registry; }); Raise(new AggregateCreatedEvent(id)); } public Guid Id => id; long IPersistedVersionSetter.PersistedVersion { set => Interlocked.Exchange(ref this.persistedVersion, value); } public IEnumerable<IDomainEvent> UncommittedEvents => uncommittedEvents; public long Version => this.uncommittedEvents.Count + this.persistedVersion; void IPurgable.Purge() { lock (sync) { uncommittedEvents.Clear(); } } public void Replay(IEnumerable<IDomainEvent> events) { ((IPurgable)this).Purge(); events.OrderBy(e => e.Timestamp) .ToList() .ForEach(e => { HandleEvent(e); Interlocked.Increment(ref this.persistedVersion); }); } [HandlesInline] protected void OnAggregateCreated(AggregateCreatedEvent @event) { this.id = @event.NewId; } protected void Raise<TDomainEvent>(TDomainEvent domainEvent) where TDomainEvent : IDomainEvent { lock (sync) { // 首先处理事件数据。 this.HandleEvent(domainEvent); // 而后设置事件的元数据,包括当前事件所对应的聚合根类型以及 // 聚合的ID值。 domainEvent.AggregateRootId = this.id; domainEvent.AggregateRootType = this.GetType().AssemblyQualifiedName; domainEvent.Sequence = this.Version + 1; // 最后将事件缓存在“未提交事件”列表中。 this.uncommittedEvents.Enqueue(domainEvent); } } private void HandleEvent<TDomainEvent>(TDomainEvent domainEvent) where TDomainEvent : IDomainEvent { var key = domainEvent.GetType().FullName; if (registeredHandlers.Value.ContainsKey(key)) { registeredHandlers.Value[key].Invoke(this, new object[] { domainEvent }); } } }
上面的代码不算复杂,它根据上面的分析和描述,实现了IAggregateRootWithEventSourcing接口,篇幅缘由,就很少作解释了,不过有几点仍是能够鉴赏一下的:
如今,咱们已经实现了CQRS架构下的聚合与聚合根,虽然实际上这个结构有可能比咱们的实现更为复杂,可是目前的这个设计已经可以知足咱们进一步研究讨论的需求了。下面,咱们再更进一步,看看CQRS中仓储应该如何实现。
为何说是“初探”?由于咱们目前打算实现的仓储暂时不包含事件派发的逻辑,这部份内容我会在后续文章中讲解。首先看看,仓储的接口是什么样的。在CQRS架构中,仓储只具有两种操做:
你或许会问,那根据某个条件查询知足该条件的全部聚合对象呢?注意,这是CQRS架构中查询部分的职责,不属于咱们的讨论范围。
一般,仓储的接口定义以下:
public interface IRepository { Task SaveAsync<TAggregateRoot>(TAggregateRoot aggregateRoot) where TAggregateRoot : class, IAggregateRootWithEventSourcing; Task<TAggregateRoot> GetByIdAsync<TAggregateRoot>(Guid id) where TAggregateRoot : class, IAggregateRootWithEventSourcing; }
与以前领域事件的设计相似,咱们为仓储定义一个抽象类,全部仓储的实现都应该基于这个抽象类:
public abstract class Repository : IRepository { protected Repository() { } public async Task<TAggregateRoot> GetByIdAsync<TAggregateRoot>(Guid id) where TAggregateRoot : class, IAggregateRootWithEventSourcing { var domainEvents = await LoadDomainEventsAsync(typeof(TAggregateRoot), id); var aggregateRoot = ActivateAggregateRoot<TAggregateRoot>(); aggregateRoot.Replay(domainEvents); return aggregateRoot; } public async Task SaveAsync<TAggregateRoot>(TAggregateRoot aggregateRoot) where TAggregateRoot : class, IAggregateRootWithEventSourcing { var domainEvents = aggregateRoot.UncommittedEvents; await this.PersistDomainEventsAsync(domainEvents); aggregateRoot.PersistedVersion = aggregateRoot.Version; aggregateRoot.Purge(); } protected abstract Task<IEnumerable<IDomainEvent>> LoadDomainEventsAsync(Type aggregateRootType, Guid id); protected abstract Task PersistDomainEventsAsync(IEnumerable<IDomainEvent> domainEvents); private TAggregateRoot ActivateAggregateRoot<TAggregateRoot>() where TAggregateRoot : class, IAggregateRootWithEventSourcing { var constructors = from ctor in typeof(TAggregateRoot).GetTypeInfo().GetConstructors() let parameters = ctor.GetParameters() where parameters.Length == 0 || (parameters.Length == 1 && parameters[0].ParameterType == typeof(Guid)) select new { ConstructorInfo = ctor, ParameterCount = parameters.Length }; if (constructors.Count() > 0) { TAggregateRoot aggregateRoot; var constructorDefinition = constructors.First(); if (constructorDefinition.ParameterCount == 0) { aggregateRoot = (TAggregateRoot)constructorDefinition.ConstructorInfo.Invoke(null); } else { aggregateRoot = (TAggregateRoot)constructorDefinition.ConstructorInfo.Invoke(new object[] { Guid.NewGuid() }); } // 将AggregateRoot下的全部事件清除。事实上,在AggregateRoot的构造函数中,已经产生了AggregateCreatedEvent。 aggregateRoot.Purge(); return aggregateRoot; } return null; } }
代码也是很是简单、容易理解的:GetByIdAsync方法根据给定的聚合根类型以及ID值,从后台存储中读取全部属于该聚合的领域事件,并在聚合上进行回放,以便将聚合恢复到存储前的状态;SaveAsync方法则从聚合根上得到全部未被提交的领域事件,将这些事件保存到后台存储,而后设置聚合的“已保存版本”,最后清空未提交事件的缓存。剩下的就是如何实现LoadDomainEventsAsync以及PersistDomainEventsAsync两个方法了。而这两个方法,本来就应该是事件存储对象的职责范围了。
Note:你也许会问:若是某个聚合从开始到如今,已经发生了大量的领域事件了,那么这样一条条地将事件回放到聚合上,岂不是性能很是低下?没错,这个问题咱们能够经过快照来解决。在后续文章中我会介绍。你还会问:日积月累,事件存储系统中的事件数量岂不是会愈来愈多吗?须要删除吗?答案是:不删!不过能够对数据进行归档,或者依赖一些第三方框架来处理这个问题,可是,从领域驱动设计的角度,领域事件表明着整个领域模型系统中发生过的全部事情,事情既然已经发生,就没法再被抹去,所以,删除事件存储系统中的事件是不合理的。那数据量愈来愈大怎么办?答案是:或许,存储硬件设备要比业务数据更便宜。
仓储的实现咱们暂且探索到这一步,目前咱们只须要有一个正确的聚合保存、读取(经过领域事件重塑)的逻辑就能够了,并不须要关心事件自己是如何被读取被保存的。接下来,咱们在.NET Core的测试项目中,借助Moq框架,经过Mock一个假想的仓储,来验证整个系统从聚合、聚合根的实现到仓储设计的正确性。
Moq是一个很好的Mock框架,简单轻量,并且支持.NET Core,在单元测试的项目中使用Moq是一种很好的实践。Moq上手很是简单,只须要在单元测试项目上添加Moq的NuGet依赖包就能够开始着手编写测试用例了。为了测试咱们的聚合根以及仓储对聚合根保存、读取的设计,首先咱们定义一个简单的聚合:
public class Book : AggregateRootWithEventSourcing { public void ChangeTitle(string newTitle) { this.Raise(new BookTitleChangedEvent(newTitle)); } public string Title { get; private set; } [HandlesInline] private void OnTitleChanged(BookTitleChangedEvent @event) { this.Title = @event.NewTitle; } public override string ToString() { return Title; } }
Book类是一个聚合根,它继承AggregateRootWithEventSourcing抽象类,同时它有一个属性,Title,表示书的名称,而ChangeTitle方法(业务方法)会直接产生一个BookTitleChangedEvent领域事件,以后,OnTitleChanged成员函数会负责将领域事件中的NewTitle的值设置到Book聚合根的Title状态上,完成书本标题的更新。与之相关的BookTitleChangedEvent的定义以下:
public class BookTitleChangedEvent : DomainEvent { public BookTitleChangedEvent(string newTitle) { this.NewTitle = newTitle; } public string NewTitle { get; set; } public override string ToString() { return $"{Sequence} - {NewTitle}"; } }
首先,下面两个测试用例用于测试Book聚合自己产生领域事件的过程是否正确,若是正确,那么当Book自己本构造时,会产生一个AggregateCreatedEvent,若是更改书本的标题,则又会产生一个BookTitleChangedEvent,因此,第一个测试中,book的版本应该为1,而第二个则为2:
[Fact] public void CreateBookTest() { // Arrange & Act var book = new Book(); // Assert Assert.NotEqual(Guid.Empty, book.Id); Assert.Equal(1, book.Version); } [Fact] public void ChangeBookTitleEventTest() { // Arrange var book = new Book(); // Act book.ChangeTitle("Hit Refresh"); // Assert Assert.Equal("Hit Refresh", book.Title); Assert.Equal(2, book.UncommittedEvents.Count()); Assert.Equal(2, book.Version); }
接下来,测试仓储保存Book聚合的正确性,由于咱们没有实现一个有效的仓储实例,所以,这里借助Moq帮咱们动态生成。在下面的代码中,让Moq对仓储抽象类的PersisDomainEventsAsync受保护成员进行动态生成,指定当它被任何IEnumerable<IDomainEvent>做为参数调用时,都将这些事件保存到一个本地的List中,因而,最后只须要检查List中的领域事件是否符合咱们的要求就能够了。代码以下:
[Fact] public async Task PersistBookTest() { // Arrange var domainEventsList = new List<IDomainEvent>(); var mockRepository = new Mock<Repository>(); mockRepository.Protected().Setup<Task>("PersistDomainEventsAsync", ItExpr.IsAny<IEnumerable<IDomainEvent>>()) .Callback<IEnumerable<IDomainEvent>>(evnts => domainEventsList.AddRange(evnts)) .Returns(Task.CompletedTask); var book = new Book(); // Act book.ChangeTitle("Hit Refresh"); await mockRepository.Object.SaveAsync(book); // Assert Assert.Equal(2, domainEventsList.Count); Assert.Empty(book.UncommittedEvents); Assert.Equal(2, book.Version); }
同理,咱们还能够测试仓储读取聚合并恢复聚合状态的正确性,一样仍是使用Moq对仓储的LoadDomainEventsAsync进行Mock:
[Fact] public async Task RetrieveBookTest() { // Arrange var fakeId = Guid.NewGuid(); var domainEventsList = new List<IDomainEvent> { new AggregateCreatedEvent(fakeId), new BookTitleChangedEvent("Hit Refresh") }; var mockRepository = new Mock<Repository>(); mockRepository.Protected().Setup<Task<IEnumerable<IDomainEvent>>>("LoadDomainEventsAsync", ItExpr.IsAny<Type>(), ItExpr.IsAny<Guid>()) .Returns(Task.FromResult(domainEventsList.AsEnumerable())); // Act var book = await mockRepository.Object.GetByIdAsync<Book>(fakeId); // Assert Assert.Equal(fakeId, book.Id); Assert.Equal("Hit Refresh", book.Title); Assert.Equal(2, book.Version); Assert.Empty(book.UncommittedEvents); }
好了,其它的几个测试用例就很少作介绍了,使用Visual Studio运行一下测试而后查看结果就能够了:
本文又是一篇长篇幅的文章,好吧,要介绍的东西太多,并且这些内容又不能单独割开成多个主题,因此也就很难控制篇幅了。文章主要介绍了基于CQRS架构的聚合以及聚合根的设计与实现,同时引出了仓储的部分实现,这些内容也是为从此进一步讨论事件驱动型架构作准备。本文介绍的内容对于一个真实的CQRS系统实现来讲仍是有必定差距的,但整体结构也大体如此。文中还说起了快照的概念,这部份内容我从此在介绍事件存储的实现部分还会详细讨论,下一章打算扩展一下仓储自己,了解一下仓储对领域事件的派发,以及事件处理器对领域事件的处理。
本系列文章的源代码在https://github.com/daxnet/edasample这个Github Repo里,经过不一样的release tag来区分针对不一样章节的源代码。本文的源代码请参考chapter_4这个tag,以下: