CQRS之旅——旅程6(咱们系统的版本管理)

旅程6:咱们系统的版本管理

准备下一站:升级和迁移

“变化是生活的调味品。”威廉·考珀
复制代码

此阶段的最高目标是了解如何升级包含实现CQRS模式和事件源的限界上下文的系统。团队在这一阶段实现的用户场景包括对代码的更改和对数据的更改:更改了一些现有的数据模式并添加了新的数据模式。除了升级系统和迁移数据外,团队还计划在没有停机时间的状况下进行升级和迁移,以便在Microsoft Azure中运行实时系统。git

本章的工做术语定义:

本章使用了一些术语,咱们将在下面进行描述。有关更多细节和可能的替代定义,请参阅参考指南中的“深刻CQRS和ES”。github

  • 命令(Command):命令是要求系统执行更改系统状态的操做。命令是必须服从(执行)的一种指令,例如:MakeSeatReservation。在这个限界上下文中,命令要么来自用户发起请求时的UI,要么来自流程管理器(当流程管理器指示聚合执行某个操做时)。单个接收方处理一个命令。命令总线(command bus)传输命令,而后命令处理程序将这些命令发送到聚合。发送命令是一个没有返回值的异步操做。web

  • 事件(Event):一个事件,好比OrderConfirmed,描述了系统中发生的一些事情,一般是一个命令的结果。领域模型中的聚合引起事件。事件也能够来自其余限界上下文。多个订阅者能够处理特定的事件。聚合将事件发布到事件总线。处理程序在事件总线上注册特定类型的事件,而后将事件传递给订阅服务器。在订单和注册限界上下文中,订阅者是流程管理器和读取模型生成器。数据库

  • 幂等性(Idempotency):幂等性是一个操做的特性,这意味着该操做能够屡次应用而不改变结果。例如,“将x的值设置为10”的操做是幂等的,而“将x的值加1”的操做不是幂等的。在消息传递环境中,若是消息能够屡次传递而不改变结果,则消息是幂等的:这多是由于消息自己的性质,也多是由于系统处理消息的方式。c#

用户故事:

在这个过程的这个阶段,团队实现了下面描述的用户故事。windows

不停机升级

V2版本的目标是升级系统,包括任何须要的数据迁移,而不须要把系统停机。若是这在当前实现中不可行,那么停机时间应该最小化,而且应该修改系统,以便在未来支持零停机时间升级(从V3版本开始)。浏览器

Beth(业务经理)发言:
确保咱们可以在不停机的状况下进行升级,这对咱们在市场中的信誉相当重要。安全

显示剩余座位数量

目前,当注册者建立一个订单时,没有显示每种座位类型的剩余座位数量。当注册者选择购买座位时,UI应该显示此信息。服务器

处理不须要付费的座位

目前,当注册者选择不须要付费的座位时,UI流仍然会将注册者带到支付页面,即便不须要支付任何费用。系统应该检测何时没有支付,并调整流程,让注册者直接进入订单的确认页面。session

架构

该应用程序旨在部署到Microsoft Azure。在旅程的这个阶段,应用程序由两个角色组成,一个包含ASP.Net MVC Web应用程序的web角色和一个包含消息处理程序和领域对象的工做角色。应用程序在写端和读端都使用Azure SQL DataBase实例进行数据存储。应用程序使用Azure服务总线来提供其消息传递基础设施。下图展现了这个高级体系结构。

在研究和测试解决方案时,能够在本地运行它,可使用Azure compute emulator,也能够直接运行MVC web应用程序,并运行承载消息处理程序和领域域对象的控制台应用程序。在本地运行应用程序时,可使用本地SQL Server Express数据库,并使用一个在SQL Server Express数据库实现的简单的消息传递基础设施。

有关运行应用程序的选项的更多信息,请参见附录1“发布说明”。

模式和概念

在旅程的这个阶段,团队处理的大多数关键挑战都与如何最好地执行从V1到V2的迁移有关。本节将介绍其中的一些挑战。

处理“事件定义发生更改"的状况

当团队检查V2的发布需求,很明显,咱们须要改变在订单和注册限界上下文中使用的一些事件来适应一些新特性:RegistrationProcessManager将会改变,当订单有一个不须要付费的座位时系统将提供一个更好的用户体验。

订单和注册限界上下文使用事件源,所以在迁移到V2以后,事件存储将包含旧事件,但将开始保存新事件。当系统事件被重放时,系统必须能正确处理全部的旧事件和新事件。

团队考虑了两种方法来处理系统中的这类更改。

在基础设施中进行事件映射或过滤

在基础设施中映射和过滤事件消息是一种选择。此方法是对旧的事件消息和消息格式进行处理,在它们到达领域以前在基础设施的某个位置处理它们。您能够过滤掉再也不相关的旧消息,并使用映射将旧格式的消息转换为新格式。这种方法最初比较复杂,由于它须要对基础设施进行更改,可是它能够保持领域域的纯粹,领域只须要理解当前的新事件集合就能够了。

在聚合中处理多个版本的消息

在聚合中处理多个版本的消息是另外一种选择。在这种方法中,全部消息类型(包括旧消息和新消息)都传递到领域,每一个聚合必须可以处理旧消息和新消息。从短时间来看,这多是一个合适的策略,但它最终会致使域模型受到遗留事件处理程序的污染。

团队为V2版本选择了这个选项,由于它包含了最少数量的代码更改。

Jana(软件架构师)发言:
当前在聚合中处理旧事件和新事件并不妨碍您之后使用第一种选择:在基础设施中使用映射/过滤机制。

履行消息幂等性

V2版本中要解决的一个关键问题是使系统更加健壮。在V1版本中,在某些场景中,可能会屡次处理某些消息,致使系统中的数据不正确或不一致。

Jana(软件架构师)发言:
消息幂等性在任何使用消息传递的系统中都很重要,这不只仅是在实现CQRS模式或使用事件源的系统中。

在某些场景中,设计幂等消息是可能的,例如:使用“将座位配额设置为500”的消息,而不是“在座位配额中增长100”的消息。您能够安全地屡次处理第一个消息,但不能处理第二个消息。

然而,并不老是可以使用幂等消息,所以团队决定使用Azure服务总线的重复删除特性,以确保它只传递一次消息。团队对基础设施进行了一些更改,以确保Azure服务总线可以检测重复消息,并配置Azure服务总线来执行重复消息检测。

要了解Contoso是如何实现这一点的,请参阅下面的“不让命令消息重复”一节。此外,咱们须要考虑系统中的消息处理程序如何从队列和Topic检索消息。当前的方法使用Azure服务总线peek/lock机制。这是一个分红三个阶段的过程:

  1. 处理程序从队列或Topic检索消息,并在其中留下消息的锁定副本。其余客户端没法看到或访问锁定的消息。
  2. 处理程序处理消息。
  3. 处理程序从队列中删除锁定的消息。若是锁定的消息在固定时间后没有解锁或删除,则解锁该消息并使其可用,以便再次检索。

若是步骤因为某种缘由失败,这意味着系统能够不止一次地处理消息。

Jana(软件架构师)发言:
该团队计划在旅程的下一阶段解决这个问题(步骤失败的问题)。更多信息,请参见第7章“添加弹性和优化性能”。

阻止屡次处理事件

在V1中,在某些场景里,若是在处理事件时发生错误,系统可能屡次处理事件。为了不这种状况,团队修改了体系结构,以便每一个事件处理程序都有本身对Azure Topic的订阅。下图显示了两个不一样的模型。

在V1中,可能发生如下行为:

  1. EventProcessor实例从服务总线中的全部订阅者那里接收到OrderPlaced事件。
  2. EventProcessor实例有两个已注册的处理程序,RegistrationProcessManagerRouterOrderViewModelGenerator处理程序类,因此会在两个里都触发调用Handle方法。
  3. OrderViewModelGenerator类中的Handle方法执行成功。
  4. RegistrationProcessManagerRouter类中的Handle方法抛出异常。
  5. EventProcessor实例捕获到异常而后抛弃掉事件消息。消息将自动放回订阅中。
  6. EventProcessor实例第二次从全部订阅者那里接收到OrderPlaced事件。
  7. 事件又触发两个处理方法,致使RegistrationProcessManagerRouter类和OrderViewModelGenerator第二次处理事件消息。
  8. 每当RegistrationProcessManagerRouter类抛出异常时,OrderViewModelGenerator类都会触发处理该事件。

在V2模型中,若是处理程序类抛出异常,EventProcessor实例将事件消息放回与该处理程序类关联的订阅。重试逻辑如今只会致使EventProcessor实例重试引起异常的处理程序,所以没有其余处理程序会从新处理消息。

集成事件的持久化

在V1版本中提出的一个问题是,系统如何持久化从会议管理限界上下文发送到订单和注册限界上下文的集成事件。这些事件包括关于会议建立和发布的信息,以及座位类型和配额更改的详细信息。

在V1版本中,订单和注册上下文中的ConferenceViewModelGenerator类经过更新视图模型并向SeatsAvailability聚合发送命令来处理这些事件,以告诉它更改座位配额值。

这种方法意味着订单和注册限界上下文不存储任何历史记录,这可能会致使问题。例如,其余视图从这里中查找座椅类型描述时,这里只包含座椅类型描述的最新值。所以,在其余地方重播一组事件可能会从新生成另外一个包含不正确座椅类型描述的读取模型投影。

团队考虑了如下五个方法来纠正这种状况:

  • 将全部事件保存在原始限界上下文中(会议管理限界上下文中),并使用共享的事件存储,订单和注册限界上下文中能够访问该存储来重播这些事件。接收限界上下文能够重放事件流,直到它须要查看的以前的座椅类型描述时为止。
  • 当全部事件到达接收限界上下文(订单和注册限界上下文)时保存它们。
  • 让视图模型生成器中的命令处理程序保存事件,只选择它须要的那些。
  • 让视图模型生成器中的命令处理程序保存不一样的事件,实际上就是为此视图模型使用事件源。
  • 未来自全部限界上下文的全部命令和事件消息存储在消息日志中。

第一种选择并不老是可行的。在这种特殊状况下,它能够工做,由于同一个团队同时实现了限界上下文和基础设施,使得使用共享事件存储变得很容易。

Gary(CQRS专家)发言:
尽管从纯粹主义者的角度来看,第一个选项破坏了限界上下文之间的严格隔离,但在某些场景中,它多是一个可接受的实用解决方案。

第三种选择可能存在的风险是,所需的事件集合可能在将来发生变化。若是咱们如今不保存事件,它们将永远丢失。

尽管第五个选项存储了全部命令和事件,其中一些可能永远都不须要再次引用,但它确实提供了一个完整的日志,记录了系统中发生的全部事情。这对于故障诊断颇有用,还能够帮助您知足还没有肯定的需求。该团队选择了这个选项而不是选项二,由于它提供了一个更通用的机制,可能具备将来的好处。

持久化事件的目的是,当订单和注册上下文须要有关当前座位配额的信息时,能够回放这些事件,以便计算剩余座位的数量。要一致地计算这些数字,必须始终以相同的顺序回放事件。这种顺序有几种选择:

  • 会议管理限界上下文发送事件的顺序。
  • 订单和注册上下文接收事件的顺序。
  • 订单和注册上下文处理事件的顺序。

大多数状况下,这些顺序是相同的。没有什么正确的顺序。你只须要选择一个和它保持一致就好了。所以,选择由简单性决定。在本例中,最简单的方法是按照订单和注册限界上下文中处理程序接收事件的顺序持久化事件(第二个选项)。

Markus(软件开发人员)发言:
这种选择一般不会出如今事件源中。每一个聚合会都以固定的顺序建立事件,这就是系统用于持久存储事件的顺序。在此场景中,集成事件不是由单个聚合建立的。

为这些事件保存时间戳也有相似的问题。若是未来须要查看特定时间剩余的座位数量,那么时间戳可能会颇有用。这里的选择是,当事件在会议管理限界上下文中建立时,仍是在订单和注册限界上下文中接收时,应该建立时间戳?当会议管理限界上下文建立事件时,订单和注册限界上下文可能因为某种缘由离线。所以,团队决定在会议管理有界上下文发布事件时建立时间戳。

消息排序

团队建立并运行来验证V1版本的验收测试,凸显出了消息排序的一个潜在问题:执行会议管理限界上下文的验收测试向订单和注册限界上下文发送了一系列命令,这些命令有时会出现顺序错误。

Markus(软件开发人员)发言:
当人类用户真实测试系统的这一部分时,不太会注意到这种效果,由于发出命令的时间间隔要长得多,这使得消息不太可能无序地到达。

团队考虑了两种方法来确保消息以正确的顺序到达。

  • 第一个方法是使用消息会话,这是Azure服务总线的一个特性。若是您使用消息会话,这将确保会话内的消息以与它们发送时相同的顺序传递。
  • 第二种方法是修改应用程序中的处理程序,经过使用发送消息时添加到消息中的序列号或时间戳来检测无序消息。若是接收处理程序检测到一条无序消息,它将拒绝该消息,并在处理了在被拒绝消息以前发送的消息以后,将其放回稍后处理的队列或Topic。

在这种状况下,首选的解决方案是使用Azure服务总线消息会话,由于这只须要对现有代码进行更少的更改。这两种方法都会给消息传递带来一些额外的延迟,可是团队并不认为这会对系统的性能产生显著的影响。

实现细节

本节描述订单和注册限界上下文的实现的一些重要功能。您可能会发现拥有一份代码拷贝颇有用,这样您就能够继续学习了。您能够从Download center下载一个副本,或者在GitHub上查看存储库:github.com/mspnp/cqrs-…。您能够从GitHub上的Tags页面下载V2版本的代码。

备注:不要指望代码示例与参考实现中的代码彻底匹配。本章描述了CQRS过程当中的一个步骤,随着咱们了解更多并重构代码,实现可能会发生变化。
复制代码

**添加对“不须要支付的订单”的支持

作出这一改变有三个具体的目标,它们都是相关的。咱们但愿:

  • 修改RegistrationProcessManager类和相关聚合,以处理不须要支付的订单。
  • 修改UI中的导航,当订单不须要支付时跳过付款步骤。
  • 确保系统在升级到V2以后可以正确地工做,包括使用新事件和旧事件。

RegistrationProcessManager类的更改

在此以前,RegistrationProcessManager类在收到来自UI的注册者已完成支付的通知后发送了一个ConfirmOrderPayment命令。如今,若是有一个不须要支付订单,UI将直接向订单聚合发送一个ConfirmOrder命令。若是订单须要支付,RegistrationProcessManager类在从UI接收到成功支付的通知后,再向订单聚合发送一个ConfirmOrder命令。

Jana(软件架构师)发言:
注意,命令的名称已从ConfirmOrderPayment更改成ConfirmOrder。这反映了订单不须要知道任何关于付款的信息。它只须要知道订单已经确认。相似地,如今有一个新的OrderConfirmed事件用于替代旧的OrderPaymentConfirmed事件。

当订单聚合接收到ConfirmOrder命令时,它将引起一个OrderConfirmed事件。除被持久化外,该事件还由如下对象处理:

  • OrderViewModelGenerator类,它在其中更新读取模型中的订单状态。
  • SeatAssignments聚合,在其中初始化一个新的SeatAssignments实例。
  • RegistrationProcessManager类,它在其中触发一个提交座位预订的命令。

UI的更改

UI中的主要更改是在RegistrationController MVC控制器类中的SpecifyRegistrantAndPaymentDetails action里的。以前,此action方法返回InitiateRegistrationWithThirdPartyProcessorPayment(action result)。如今,若是Order对象的新IsFreeOfCharge属性为true,它将返回一个CompleteRegistrationWithoutPayment(action result)。不然,它返回一个CompleteRegistrationWithThirdPartyProcessorPayment(action result)。

[HttpPost]
public ActionResult SpecifyRegistrantAndPaymentDetails(AssignRegistrantDetails command, string paymentType, int orderVersion)
{
    ...

    var pricedOrder = this.orderDao.FindPricedOrder(orderId);
    if (pricedOrder.IsFreeOfCharge)
    {
        return CompleteRegistrationWithoutPayment(command, orderId);
    }

    switch (paymentType)
    {
        case ThirdPartyProcessorPayment:

            return CompleteRegistrationWithThirdPartyProcessorPayment(command, pricedOrder, orderVersion);

        case InvoicePayment:
            break;

        default:
            break;
    }

    ...
}
复制代码

CompleteRegistrationWithThirdPartyProcessorPayment将用户重定向到ThirdPartyProcessorPayment action,CompleteRegistrationWithoutPayment方法将用户直接重定向到ThankYou action。

数据迁移

会议管理限界上下文在其Azure SQL数据库实例中的PricedOrders表中存储来自订单和注册限界上下文的订单信息。之前,会议管理限界上下文接收OrderPaymentConfirmed事件,如今它接收OrderConfirmed事件,该事件包含一个附加的IsFreeOfCharge属性。这将成为数据库中的一个新列。

Markus(软件开发人员)发言:
在迁移过程当中,咱们不须要修改该表中的现有数据,由于布尔值的默认值为false。全部现有条目都是在系统支持不须要付费的订单以前建立的。

在迁移过程当中,任何正在运行的ConfirmOrderPayment命令均可能丢失,由于它们再也不由订单聚合处理。您应该验证当前的命令总线没有这些命令。

Poe(IT运维人员)发言:
咱们须要仔细计划如何部署V2版本,以便确保全部现有的、正在运行的ConfirmOrderPayment命令都由运行V1版本的工做角色实例处理。

系统将RegistrationProcessManager类实例的状态保存到SQL数据库表中。这个表的架构没有变化。迁移后您将看到的唯一更改是StateValue列中的一个新添加值。这反映了RegistrationProcessManager类中的ProcessState枚举中额外的PaymentConfirmationReceived值,以下面的代码示例所示:

public enum ProcessState
{
    NotStarted = 0,
    AwaitingReservationConfirmation = 1,
    ReservationConfirmationReceived = 2,
    PaymentConfirmationReceived = 3,
}
复制代码

在V1版本中,事件源系统为订单聚合保存的事件包括OrderPaymentConfirmed事件。所以,事件存储区包含此事件类型的实例。在V2版本中,OrderPaymentConfirmed事件被替换为OrderConfirmed事件。

团队决定在V2版本中,当反序列化事件时,不在基础设施级别映射和过滤事件。这意味着,当系统从事件存储中重播这些事件时,处理程序必须同时理解旧事件和新事件。下面的代码示例在SeatAssignmentsHandler类中显示了这一点:

static SeatAssignmentsHandler()
{
    Mapper.CreateMap<OrderPaymentConfirmed, OrderConfirmed>();
}

public SeatAssignmentsHandler(IEventSourcedRepository<Order> ordersRepo, IEventSourcedRepository<SeatAssignments> assignmentsRepo)
{
    this.ordersRepo = ordersRepo;
    this.assignmentsRepo = assignmentsRepo;
}

public void Handle(OrderPaymentConfirmed @event)
{
    this.Handle(Mapper.Map<OrderConfirmed>(@event));
}

public void Handle(OrderConfirmed @event)
{
    var order = this.ordersRepo.Get(@event.SourceId);
    var assignments = order.CreateSeatAssignments();
    assignmentsRepo.Save(assignments);
}
复制代码

您还能够在OrderViewModelGenerator类中看到一样的技术。

Order类中的方法略有不一样,由于这是持久化到事件存储中的事件之一。下面的代码示例显示了Order类中受保护构造函数的一部分:

protected Order(Guid id)
    : base(id)
{
    ...
    base.Handles<OrderPaymentConfirmed>(e => this.OnOrderConfirmed(Mapper.Map<OrderConfirmed>(e)));
    base.Handles<OrderConfirmed>(this.OnOrderConfirmed);
    ...
}
复制代码

Jana(软件架构师)发言:
以这种方式处理旧事件对于这个场景很是简单,由于唯一须要更改的是事件的名称。若是事件的属性也发生了变化,状况会更加复杂。未来,Contoso将考虑在基础设施中进行映射,以免遗留事件污染领域模型。

在UI中显示剩余座位

作出这一改变有三个具体的目标,它们都是相关的。咱们想要:

  • 修改系统,在会议系统的读模型中包含每一个座位类型的剩余座位数量信息。
  • 修改UI以显示每种座位类型的剩余座位数量。
  • 确保升级到V2后系统功能正常。

向读模型添加关于剩余座位数量的信息

系统要能显示剩余座位数量的信息来自两个地方:

  • 当业务客户建立新的座位类型或修改座位配额时,会议管理限界上下文将引起SeatCreatedSeatUpdated事件。
  • 在订单和注册限界上下文中,当注册者建立一个订单的时候,可用座位(SeatsAvailability)聚合将引起SeatsReserved、SeatsReservationCancelled和AvailableSeatsChanged事件。

备注:ConferenceViewModelGenerator类不使用SeatCreatedSeatUpdated事件。

订单和注册限界上下文中的ConferenceViewModelGenerator类如今处理这些事件,并使用它们来计算和存储读模型中的座位类型数量。下面的代码示例显示了ConferenceViewModelGenerator类中的相关处理程序:

public void Handle(AvailableSeatsChanged @event)
{
    this.UpdateAvailableQuantity(@event, @event.Seats);
}

public void Handle(SeatsReserved @event)
{
    this.UpdateAvailableQuantity(@event, @event.AvailableSeatsChanged);
}

public void Handle(SeatsReservationCancelled @event)
{
    this.UpdateAvailableQuantity(@event, @event.AvailableSeatsChanged);
}

private void UpdateAvailableQuantity(IVersionedEvent @event, IEnumerable<SeatQuantity> seats)
{
    using (var repository = this.contextFactory.Invoke())
    {
        var dto = repository.Set<Conference>().Include(x => x.Seats).FirstOrDefault(x => x.Id == @event.SourceId);
        if (dto != null)
        {
            if (@event.Version > dto.SeatsAvailabilityVersion)
            {
                foreach (var seat in seats)
                {
                    var seatDto = dto.Seats.FirstOrDefault(x => x.Id == seat.SeatType);
                    if (seatDto != null)
                    {
                        seatDto.AvailableQuantity += seat.Quantity;
                    }
                    else
                    {
                        Trace.TraceError("Failed to locate Seat Type read model being updated with id {0}.", seat.SeatType);
                    }
                }

                dto.SeatsAvailabilityVersion = @event.Version;

                repository.Save(dto);
            }
            else
            {
                Trace.TraceWarning ...
            }
        }
        else
        {
            Trace.TraceError ...
        }
    }
}
复制代码

UpdateAvailableQuantity方法将事件上的版本与读模型的当前版本进行比较,以检测可能的重复消息。

Markus(软件开发人员)发言:
此检查仅检测重复的消息,而不是超出序列的消息。

修改UI以显示剩余的座位数量

如今,当UI向会议的读模型查询座位类型列表时,列表包括当前可用的座位数量。下面的代码示例显示了RegistrationController MVC控制器如何使用SeatType类的AvailableQuantity

private OrderViewModel CreateViewModel()
{
    var seatTypes = this.ConferenceDao.GetPublishedSeatTypes(this.ConferenceAlias.Id);
    var viewModel =
        new OrderViewModel
        {
            ConferenceId = this.ConferenceAlias.Id,
            ConferenceCode = this.ConferenceAlias.Code,
            ConferenceName = this.ConferenceAlias.Name,
            Items =
                seatTypes.Select(
                    s =>
                        new OrderItemViewModel
                        {
                            SeatType = s,
                            OrderItem = new DraftOrderItem(s.Id, 0),
                            AvailableQuantityForOrder = s.AvailableQuantity,
                            MaxSelectionQuantity = Math.Min(s.AvailableQuantity, 20)
                        }).ToList(),
        };

    return viewModel;
}
复制代码

数据迁移

保存会议读模型数据的数据库有一个新列来保存用于检查重复事件的版本号,而保存座位类型读模型数据有一个新列来保存可用的座椅数量。

做为数据迁移的一部分,有必要为每一个可用座位(SeatsAvailability)聚合重放事件存储中的全部事件,以便正确计算可用数量。

不让命令消息重复

系统目前使用Azure服务总线传输消息。当系统从ConferenceProcessor类的启动代码初始化Azure服务总线时,它配置Topic来检测重复的消息,以下面的ServiceBusConfig类的代码示例所示:

private void CreateTopicIfNotExists() 
{     
    var topicDescription =         
        new TopicDescription(this.topic)         
        {             
            RequiresDuplicateDetection = true,
            DuplicateDetectionHistoryTimeWindow = topic.DuplicateDetectionHistoryTimeWindow,         
        };     
    try     
    {         
        this.namespaceManager.CreateTopic(topicDescription);     
    }     
    catch (MessagingEntityAlreadyExistsException) { } 
} 
复制代码
备注:您能够在Settings.xml文件中配置DuplicateDetectionHistoryTimeWindow
能够向Topic元素添加这个属性。默认值是1小时。
复制代码

可是,为了使重复检测工做正常,您必须确保每一个消息都有一个唯一的ID。下面的代码示例显示了MarkSeatsAsReserved命令:

public class MarkSeatsAsReserved : ICommand
{
    public MarkSeatsAsReserved()
    {
        this.Id = Guid.NewGuid();
        this.Seats = new List<SeatQuantity>();
    }

    public Guid Id { get; set; }

    public Guid OrderId { get; set; }

    public List<SeatQuantity> Seats { get; set; }

    public DateTime Expiration { get; set; }
}
复制代码

CommandBus类中的BuildMessage方法使用命令Id建立一个唯一的消息Id, Azure服务总线可使用这个消息Id来检测重复:

private BrokeredMessage BuildMessage(Envelope command) 
{ 
    var stream = new MemoryStream(); 
    ...

    var message = new BrokeredMessage(stream, true);
    if (!default(Guid).Equals(command.Body.Id))
    {
        message.MessageId = command.Body.Id.ToString();
    }

...

    return message;
} 
复制代码

保证消息顺序

团队决定使用Azure服务总线消息会话来保证系统中的消息顺序。

系统从ConferenceProcessor类中的OnStart方法配置Azure服务总线Topic和订阅。Settings.xml配置文件中的配置指定了具体的订阅使用会话。ServiceBusConfig类中的如下代码示例显示了系统如何建立和配置订阅。

private void CreateSubscriptionIfNotExists(NamespaceManager namespaceManager, TopicSettings topic, SubscriptionSettings subscription)
{
    var subscriptionDescription =
        new SubscriptionDescription(topic.Path, subscription.Name)
        {
            RequiresSession = subscription.RequiresSession
        };

    try
    {
        namespaceManager.CreateSubscription(subscriptionDescription);
    }
    catch (MessagingEntityAlreadyExistsException) { }
}
复制代码

如下来自SessionSubscriptionReceiver类的代码示例演示了如何使用会话接收消息:

private void ReceiveMessages(CancellationToken cancellationToken)
{
    while (!cancellationToken.IsCancellationRequested)
    {
        MessageSession session;
        try
        {
            session = this.receiveRetryPolicy.ExecuteAction<MessageSession>(this.DoAcceptMessageSession);
        }
        catch (Exception e)
        {
            ...
        }

        if (session == null)
        {
            Thread.Sleep(100);
            continue;
        }


        while (!cancellationToken.IsCancellationRequested)
        {
            BrokeredMessage message = null;
            try
            {
                try
                {
                    message = this.receiveRetryPolicy.ExecuteAction(() => session.Receive(TimeSpan.Zero));
                }
                catch (Exception e)
                {
                    ...
                }

                if (message == null)
                {
                    // If we have no more messages for this session, exit and try another.
                    break;
                }

                this.MessageReceived(this, new BrokeredMessageEventArgs(message));
            }
            finally
            {
                if (message != null)
                {
                    message.Dispose();
                }
            }
        }

        this.receiveRetryPolicy.ExecuteAction(() => session.Close());
    }
}

private MessageSession DoAcceptMessageSession()
{
    try
    {
        return this.client.AcceptMessageSession(TimeSpan.FromSeconds(45));
    }
    catch (TimeoutException)
    {
        return null;
    }
}
复制代码

Markus(软件开发人员)发言:
您可能会发现,将使用消息会话的ReceiveMessages方法的这个版本与SubscriptionReceiver类中的原始版本进行比较是颇有用的。

您必须确保当你发送消息包含一个会话ID,这样才能使用消息会话接收一条消息。系统使用事件的SourceID做为会话ID,以下面的代码示例所示的EventBus类中的BuildMessage方法:

var message = new BrokeredMessage(stream, true);
message.SessionId = @event.SourceId.ToString();
复制代码

经过这种方式,您能够确保以正确的顺序接收来自单个源的全部消息。

Poe(IT运维人员)发言:
在V2版本中,团队更改了系统建立Azure服务总线Topic和订阅的方式。以前,SubscriptionReceiver类建立了它们(若是它们还不存在)。如今,系统在应用程序启动时使用配置数据建立它们。这发生在启动过程的早期,以免在系统初始化订阅以前将消息发送到Topic时丢失消息的风险。

然而,只有当消息按正确的顺序传递到总线上时,会话才能保证按顺序传递消息。若是系统异步发送消息,则必须特别注意确保消息以正确的顺序放在总线上。在咱们的系统中,来自每一个单独聚合实例的事件按顺序到达是很重要的,可是咱们不关心来自不一样聚合实例的事件的顺序。所以,尽管系统异步发送事件,EventStoreBusPublisher实例仍然会在发送下一个事件以前等待前一个事件已发送的确认。如下来自TopicSender类的示例说明了这一点:

public void Send(Func<BrokeredMessage> messageFactory)
{
    var resetEvent = new ManualResetEvent(false);
    Exception exception = null;
    this.retryPolicy.ExecuteAction(
        ac =>
        {
            this.DoBeginSendMessage(messageFactory(), ac);
        },
        ar =>
        {
            this.DoEndSendMessage(ar);
        },
        () => resetEvent.Set(),
        ex =>
        {
            Trace.TraceError("An unrecoverable error occurred while trying to send a message:\r\n{0}", ex);
            exception = ex;
            resetEvent.Set();
        });

    resetEvent.WaitOne();
    if (exception != null)
    {
        throw exception;
    }
}
复制代码

Jana(软件架构师)发言:
此代码示例展现了系统如何使用Transient Fault Handling Application Block来让异步调用可靠。

有关消息排序和Azure服务总线的更多信息,请参见Microsoft Azure Queues and Microsoft Azure Service Bus Queues - Compared and Contrasted

有关异步发送消息和排序的信息,请参阅博客文章Microsoft Azure Service Bus Splitter and Aggregator

从会议管理限界上下文中持久化事件

团队决定建立一个包含全部发送的命令和事件的消息日志。这将使订单和注册限界上下文可以从会议管理限界上下文查询此日志,以获取其构建读模型所需的事件。这不是事件源,由于咱们没有使用这些事件来重建聚合的状态,尽管咱们使用相似的技术来捕获和持久化这些集成事件。

Gary(CQRS专家)发言:
此消息日志确保不会丢失任何消息,以便未来可以知足其余需求。

向消息添加额外元数据

系统如今将全部消息保存到消息日志中。为了方便查询特定命令或事件,系统如今向每一个消息添加了更多的元数据。之前,唯一的元数据是事件类型,如今,事件元数据包括事件类型、命名空间、程序集和路径。系统将元数据添加到EventBus类中的事件和CommandBus类中的命令中。

捕获消息并将消息持久化到消息日志中

系统使用Azure服务总线中对会议/命令和会议/事件topic的额外订阅来接收系统中每条消息的副本。而后,它将消息附加到Azure表存储中。下面的代码示例显示了AzureMessageLogWriter类的实例,它用于将消息保存到表中:

public class MessageLogEntity : TableServiceEntity 
{ 
    public string Kind { get; set; }     
    public string CorrelationId { get; set; }     
    public string MessageId { get; set; }     
    public string SourceId { get; set; }     
    public string AssemblyName { get; set; }     
    public string Namespace { get; set; }     
    public string FullName { get; set; }     
    public string TypeName { get; set; }     
    public string SourceType { get; set; }     
    public string CreationDate { get; set; }     
    public string Payload { get; set; } 
} 
复制代码

Kind属性指定消息是命令仍是事件。MessageId和CorrelationId属性由消息传递基础设施设置的,其他属性是从消息元数据中设置的。

下面的代码示例显示了这些消息的分区和RowKey的定义:

PartitionKey = message.EnqueuedTimeUtc.ToString("yyyMM"),
RowKey = message.EnqueuedTimeUtc.Ticks.ToString("D20") + "_" + message.MessageId
复制代码

注意,RowKey保存了消息最初发送的顺序,并添加到消息ID上,以确保唯一性,以防两条消息同时入队。

Jana(软件架构师)发言:
这与事件存储不一样,在事件存储区中,分区键标识聚合实例,而RowKey标识聚合的版本号。

数据迁移

当Contoso将系统从V1迁移到V2时,它将使用消息日志在订单和注册限界上下文中重建会议和价格订单的读模型。

Gary(CQRS专家)发言:
Contoso能够在须要重建与聚合无关的事件构建的读模型时来使用消息日志,例如来自会议管理限界上下文的集成事件。

会议读模型包含会议的信息,并包含来自会议管理限界上下文的ConferenceCreated、ConferenceUpdated、ConferencePublished、ConferenceUnpublished、SeatCreated和SeatUpdated事件的信息。

价格订单读模型持有来自于SeatCreated和SeatUpdated事件的信息,这些事件来自于会议管理限界上下文。

然而,在V1中,这些事件消息没有被持久化,所以读模型不能在V2中从新填充。为了解决这个问题,团队实现了一个数据迁移实用程序,它使用一种最佳方法来生成包含要存储在消息日志中的丢失数据的事件。例如,在迁移到V2以后,消息日志不包含ConferenceCreated事件,所以迁移实用程序在会议管理限界上下文使用的数据库中找到这些信息,并建立丢失的事件。您能够在MigrationToV2项目的Migrator类中的GeneratePastEventLogMessagesForConferenceManagement方法中看到这是如何完成的。

Markus(软件开发人员)发言:
您能够在这个类中看到,Contoso还将全部现有的事件源事件复制到消息日志中。

以下面所示,Migrator类中的RegenerateViewModels方法从新构建读取的模型。它经过调用Query方法从消息日志中检索全部事件,而后使用ConferenceViewModelGeneratorPricedOrderViewModelUpdater类来处理消息。

internal void RegenerateViewModels(AzureEventLogReader logReader, string dbConnectionString)
{
    var commandBus = new NullCommandBus();

    Database.SetInitializer<ConferenceRegistrationDbContext>(null);

    var handlers = new List<IEventHandler>();
    handlers.Add(new ConferenceViewModelGenerator(() => new ConferenceRegistrationDbContext(dbConnectionString), commandBus));
    handlers.Add(new PricedOrderViewModelUpdater(() => new ConferenceRegistrationDbContext(dbConnectionString)));

    using (var context = new ConferenceRegistrationMigrationDbContext(dbConnectionString))
    {
        context.UpdateTables();
    }

    try
    {
        var dispatcher = new MessageDispatcher(handlers);
        var events = logReader.Query(new QueryCriteria { });

        dispatcher.DispatchMessages(events);
    }
    catch
    {
        using (var context = new ConferenceRegistrationMigrationDbContext(dbConnectionString))
        {
            context.RollbackTablesMigration();
        }

        throw;
    }
}
复制代码

Jana(软件架构师)发言:
查询可能不会很快,由于它将从多个分区检索实体。

注意这个方法如何使用NullCommandBus实例来接收来自ConferenceViewModelGenerator实例的任何命令,由于咱们只是在这里从新构建读模型。

之前,PricedOrderViewModelGenerator使用ConferenceDao类来获取关于座位的信息。如今,它是自治的,并直接处理SeatCreatedSeatUpdated事件来维护这些信息。做为迁移的一部分,必须将此信息添加到读模型中。在前面的代码示例中,PricedOrderViewModelUpdater类只处理SeatCreatedSeatUpdated事件,并将缺失的信息添加到价格订单读模型中。

从V1迁移到V2

从V1迁移到V2须要更新已部署的应用程序代码并迁移数据。在生产环境中执行迁移以前,应该始终在测试环境中演练迁移。如下是所需步骤:

  1. 将V2版本部署到Azure的staging环境中。V2版本有一个MaintenanceMode属性,最初设置为true。在此模式下,应用程序向用户显示一条消息,说明站点当前正在进行维护,而工做角色将不处理消息。
  2. 准备好以后,将V2版本(仍然处于维护模式,MaintenanceMode为true)切换到Azure生产环境中。
  3. 让V1版本(如今在staging环境中运行)运行几分钟,以确保全部正在运行的消息都完成了它们的处理。
  4. 运行迁移程序来迁移数据(参见下面)。
  5. 成功完成数据迁移后,将每种工做角色的MaintenanceMode属性更改成false。
  6. V2版本如今运行在Azure中。

Jana(软件架构师)发言:
团队考虑使用单独的应用程序在升级过程当中向用户显示一条消息,告诉他们站点正在进行维护。然而,在V2版本中使用MaintenanceMode属性提供了一个更简单的过程,并为应用程序添加了一个潜在有用的新特性。

Poe(IT运维人员)发言:
因为对事件存储的更改,不可能执行从V1到V2的无停机升级。然而,团队所作的更改将确保从V2迁移到V3将不须要停机时间。

Markus(软件开发人员)发言:
团队对迁移实用程序应用了各类优化,例如批处理操做,以最小化停机时间。

下面几节总结了从V1到V2的数据迁移。这些步骤中的一些在前面已经讨论过,涉及到应用程序的特定更改或加强。

团队为V2引入的一个更改是,将全部命令和事件消息的副本保存在消息日志中,以便做为将来的证据,经过捕获未来可能使用的全部内容来保证应用程序的安全性。迁移过程考虑到了这个新特性。

由于迁移过程复制了大量的数据,因此您应该在Azure工做角色中运行迁移过程,以最小化成本。迁移实用程序是一个控制台应用程序,所以您可使用Azure和远程桌面服务。有关如何在Azure角色实例中运行应用程序的信息,请参见Using Remote Desktop with Microsoft Azure Roles

Poe(IT运维人员)发言:
在一些组织中,安全策略不容许您在Azure生产环境使用远程桌面服务。可是,您只须要一个在迁移期间承载远程桌面会话的工做角色,您能够在迁移完成后删除它。您还能够将迁移代码做为工做角色而不是控制台应用程序运行,并确保它记录迁移的状态,以便您验证。

为会议管理限界上下文生成过去的日志消息

迁移过程的一部分是在可能的状况下从新建立V1版本处理后丢弃的消息,而后将它们添加到消息日志中。在V1版本中,全部从会议管理限界上下文发送到订单和注册限界上下文的集成事件都以这种方式丢失了。系统不能从新建立全部丢失的事件,但能够建立表示迁移时系统状态的事件。

有关更多信息,请参见本章前面的“从会议管理限界上下文中持久化事件”一节。

迁移事件源里的事件

在V2版本中,事件存储为每一个事件存储额外的元数据,以便于查询事件。迁移过程将全部事件从现有事件存储复制到具备新模式的新事件存储。

Jana(软件架构师)发言:
原始事件不会以任何方式更新,而是被视为不可变的。

同时,系统将全部这些事件的副本添加到V2版本中引入的消息日志中。

有关更多信息,请参见MigrationToV2项目中Migrator类中的MigrateEventSourcedAndGeneratePastEventLogs

重建读模型**

V2版本包括对订单和注册限界上下文中读模型定义的几个更改。MigrationToV2项目在订单和注册限界上下文中从新构建会议的读模型和价格订单的读模型。

有关更多信息,请参见本章前面的“从会议管理限界上下文中持久化事件”一节。

对测试的影响

在这个过程的这个阶段,测试团队继续扩展验收测试集合。他们还建立了一组测试来验证数据迁移过程。

再说SpecFlow

以前,这组SpecFlow测试以两种方式实现:经过自动化web浏览器模拟用户交互,或者直接在MVC控制器上操做。这两种方法都有各自的优缺点,咱们在第4章“扩展和加强订单和注册限界上下文”中讨论过。

在与另外一位专家讨论了这些测试以后,团队还实现了第三种方法。从领域驱动设计(DDD)方法的角度来看,UI不是领域模型的一部分,核心团队的重点应该是在领域专家的帮助下理解领域,并在领域中实现业务逻辑。UI只是机械部分,用于使用户可以与领域进行交互。所以,验收测试应该包括验证领域模型是否以领域专家指望的方式工做。所以,团队使用SpecFlow建立了一组验收测试,这些测试旨在在不影响系统UI部分的状况下测试领域。

下面的代码示例显示了SelfRegistrationEndToEndWithDomain.feature文件,该文件在Conference.AcceptanceTests项目中的Features\Domain\Registration文件夹里,注意When和Then子句怎么使用命令和事件的。

Gary(CQRS专家)发言:
一般,若是您的领域模型只使用聚合,您会指望When子句发送命令,Then子句查看事件或异常。然而,在本例中,领域模型包含一个经过发送命令来响应事件的流程管理器。测试将检查是否发送了全部预期的命令,并引起了全部预期的事件。

Feature: Self Registrant end to end scenario for making a Registration for a Conference site with Domain Commands and Events
    In order to register for a conference
    As an Attendee
    I want to be able to register for the conference, pay for the Registration Order and associate myself with the paid Order automatically


Scenario: Make a reservation with the selected Order Items
Given the list of the available Order Items for the CQRS summit 2012 conference
    | seat type | rate | quota |
    | General admission | $199 | 100 |
    | CQRS Workshop | $500 | 100 |
    | Additional cocktail party | $50 | 100 |
And the selected Order Items
    | seat type | quantity |
    | General admission | 1 |
    | Additional cocktail party | 1 |
When the Registrant proceeds to make the Reservation
    # command:RegisterToConference
Then the command to register the selected Order Items is received 
    # event: OrderPlaced
And the event for Order placed is emitted
    # command: MakeSeatReservation
And the command for reserving the selected Seats is received
    # event: SeatsReserved
And the event for reserving the selected Seats is emitted
    # command: MarkSeatsAsReserved
And the command for marking the selected Seats as reserved is received
    # event: OrderReservationCompleted 
And the event for completing the Order reservation is emitted
    # event: OrderTotalsCalculated
And the event for calculating the total of $249 is emitted
复制代码

下面的代码示例显示了feature文件的一些步骤实现。这些步骤使用命令总线发送命令。

[When(@"the Registrant proceed to make the Reservation")]
public void WhenTheRegistrantProceedToMakeTheReservation()
{
    registerToConference = ScenarioContext.Current.Get<RegisterToConference>();
    var conferenceAlias = ScenarioContext.Current.Get<ConferenceAlias>();

    registerToConference.ConferenceId = conferenceAlias.Id;
    orderId = registerToConference.OrderId;
    this.commandBus.Send(registerToConference);

    // Wait for event processing
    Thread.Sleep(Constants.WaitTimeout);
}

[Then(@"the command to register the selected Order Items is received")]
public void ThenTheCommandToRegisterTheSelectedOrderItemsIsReceived()
{
    var orderRepo = EventSourceHelper.GetRepository<Registration.Order>();
    Registration.Order order = orderRepo.Find(orderId);

    Assert.NotNull(order);
    Assert.Equal(orderId, order.Id);
}

[Then(@"the event for Order placed is emitted")]
public void ThenTheEventForOrderPlacedIsEmitted()
{
    var orderPlaced = MessageLogHelper.GetEvents<OrderPlaced>(orderId).SingleOrDefault();

    Assert.NotNull(orderPlaced);
    Assert.True(orderPlaced.Seats.All(
        os => registerToConference.Seats.Count(cs => cs.SeatType == os.SeatType && cs.Quantity == os.Quantity) == 1));
}
复制代码

在迁移过程当中发现的bug

当测试团队在迁移以后在系统上运行测试时,咱们发现订单和注册限界上下文中座位类型的数量与迁移以前的数量不一样。调查揭示了如下缘由。

若是会议从未发布过,则会议管理限界上下文容许业务客户删除座位类型,但不会引起集成事件向订单和注册限界上下文报告这一状况。因此,当业务客户建立新的座位类型时,订单和注册限界上下文从会议管理限界上下文接收事件,而不是当业务客户删除座位类型时。

迁移过程的一部分建立一组集成事件,以替换V1版本处理后丢弃的事件。它经过读取会议管理限界上下文使用的数据库来建立这些事件。此过程没有为已删除的座位类型建立集成事件。

总之,在V1版本中,已删除的座位类型错误地出如今订单和注册限界上下文的读模型中。在迁移到V2版本以后,这些已删除的座位类型没有出如今订单和注册限界上下文的读模型中。

Poe(IT运维人员)发言:
测试迁移过程不只验证迁移是否按预期运行,并且可能揭示应用程序自己的bug。

总结

在咱们旅程的这个阶段,咱们对系统进行了版本控制,并完成了V2伪生产版本。这个新版本包含了一些额外的功能和特性,好比支持不须要付费的订单和在UI中显示更多信息。

咱们还对基础设施作了一些改变。例如,咱们使更多的消息具备幂等性,如今持久化集成事件。下一章将描述咱们旅程的最后阶段,咱们将继续加强基础设施,并在准备发布V3版本时增强系统。

相关文章
相关标签/搜索