由于enode框架的思想是,一次修改只能新建或修改一个聚合根;那么,若是一个用户请求要涉及多个聚合根的新建或修改该怎么办呢?本文的目的就是要分析清楚这个问题在enode框架下是如何解决的。若是想直接经过看代码的朋友,能够直接下载源代码,源码中共有三个例子,BankTransferSagaSample这个例子就是本文所用的例子。node
saga的由来web
saga这个术语,可能不少人都还很陌生。saga的提出,最先是为了解决可能会长时间运行的分布式事务(long-running process)的问题。所谓long-running的分布式事务,是指那些企业业务流程,须要跨应用、跨企业来完成某个事务,甚至在事务流程中还须要有手工操做的参与,这类事务的完成时间可能以分计,以小时计,甚至可能以天计。这类事务若是按照事务的ACID的要求去设计,势必形成系统的可用性大大的下降。试想一个由两台服务器一块儿参与的事务,服务器A发起事务,服务器B参与事务,B的事务须要人工参与,因此处理时间可能很长。若是按照ACID的原则,要保持事务的隔离性、一致性,服务器A中发起的事务中使用到的事务资源将会被锁定,不容许其余应用访问到事务过程当中的中间结果,直到整个事务被提交或者回滚。这就形成事务A中的资源被长时间锁定,系统的可用性将不可接受。服务器
而saga,则是一种基于补偿的消息驱动的用于解决long-running process的一种解决方案。目标是为了在确保系统高可用的前提下尽可能确保数据的一致性。仍是上面的例子,若是用saga来实现,那就是这样的流程:服务器A的事务先执行,若是执行顺利,那么事务A就先行提交;若是提交成功,那么就开始执行事务B,若是事务B也执行顺利,则事务B也提交,整个事务就算完成。可是若是事务B执行失败,那事务B自己须要回滚,这时由于事务A已经提交,因此须要执行一个补偿操做,将已经提交的事务A执行的操做做反操做,恢复到未执行前事务A的状态。这样的基于消息驱动的实现思路,就是saga。咱们能够看出,saga是牺牲了数据的强一致性,仅仅实现了最终一致性,可是提升了系统总体的可用性。架构
CQRS架构下的saga (process manager)并发
上面一段,咱们知道了saga的由来,如今咱们再看一下CQRS架构下,saga是用来作什么的。虽然都叫saga,可是实际上在CQRS架构下,人们每每用saga来解决DDD中多个聚合或多个bounded context之间的通讯问题。DDD中有bounded context的概念。一个bounded context表明一个上下文边界,一个bounded context中可能包含一个或多个聚合。而saga就是用来实现bounded context之间的通讯,或者是聚合之间的通讯。在经典DDD中,咱们一般用领域服务来实现多个聚合的协调,并最终经过事务的方式来提交全部聚合的修改;这样作的后果是,1:用到了事务;2.一个事务涉及了多个聚合的更改;这样作没什么很差,在条件容许的状况下(好比不会出现分布式事务的状况下或者并发修改的请求数不高的状况下),这样作没什么特别大的问题。惟一的问题是,这样作会增长并发冲突的概率。如今的web应用,每每都是多用户在同时向系统发送各类处理请求,因此咱们不难想到,一个事务中涉及到的聚合越多,那并发冲突的可能性就越高。不只如此,若是你的聚合很大,包含了不少的子实体和不少的方法,那该聚合持久化时产生并发冲突的概率也会相对较高;而系统的并发冲突将直接影响系统的可用性;因此,通常的建议是,咱们应该尽可能将聚合设计的小,且尽可能一次只修改一个聚合;这样咱们就不须要事务,还能把并发冲突的可能性降到最低;固然,单个聚合持久化时也还会存在并发冲突,但这点相对很容易解决,由于单个聚合是数据一致性的最小单元,因此咱们能够彻底不须要事务,经过乐观锁就能解决并发覆盖的问题;关于这个问题的讨论,你们若是还有兴趣或者想了解的更加深刻,我推荐看一下Effective Aggregate Design这篇论文,共三个部分,其做者是《implementing domain-driven design》一书的做者。框架
因此,经过上面的分析,咱们知道了“聚合应该设计的小,且一次修改只修改一个聚合”这样一条不成文的原则。固然你必定有不少理由认为不该该这样,欢迎你们讨论。那么若是要遵循这样的原则,那咱们须要一种机制来解决多个聚合之间的通讯的问题。在CQRS的架构下,人们也都把这种机制叫作saga,但由于这种CQRS架构下的saga的语义已经不是上面一段中介绍的saga了。因此,微软也把cqrs架构下的saga叫作process manager,具体能够看一下微软的一个CQRS架构的开源项目的例子;process manager这个名字咱们应该很容易理解,即流程管理器。事实上,一个saga所作的事情就是和一个流程同样的事情。只不过传统的流程,都有一个流程定义,当用户发起一个流程时,就会产生一个流程实例,该流程实例会严格按照流程定义的流向来进行流转,驱动流程流转的每每是人的操做,好比审批操做。而process manager,也是一个流程,只不过这个流程是由消息驱动的。一个典型的process manager会响应事件,而后产生新的命令去执行下一步操做。用过NServiceBus的人应该知道,NServiceBus中就内置了saga的机制,咱们能够很轻松的利用它来实现分布式的消息驱动的long-running process;dom
如何用enode框架来实现saga异步
为了说明问题,我就以经典的银行转帐为例子来说解吧,由于转帐的核心业务你们都很清楚,因此咱们就没有了业务上理解的不一致,咱们就能专心思考如何实现的问题了。可是,为了便于下面的分析,我仍是简单定义一下本例中的银行转帐的核心业务流程。注意:实际的转帐业务流程远比我定义的复杂,我这里重点是为了分析如何实现一个会涉及多个聚合修改的的业务场景。核心业务描述以下:分布式
两个银行帐号,一个是源帐号,一个是目标帐号;函数
用户点击确认转帐按钮后,指定数目的钱会从源帐号转入到目标帐号;
整个转帐过程有两个阶段:1)钱从源帐号转出;2)钱转入到目标帐号;若是一切顺利,那转帐流程就结束了;
若是源帐号的当前余额不足,则转出操做会失败,系统记录错误日志,转帐流程结束;
若是钱转入到目标帐号时出现异常,则须要回滚源帐号已转出的钱,同时记录错误日志,回滚完成后,转帐流程结束;
思路分析:
经过上面的需求,咱们知道,应该有一个聚合根,表示银行帐号,我设计为BankAccount;BankAccount有转出钱和转入钱的行为职责。另外,根据上面第5条需求,BankAccount可能会有回滚转出钱的行为职责;另外,固然一个银行帐号还会有一个表示当前余额的状态属性;
因为咱们是经过saga的思想来实现转帐流程,那咱们具体该如何设计此saga呢?saga在CQRS架构中的做用是响应事件,产生command,从而起到以事件消息驱动的原理来控制流程流转的做用;转帐流程如何才能结束会由saga来决定。那么saga要响应的事件哪里来呢?很明显,就是从流程中涉及到的聚合根里来,本例就是响应BankAccount的事件;当BankAccount的转出事件或转入事件发生后,会被saga响应,而后saga会作出响应,决定下一步该怎么走。
saga是聚合根吗?或者说saga属于领域层的东西吗?这个问题很重要,我以为没有明确的答案。并且我也没有从各类资料上明确看到saga是属于ddd中的应用层仍是领域层仍是其余层。如下是我我的的一些思考:
关于认为saga应该属于领域层的缘由的一些思考:和经典的DDD作类比,经典DDD的书本上,会有一个银行转帐的领域服务,该领域服务完成转帐的核心业务逻辑;而一些外围的逻辑,如记录日志、发送邮件或短信通知等功能,则在外围的应用层服务中作掉;因此按照这个理解,假如咱们设计一个saga,来实现转帐的核心业务逻辑,那我以为saga也是一个聚合根。由于saga是一个流程,职责是控制转帐的过程,它有一个全局惟一的流程ID(一次转帐就会产生一个转帐流程,流程ID是流程的全局惟一标识),属于领域层,saga能够理解为是领域中对行为过程的建模。固然saga与普通的聚合根稍有区别,普通的聚合根咱们一般是根据名词法则去识别,而saga则是从交互行为或者流程的角度去识别;这点就比如经典DDD的领域模型中有聚合根和领域服务同样,聚合根是数据的建模,领域服务是交互行为的建模;
关于认为saga不该该属于领域层的缘由的一些思考:按照saga在CQRS架构下的定义,它会接受响应event,而后发送command。而command是应用层的东西,因此就会致使domain层依赖于应用层,显然不太合理;
关于认为saga不该该属于应用层的缘由的一些思考:由于saga是流程,且有流程状态,有状态就须要保存,这样就变成应用层中的saga须要保存状态,这种作法合理吗?值得咱们深思;另外,按照经典DDD的架构中对应用层的职责定义,应用层应该是很薄的,更加不会出现须要保存状态的属于应用层的对象;
经过上面第3点的一些讨论,我我的会把saga设计在领域层,设计为聚合根,可是,我会对saga的实现作一些调整:1)saga聚合根不会直接响应事件,而是通过一个中间command来过分;2)saga聚合根也不会直接发送command,取而代之的是像普通聚合根同样也产生事件,这种事件表达的意思是“发送某某command的意图已下达”,而后外层的event handler接受到这样的事件后,会发送对应的command给command service;这样一来,saga聚合根就和普通的聚合根无任何差异,听上去感受很难以想象,稍后咱们结合代码一块儿看一下具体实现吧。
若是saga也是一个聚合根,那不是和BankAccount平级了,那BankAccount产生的事件如何传递给saga呢?显然,咱们还缺乏同样东西,就是须要把流程中涉及到修改的聚合根产生的事件传递给saga聚合根的event handler。这种event handler自己无业务逻辑,他们的职责是监听聚合根产生的事件,而后将event转化为command,而后将command发送到command service,从而最后通知到对应的saga,而后saga就开始处理该事件,好比决定接下来该如何处理;
代码实现:
BankAccount聚合根的设计
/// <summary>银行帐号聚合根 /// </summary> [Serializable] public class BankAccount : AggregateRoot<Guid>, IEventHandler<AccountOpened>, //银行帐户已开 IEventHandler<Deposited>, //钱已存入 IEventHandler<TransferedOut>, //钱已转出 IEventHandler<TransferedIn>, //钱已转入 IEventHandler<TransferOutRolledback> //转出已回滚 { /// <summary>帐号(卡号) /// </summary> public string AccountNumber { get; private set; } /// <summary>拥有者 /// </summary> public string Owner { get; private set; } /// <summary>当前余额 /// </summary> public double Balance { get; private set; } public BankAccount() : base() { } public BankAccount(Guid accountId, string accountNumber, string owner) : base(accountId) { RaiseEvent(new AccountOpened(Id, accountNumber, owner)); } /// <summary>存款 /// </summary> /// <param name="amount"></param> public void Deposit(double amount) { RaiseEvent(new Deposited(Id, amount, string.Format("向帐户{0}存入金额{1}", AccountNumber, amount))); } /// <summary>转出 /// </summary> /// <param name="targetAccount"></param> /// <param name="processId"></param> /// <param name="transferInfo"></param> public void TransferOut(BankAccount targetAccount, Guid processId, TransferInfo transferInfo) { //这里判断当前余额是否足够 if (Balance < transferInfo.Amount) { throw new Exception(string.Format("帐户{0}余额不足,不能转帐!", AccountNumber)); } RaiseEvent(new TransferedOut(processId, transferInfo, string.Format("{0}向帐户{1}转出金额{2}", AccountNumber, targetAccount.AccountNumber, transferInfo.Amount))); } /// <summary>转入 /// </summary> /// <param name="sourceAccount"></param> /// <param name="processId"></param> /// <param name="transferInfo"></param> public void TransferIn(BankAccount sourceAccount, Guid processId, TransferInfo transferInfo) { RaiseEvent(new TransferedIn(processId, transferInfo, string.Format("{0}从帐户{1}转入金额{2}", AccountNumber, sourceAccount.AccountNumber, transferInfo.Amount))); } /// <summary>回滚转出 /// </summary> /// <param name="processId"></param> /// <param name="transferInfo"></param> public void RollbackTransferOut(Guid processId, TransferInfo transferInfo) { RaiseEvent(new TransferOutRolledback(processId, transferInfo, string.Format("帐户{0}取消转出金额{1}", AccountNumber, transferInfo.Amount))); } void IEventHandler<AccountOpened>.Handle(AccountOpened evnt) { AccountNumber = evnt.AccountNumber; Owner = evnt.Owner; } void IEventHandler<Deposited>.Handle(Deposited evnt) { Balance += evnt.Amount; } void IEventHandler<TransferedOut>.Handle(TransferedOut evnt) { Balance -= evnt.TransferInfo.Amount; } void IEventHandler<TransferedIn>.Handle(TransferedIn evnt) { Balance += evnt.TransferInfo.Amount; } void IEventHandler<TransferOutRolledback>.Handle(TransferOutRolledback evnt) { Balance += evnt.TransferInfo.Amount; } }
转帐流程TransferProcess聚合根的设计
/// <summary>转帐流程状态 /// </summary> public enum ProcessState { NotStarted, Started, TransferOutRequested, TransferInRequested, RollbackTransferOutRequested, Completed, Aborted } /// <summary>转帐信息值对象,包含了转帐的基本信息 /// </summary> [Serializable] public class TransferInfo { public Guid SourceAccountId { get; private set; } public Guid TargetAccountId { get; private set; } public double Amount { get; private set; } public TransferInfo(Guid sourceAccountId, Guid targetAccountId, double amount) { SourceAccountId = sourceAccountId; TargetAccountId = targetAccountId; Amount = amount; } } /// <summary>银行转帐流程聚合根,负责控制整个转帐的过程,包括遇到异常时的回滚处理 /// </summary> [Serializable] public class TransferProcess : AggregateRoot<Guid>, IEventHandler<TransferProcessStarted>, //转帐流程已开始 IEventHandler<TransferOutRequested>, //转出的请求已发起 IEventHandler<TransferInRequested>, //转入的请求已发起 IEventHandler<RollbackTransferOutRequested>, //回滚转出的请求已发起 IEventHandler<TransferProcessCompleted>, //转帐流程已正常完成 IEventHandler<TransferProcessAborted> //转帐流程已异常终止 { /// <summary>当前转帐流程状态 /// </summary> public ProcessState State { get; private set; } public TransferProcess() : base() { } public TransferProcess(BankAccount sourceAccount, BankAccount targetAccount, TransferInfo transferInfo) : base(Guid.NewGuid()) { RaiseEvent(new TransferProcessStarted(Id, transferInfo, string.Format("转帐流程启动,源帐户:{0},目标帐户:{1},转帐金额:{2}", sourceAccount.AccountNumber, targetAccount.AccountNumber, transferInfo.Amount))); RaiseEvent(new TransferOutRequested(Id, transferInfo)); } /// <summary>处理已转出事件 /// </summary> /// <param name="transferInfo"></param> public void HandleTransferedOut(TransferInfo transferInfo) { RaiseEvent(new TransferInRequested(Id, transferInfo)); } /// <summary>处理已转入事件 /// </summary> /// <param name="transferInfo"></param> public void HandleTransferedIn(TransferInfo transferInfo) { RaiseEvent(new TransferProcessCompleted(Id, transferInfo)); } /// <summary>处理转出失败的状况 /// </summary> /// <param name="transferInfo"></param> public void HandleFailedTransferOut(TransferInfo transferInfo) { RaiseEvent(new TransferProcessAborted(Id, transferInfo)); } /// <summary>处理转入失败的状况 /// </summary> /// <param name="transferInfo"></param> public void HandleFailedTransferIn(TransferInfo transferInfo) { RaiseEvent(new RollbackTransferOutRequested(Id, transferInfo)); } /// <summary>处理转出已回滚事件 /// </summary> /// <param name="transferInfo"></param> public void HandleTransferOutRolledback(TransferInfo transferInfo) { RaiseEvent(new TransferProcessAborted(Id, transferInfo)); } void IEventHandler<TransferProcessStarted>.Handle(TransferProcessStarted evnt) { State = ProcessState.Started; } void IEventHandler<TransferOutRequested>.Handle(TransferOutRequested evnt) { State = ProcessState.TransferOutRequested; } void IEventHandler<TransferInRequested>.Handle(TransferInRequested evnt) { State = ProcessState.TransferInRequested; } void IEventHandler<RollbackTransferOutRequested>.Handle(RollbackTransferOutRequested evnt) { State = ProcessState.RollbackTransferOutRequested; } void IEventHandler<TransferProcessCompleted>.Handle(TransferProcessCompleted evnt) { State = ProcessState.Completed; } void IEventHandler<TransferProcessAborted>.Handle(TransferProcessAborted evnt) { State = ProcessState.Aborted; } }
响应BankAccount聚合根所发生的事件的event handler设计
/// <summary>事件订阅者,用于监听和响应银行帐号聚合根产生的事件 /// </summary> public class BankAccountEventHandler : IEventHandler<AccountOpened>, //银行帐户已开 IEventHandler<Deposited>, //钱已存入 IEventHandler<TransferedOut>, //钱已转出 IEventHandler<TransferedIn>, //钱已转入 IEventHandler<TransferOutRolledback> //转出已回滚 { private ICommandService _commandService; public BankAccountEventHandler(ICommandService commandService) { _commandService = commandService; } void IEventHandler<AccountOpened>.Handle(AccountOpened evnt) { Console.WriteLine(string.Format("建立银行帐户{0}", evnt.AccountNumber)); } void IEventHandler<Deposited>.Handle(Deposited evnt) { Console.WriteLine(evnt.Description); } void IEventHandler<TransferedOut>.Handle(TransferedOut evnt) { Console.WriteLine(evnt.Description); //响应已转出事件,发送“处理已转出事件”的命令 _commandService.Send(new HandleTransferedOut { ProcessId = evnt.ProcessId, TransferInfo = evnt.TransferInfo }); } void IEventHandler<TransferedIn>.Handle(TransferedIn evnt) { Console.WriteLine(evnt.Description); //响应已转入事件,发送“处理已转入事件”的命令 _commandService.Send(new HandleTransferedIn { ProcessId = evnt.ProcessId, TransferInfo = evnt.TransferInfo }); } void IEventHandler<TransferOutRolledback>.Handle(TransferOutRolledback evnt) { Console.WriteLine(evnt.Description); //响应转出已回滚事件,发送“处理转出已回滚事件”的命令 _commandService.Send(new HandleTransferOutRolledback { ProcessId = evnt.ProcessId, TransferInfo = evnt.TransferInfo }); } }
响应TransferProcess聚合根所发生的事件的event handler设计
/// <summary>事件订阅者,用于监听和响应转帐流程聚合根产生的事件 /// </summary> public class TransferProcessEventHandler : IEventHandler<TransferProcessStarted>, //转帐流程已开始 IEventHandler<TransferOutRequested>, //转出的请求已发起 IEventHandler<TransferInRequested>, //转入的请求已发起 IEventHandler<RollbackTransferOutRequested>, //回滚转出的请求已发起 IEventHandler<TransferProcessCompleted>, //转帐流程已完成 IEventHandler<TransferProcessAborted> //转帐流程已终止 { private ICommandService _commandService; public TransferProcessEventHandler(ICommandService commandService) { _commandService = commandService; } void IEventHandler<TransferProcessStarted>.Handle(TransferProcessStarted evnt) { Console.WriteLine(evnt.Description); } void IEventHandler<TransferOutRequested>.Handle(TransferOutRequested evnt) { //响应“转出的命令请求已发起”这个事件,发送“转出”命令 _commandService.Send(new TransferOut { ProcessId = evnt.ProcessId, TransferInfo = evnt.TransferInfo }, (result) => { //这里是command的异步回调函数,若是有异常,则发送“处理转出失败”的命令 if (result.Exception != null) { Console.WriteLine(result.Exception.Message); _commandService.Send(new HandleFailedTransferOut { ProcessId = evnt.ProcessId, TransferInfo = evnt.TransferInfo }); } }); } void IEventHandler<TransferInRequested>.Handle(TransferInRequested evnt) { //响应“转入的命令请求已发起”这个事件,发送“转入”命令 _commandService.Send(new TransferIn { ProcessId = evnt.ProcessId, TransferInfo = evnt.TransferInfo }, (result) => { //这里是command的异步回调函数,若是有异常,则发送“处理转入失败”的命令 if (result.Exception != null) { Console.WriteLine(result.Exception.Message); _commandService.Send(new HandleFailedTransferIn { ProcessId = evnt.ProcessId, TransferInfo = evnt.TransferInfo }); } }); } void IEventHandler<RollbackTransferOutRequested>.Handle(RollbackTransferOutRequested evnt) { //响应“回滚转出的命令请求已发起”这个事件,发送“回滚转出”命令 _commandService.Send(new RollbackTransferOut { ProcessId = evnt.ProcessId, TransferInfo = evnt.TransferInfo }); } void IEventHandler<TransferProcessCompleted>.Handle(TransferProcessCompleted evnt) { Console.WriteLine("转帐流程已正常完成!"); } void IEventHandler<TransferProcessAborted>.Handle(TransferProcessAborted evnt) { Console.WriteLine("转帐流程已异常终止!"); } }
BankAccount聚合根相关的command handlers
/// <summary>银行帐户相关命令处理 /// </summary> public class BankAccountCommandHandlers : ICommandHandler<OpenAccount>, //开户 ICommandHandler<Deposit>, //存钱 ICommandHandler<TransferOut>, //转出 ICommandHandler<TransferIn>, //转入 ICommandHandler<RollbackTransferOut> //回滚转出 { public void Handle(ICommandContext context, OpenAccount command) { context.Add(new BankAccount(command.AccountId, command.AccountNumber, command.Owner)); } public void Handle(ICommandContext context, Deposit command) { context.Get<BankAccount>(command.AccountId).Deposit(command.Amount); } public void Handle(ICommandContext context, TransferOut command) { var sourceAccount = context.Get<BankAccount>(command.TransferInfo.SourceAccountId); var targetAccount = context.Get<BankAccount>(command.TransferInfo.TargetAccountId); sourceAccount.TransferOut(targetAccount, command.ProcessId, command.TransferInfo); } public void Handle(ICommandContext context, TransferIn command) { var sourceAccount = context.Get<BankAccount>(command.TransferInfo.SourceAccountId); var targetAccount = context.Get<BankAccount>(command.TransferInfo.TargetAccountId); targetAccount.TransferIn(sourceAccount, command.ProcessId, command.TransferInfo); } public void Handle(ICommandContext context, RollbackTransferOut command) { context.Get<BankAccount>(command.TransferInfo.SourceAccountId).RollbackTransferOut(command.ProcessId, command.TransferInfo); } }
TransferProcess聚合根相关的command handlers
/// <summary>银行转帐流程相关命令处理 /// </summary> public class TransferProcessCommandHandlers : ICommandHandler<StartTransfer>, //开始转帐 ICommandHandler<HandleTransferedOut>, //处理已转出事件 ICommandHandler<HandleTransferedIn>, //处理已转入事件 ICommandHandler<HandleFailedTransferOut>, //处理转出失败 ICommandHandler<HandleFailedTransferIn>, //处理转入失败 ICommandHandler<HandleTransferOutRolledback> //处理转出已回滚事件 { public void Handle(ICommandContext context, StartTransfer command) { var sourceAccount = context.Get<BankAccount>(command.TransferInfo.SourceAccountId); var targetAccount = context.Get<BankAccount>(command.TransferInfo.TargetAccountId); context.Add(new TransferProcess(sourceAccount, targetAccount, command.TransferInfo)); } public void Handle(ICommandContext context, HandleTransferedOut command) { context.Get<TransferProcess>(command.ProcessId).HandleTransferedOut(command.TransferInfo); } public void Handle(ICommandContext context, HandleTransferedIn command) { context.Get<TransferProcess>(command.ProcessId).HandleTransferedIn(command.TransferInfo); } public void Handle(ICommandContext context, HandleFailedTransferOut command) { context.Get<TransferProcess>(command.ProcessId).HandleFailedTransferOut(command.TransferInfo); } public void Handle(ICommandContext context, HandleFailedTransferIn command) { context.Get<TransferProcess>(command.ProcessId).HandleFailedTransferIn(command.TransferInfo); } public void Handle(ICommandContext context, HandleTransferOutRolledback command) { context.Get<TransferProcess>(command.ProcessId).HandleTransferOutRolledback(command.TransferInfo); } }