在《当咱们在讨论CQRS时,咱们在讨论些神马》中,咱们讨论了当使用CQRS的过程当中,须要关心的一些问题。其中与CQRS关联最为紧密的模式莫过于Event Sourcing了,CQRS与ES的结合,为咱们构造高性能、可扩展系统提供了基本思路。本文将介绍
Kanasz Robert在《Introduction to CQRS》中的示例项目Diary.CQRS。html
该项目为Kanasz Robert为了介绍CQRS模式而写的一个测试项目,原始项目能够经过访问《Introduction to CQRS》来获取,因为项目版本比较旧,没有使用nuget管理程序包等,致使下载之后并不能正常运行,我下载了这个项目,升级到Visual Studio 2017,从新引用了StructMap框架(使用nuget),移除了Web层报错的代码,并上传到博客园,能够从这里下载:Diary.CQRS.rarweb
Diary.CQRS项目的场景为日记本管理,提供了新增、编辑、删除、列表等功能,整个解决方案分为三个项目:数据库
这是一个很好的入门项目,功能简单、结构清晰,概念覆盖全面。若是CQRS是一个城堡,那么Diary.CQRS则是打开第一重门的钥匙,接下来让咱们一块儿推开这扇门吧。安全
运行项目,最早看到的是一个Web页面,以下图:架构
很简单,只有一个Add按钮,当咱们点击之后,会进入添加的页面:并发
咱们填上一些内容,而后点击Save按钮,就会返回到列表页,咱们能够看到已添加的条目:app
而后咱们进行编辑操做,点击列表中的Edit按钮,跳转到编辑页面:框架
虽然页面中显示的是Add,但确实是Edit页面。咱们编辑之后点击Save按钮,而后返回列表页便可看到编辑后的内容。ide
在列表页中,若是咱们点击Delete按钮,则会删除改条目。函数
到此为止,咱们已经看到了这个项目的全部页面,一个简单的CURD操做。咱们继续看它的代码(在HomeController中)。
public ActionResult Index() { ViewBag.Model = ServiceLocator.ReportDatabase.GetItems(); return View(); }
经过ServiceLocator定位ReportDatabase,并从ReportDatabase中获取全部条目。
public ActionResult Add() { return View(); } [HttpPost] public ActionResult Add(DiaryItemDto item) { ServiceLocator.CommandBus.Send(new CreateItemCommand(Guid.NewGuid(), item.Title, item.Description, -1, item.From, item.To)); return RedirectToAction("Index"); }
两个方法:
public ActionResult Edit(Guid id) { var item = ServiceLocator.ReportDatabase.GetById(id); var model = new DiaryItemDto() { Description = item.Description, From = item.From, Id = item.Id, Title = item.Title, To = item.To, Version = item.Version }; return View(model); } [HttpPost] public ActionResult Edit(DiaryItemDto item) { ServiceLocator.CommandBus.Send(new ChangeItemCommand(item.Id, item.Title, item.Description, item.From, item.To, item.Version)); return RedirectToAction("Index"); }
仍然是两个方法:
public ActionResult Delete(Guid id) { var item = ServiceLocator.ReportDatabase.GetById(id); ServiceLocator.CommandBus.Send(new DeleteItemCommand(item.Id, item.Version)); return RedirectToAction("Index"); }
对于删除操做来讲,它没有视图页面,接收到请求之后,先获取该记录,建立并发送DeleteImteCommand命令,而后返回到Index页面
题外话:对于改变数据状态的操做,使用Get请求是不可取的,可能存在安全隐患
经过上面的代码,你会发现全部的操做都是从ServiceLocator发起的,经过它咱们可以定位到CommandBus和ReportDatabase,从而进行相应的操做,咱们在接下来会介绍ServiceLocator类。
Diary.CQRS.Configuration 项目中定义了ServiceLocator类,这个类的做用是完成IoC容器的服务注册、服务定位功能。例如咱们能够经过ServiceLocator获取到CommandBus实例、获取ReportDatabase实例。
ServiceLocator使用StructureMap做为依赖注入框架,提供了服务注册、服务导航的功能。ServiceLocator类经过静态构造函数完成对服务注册和服务实例化工做:
static ServiceLocator() { if (!_isInitialized) { lock (_lockThis) { ContainerBootstrapper.BootstrapStructureMap(); _commandBus = ObjectFactory.GetInstance<ICommandBus>(); _reportDatabase = ObjectFactory.GetInstance<IReportDatabase>(); _isInitialized = true; } } }
首先调用ContainerBootstrapper.BootstrapStructureMap()方法,这个方法里面包含了对将服务添加到容器的代码;而后使用容器建立CommandBus和ReportDatabase的实例。
ServiceLocator的重要之处在于对外暴露了两个相当重要的实例,分别处理CQRS中的Command和Query。
为何没有Event相关操做呢?到目前为止咱们尚未涉及到,由于对于UI层来讲,用户的意图都是经过Command表示的,而数据的状态变化才会触发Event。
在ServiceLocator中定义了获取CommandBus和ReportDatabase的方法,咱们顺着这两个对象继续分析。
在基于消息的系统设计中,咱们常会看到总线的身影,Command也是一种消息,因此使用总线是再合适不过的了。CommandBus就是咱们在Diary.CQRS项目中用到的一种消息总线。
在Diary.CQRS中,它被定义在Messaging目录,在这个目录下面,还有与Event相关的EventBus,咱们稍后再进行介绍。
CommandBus实现ICommandBus接口,ICommandBus接口的定义以下:
public interface ICommandBus { void Send<T>(T command) where T : Command; }
它只包含了Send方法,用来将命令发送到对应的处理程序。
CommandBus是ICommand的实现,具体代码以下:
public class CommandBus:ICommandBus { private readonly ICommandHandlerFactory _commandHandlerFactory; public CommandBus(ICommandHandlerFactory commandHandlerFactory) { _commandHandlerFactory = commandHandlerFactory; } public void Send<T>(T command) where T : Command { var handler = _commandHandlerFactory.GetHandler<T>(); if (handler!=null) { handler.Execute(command); } else { throw new Exception(); } } }
在CommandBus中,显式依赖ICommandHandlerFactory类,经过构造函数进行注入。那么 _commandHandlerFactory 的做用是什么呢?咱们在Send方法中能够看到,经过 _commandHandlerFactory 能够获取到与Command对应的CommandHandler(命令处理程序),在程序的设计上,每个Command都会有一个对应的CommandHandler,而手工判断类型、实例化处理程序显然不符合使用习惯,此处采用工厂模式来获取命令处理程序。
当获取到与Command对应的CommandHandler后,调用handler的Execute方法,执行该命令。
截止目前为止,咱们又接触了三个概念:CommandHandlerFactory、CommandHandler、Command:
使用简单工厂模式,用来获取与命令对应的处理程序。它的代码在Utils文件夹中,它的做用是提供一种获取Handler的方式,因此它只能做为工具存在。
接口定义以下:
public interface ICommandHandlerFactory { ICommandHandler<T> GetHandler<T>() where T : Command; }
只有GetHandler一个方法,它的实现是 StructureMapCommandHandlerFactory,即经过StructureMap做为依赖注入框架来实现的,代码也比较简单,这里再也不贴出来了。
命令是表明用户的意图、并包含与意图相关的数据,好比用户想要添加一条数据,这即是一个意图,因而就有了CreateItemCommand,用户要在界面上填写添加操做必须的数据,因而就有了命令的属性。
关于命令的定义以下:
public interface ICommand { Guid Id { get; } } public class Command : ICommand { public Guid Id { get; private set; } public int Version { get; set; } public Command(Guid id, int version) { Id = id; Version = version; } }
Command类:实现了ICommand接口,并增长了Version属性,用来标记当前操做对应的聚合跟的版本。
为何要有版本的概念的?由于当使用ES模式的时候,数据库中的数据都是事件产生的数据镜像,保存了某个时间点的数据快照,若是要获取到最新的数据,则须要经过加载该聚合根对应的全部Event来回放到最新状态。若是引入版本的概念,每个Event对应一个版本,而景象中的数据也有一个版本,在进行回放的时候,能够仅加载高版本的Event进行回放,节省了系统资源,并提升了运行效率。
命令处理程序,它的做用是处理与它相对应的命令,处理CQRS的核心,接口定义以下:
public interface ICommandHandler<TCommand> where TCommand : Command { void Execute(TCommand command); }
它接收command做为参数,执行该命令的处理逻辑。每个命令都有一个与之对应的处理程序。
咱们再从新梳理一下流程,首先用户要新增一个数据,点击保存按钮后,生成CreateItemCommand命令,随后这个命令被发送到CommandBus中,CommandBus经过CommandHandlerFactory找到该Command的处理程序,此时在CommandBus的Send方法中,咱们有一个Command和CommandHandler,而后调用CommandHandler的Execute方法,即完成了该方法的处理。至此,Command的处理流程完结。
咱们来看一下CreateItemCommand的代码:
public class CreateItemCommand : Command { public string Title { get; internal set; } public string Description { get; internal set; } public DateTime From { get; internal set; } public DateTime To { get; internal set; } public CreateItemCommand(Guid aggregateId, string title, string description, int version, DateTime from, DateTime to) : base(aggregateId, version) { Title = title; Description = description; From = from; To = to; } }
它继承自Command基类,继承后即拥有了Id和Version属性,而后又定义了几个其它的属性。它只包含数据,与该命令对应的处理程序叫作CreateItemCommandHandler,代码以下:
public class CreateItemCommandHandler : ICommandHandler<CreateItemCommand> { private IRepository<DiaryItem> _repository; public CreateItemCommandHandler(IRepository<DiaryItem> repository) { _repository = repository; } public void Execute(CreateItemCommand command) { if (command == null) { throw new Exception(); } if (_repository == null) { throw new Exception(); } var aggregate = new DiaryItem(command.Id, command.Title, command.Description, command.From, command.To); aggregate.Version = -1; _repository.Save(aggregate, aggregate.Version); } }
这才是咱们要分析的核心,在Handler中,咱们看到了Repository,看到了DiaryItem聚合:
在上面的代码中,因为是新增,因此聚合的版本为-1,而后调用仓储的Save方法进行保存。咱们继续往下扒,看看仓储和聚合的实现。
对于Repository的定义,仍然先看一下接口中的定义,代码以下:
public interface IRepository<T> where T : AggregateRoot, new() { void Save(AggregateRoot aggregate, int expectedVersion); T GetById(Guid id); }
在仓储中只有两个方法:
关于IRepository的实现,代码在Repository.cs中,咱们拆开来进行介绍:
private readonly IEventStorage _eventStorage; private static object _lock = new object(); public Repository(IEventStorage eventStorage) { _eventStorage = eventStorage; }
首先是它的构造函数,强依赖IEventStorage,经过构造函数注入。EventStorage是事件的储存仓库,有个更为熟知的名字EventStore,咱们稍后进行介绍。
public T GetById(Guid id) { IEnumerable<Event> events; var memento = _eventStorage.GetMemento<BaseMemento>(id); if (memento != null) { events = _eventStorage.GetEvents(id).Where(e => e.Version >= memento.Version); } else { events = _eventStorage.GetEvents(id); } var obj = new T(); if (memento != null) { ((IOriginator)obj).SetMemento(memento); } obj.LoadsFromHistory(events); return obj; }
GetById(Guid id)方法经过Id获取一个聚合对象,获取一个聚合对象有如下几个步骤:
加载Event列表,加载到的事件列表将用来作事件回放。
若是获取到快照的话,则加载版本高于该快照版本的事件列表,若是没有获取到快照,则加载所有事件列表。此处在上面已经介绍过,经过快照的方式保存聚合对象,在获取数据时能够减小重放事件的数量,起到提升加载速度的做用。
var obj = new T();
。加载历史事件,完成重放。完成这个步骤之后,聚合根将更新到最新状态。
经过这几个步骤之后,咱们获得了一个最新状态的聚合根对象。
public void Save(AggregateRoot aggregate, int expectedVersion) { if (aggregate.GetUncommittedChanges().Any()) { lock (_lock) { var item = new T(); if (expectedVersion != -1) { item = GetById(aggregate.Id); if (item.Version != expectedVersion) { throw new Exception(); } } _eventStorage.Save(aggregate); } } }
Save方法,用来保存一个聚合根对象。在这个方法中,参数expectedVersion表示指望的版本,这里约定-1
为新增的聚合根,当聚合根为新增的时候,会直接调用EventStorage中的Save方法。
关于expectedVersion参数,咱们能够理解为对并发的控制,只有当expectedVersion与GetById获取到的聚合根对象的版本相同时才能进行保存操做。
在介绍Repository类的时候,咱们接触了两个新的概念:EventStorage和AggregateRoot,接下来咱们分别进行介绍。
AggregateRoot是聚合根,他表示一组强关联的领域对象,全部对象的状态变动只能经过聚合根来完成,这样能够保证数据的一致性,以及减小并发冲突。应用到EventSourcing模式中,聚合根的好处也是很明显的,咱们全部对数据状态的变动都经过聚合根完成,而每次变动,聚合根都会生成相应的事件,在进行事件回放的时候,又经过聚合根来完成历史事件的加载。由此咱们能够看到,聚合根对象应该具有生成事件、重放事件的能力。
咱们来看看聚合根基类的定义,在Domain文件夹中:
public abstract class AggregateRoot : IEventProvider{ // ...... }
首先这是一个抽象类,实现了IEventProvider接口,该接口的定义以下:
public interface IEventProvider { void LoadsFromHistory(IEnumerable<Event> history); IEnumerable<Event> GetUncommittedChanges(); }
它定义了两个方法,咱们分别进行说明:
为了实现这个接口,聚合根中定义了 List<Event> _changes
对象,用来临时存储全部未提交的事件,该对象在构造函数中进行初始化。
AggregateRoot中对于该事件的实现以下:
public void LoadsFromHistory(IEnumerable<Event> history) { foreach (var e in history) { ApplyChange(e, false); } Version = history.Last().Version; EventVersion = Version; } public IEnumerable<Event> GetUncommittedChanges() { return _changes; }
LoadsFromHistory方法遍历历史事件,并调用ApplyChange方法更新聚合根的状态,在完成更新后设置版本号为最后一个事件的版本。GetUncommittedChanges方法比较简单,返回对象的_changes事件列表。
接下来咱们看看ApplyChange方法,该方法有两个实现,代码以下:
protected void ApplyChange(Event @event) { ApplyChange(@event, true); } protected void ApplyChange(Event @event, bool isNew) { dynamic d = this; d.Handle(Converter.ChangeTo(@event, @event.GetType())); if (isNew) { _changes.Add(@event); } }
这两个方法定义为protected,只能被子类访问。咱们能够理解为,ApplyChange(Event @event)方法为简化操做,对第二个参数进行了默认为true的操做,而后调用ApplyChange(Event @event, bool isNew)方法。
在ApplyChange(Event @event, bool isNew)方法中,调用了聚合根的Handle方法,用来处理事件。若是isNew参数为true,则将事件添加到change列表中,若是为false,则认为是在进行事件回放,因此不进行事件的添加。
须要注意的是,聚合根的Handle方法,与EventHandler不一样,当Event产生之后,首先由它对应的聚合根进行处理,所以聚合根要具有处理该事件的能力,如何具有呢?聚合根要实现IHandle接口,该接口的定义以下:
public interface IHandle<TEvent> where TEvent:Event { void Handle(TEvent e); }
这里能够看出,IHandle接口是泛型的,它只对一个具体的Event类型生效,在代码上的体现以下:
public class DiaryItem : AggregateRoot, IHandle<ItemCreatedEvent>, IHandle<ItemRenamedEvent>, IHandle<ItemFromChangedEvent>, IHandle<ItemToChangedEvent>, IHandle<ItemDescriptionChangedEvent>, IOriginator { //...... }
最后,聚合根还定义了清除全部事件的方法,代码以下:
public void MarkChangesAsCommitted() { _changes.Clear(); }
MarkChangesAsCommitted()方法用来清空事件列表。
终于到咱们今天的另一个核心内容了,Event是ES中的一等公民,全部的状态变动最终都以Event的形式进行存储,当咱们要查看聚合根最新状态的时候,能够经过事件回放来获取。咱们来看看Event的定义:
public interface IEvent { Guid Id { get; } }
IEvent接口定义了一个事件必须拥有惟一的Id进行标识。而后Event实现了IEvent接口:
public class Event:IEvent { public int Version; public Guid AggregateId { get; set; } public Guid Id { get; private set; } }
能够看到,除了Id属性外,还添加了两个字段Version和AggregateId。AggregateId表示该事件关联的聚合根Id,经过该Id能够获取到惟一的聚合根对象;Version表示事件发生时该事件的版本,每次产生新的事件,Version都会进行累加。
从而能够知道,在EventStorage中,聚合根Id对应的全部Event中的Version是顺序累加的,按照Version进行排序能够获得事件发生的前后顺序。
顾名思义,EventStorage是用来存储Event的地方。在Diary.CQRS中,EventStorage的定义以下:
public interface IEventStorage { IEnumerable<Event> GetEvents(Guid aggregateId); void Save(AggregateRoot aggregate); T GetMemento<T>(Guid aggregateId) where T : BaseMemento; void SaveMemento(BaseMemento memento); }
Diary.CQRS中使用InMemory的方式实现了EventStorage,属性和构造函数以下:
private List<Event> _events; private List<BaseMemento> _mementoes; private readonly IEventBus _eventBus; public InMemoryEventStorage(IEventBus eventBus) { _events = new List<Event>(); _mementoes = new List<BaseMemento>(); _eventBus = eventBus; }
当Event生成后,它并无立刻存入EventStorage,而是在Repository显示调用Save方法时,仓储将存储权交给了EventStorage,EventStorage是事件仓库,事件仓储在存储时进行了以下操做:
Save方法的代码以下:
public void Save(AggregateRoot aggregate) { var uncommittedChanges = aggregate.GetUncommittedChanges(); var version = aggregate.Version; foreach (var @event in uncommittedChanges) { version++; if (version > 2) { if (version % 3 == 0) { var originator = (IOriginator)aggregate; var memento = originator.GetMemento(); memento.Version = version; SaveMemento(memento); } } @event.Version = version; _events.Add(@event); } foreach (var @event in uncommittedChanges) { var desEvent = Converter.ChangeTo(@event, @event.GetType()); _eventBus.Publish(desEvent); } }
至此Event的处理流程就算完结了。此时全部的操做都是在主库完成的,当事件被发布之后,订阅了该事件的全部Handler都将会被触发。
在Diary.CQRS项目中,EventHandler都被用来处理ReportDatabase了。
当你使用ES模式时,都存在一个严重问题,那就是数据查询的问题。当用户进行数据检索是,必然会使用各类查询条件,然而不管那种事件仓库都很难知足复杂查询。为了解决此问题,ReportDatabase就显得格外重要。
ReportDatabase的做用被定义为获取数据、应对数据查询、生成报表等,它的结构与主库不一样,能够根据不一样的业务场景进行定义。
ReportDatabase的数据不是经过业务逻辑进行更新的,它经过订阅Event进行更新。在本示例中ReportDatabase实现的很简单,接口定义以下:
public interface IReportDatabase { DiaryItemDto GetById(Guid id); void Add(DiaryItemDto item); void Delete(Guid id); List<DiaryItemDto> GetItems(); }
实现上,经过内存中维护一个列表,每次接收到事件之后,都对相应数据进行更新,此处不在贴出。
在上文中已经介绍过Event,而针对Event的处理,实现逻辑上与Command很是类似,惟一的区别是,命令只能够有一个对应的处理程序,而事件则能够有多个处理程序。因此在EventHandlerFactory中获取处理程序的方法返回了EventHandler列表,代码以下:
public IEnumerable<IEventHandler<T>> GetHandlers<T>() where T : Event { var handlers = GetHandlerType<T>(); var lstHandlers = handlers.Select(handler => (IEventHandler<T>)ObjectFactory.GetInstance(handler)).ToList(); return lstHandlers; }
在EventBus中,若是一个事件没有处理程序也不会引起错误,若是有一个或多个处理程序,则会以此调用他们的Handle方法,代码以下:
public void Publish<T>(T @event) where T : Event { var handlers = _eventHandlerFactory.GetHandlers<T>(); foreach (var eventHandler in handlers) { eventHandler.Handle(@event); } }
Diary.CQRS是一个典型的CQRS+ES演示项目,经过对该项目的分析,咱们能了解到Command、AggregateRoot、Event、EventStorage、ReportDatabase的基础知识,了解他们相互关系,尤为是如何进行事件存储、如何进行事件回放的内容。
另外,咱们发如今使用CQRS+ES的过程当中,项目的复杂度增长了不少,咱们不可避免的要使用EventStore、Messaging等架构,从而影响那些不了解CQRS的团队成员的加入,所以在应用到实际项目的时候,要适可而止,慎重选择,避免过分设计。
因为这是一个示例,项目代码中存在不少不够严谨的地方,你们在学习的过程当中应进行甄别。
因为本人的知识有限,若是内容中存在不许确或错误的地方,还请不吝赐教!