当你在处理异步消息时,每一个单独的消息处理程序都是一个单独的handler,每一个handler之间互不影响。这时若是一个消息依赖另外一个消息的状态呢? 这时业务逻辑怎么处理?app
借用咱们上篇文章的业务场景,若是在Ship项目里须要发送一个ShipOrder Command。这个ShipOrder须要依赖Sales.OrderPlaced和Bill.OrderBilled Command的状态,目前咱们的两个单独的Message Handler都没有保持任何的状态字段,因此这时若是咱们须要完成这个业务模型,就须要跟踪他们的状态。框架
这个就是本篇文章要提的saga,定义在NServiceBus框架里,他的本质是一个消息驱动模型里的状态机,或者也能够理解为一系列消息处理程序用来共享状态的业务模型。我理解在消息队列里若是咱们要保证消息一致性一般会本身建立一张Event表,这里saga维持状态的角色有点像咱们这里的Event表。
好的,回到正题上,若是咱们须要在Shipping Service里发送一个ShipOrder,发送他以前须要肯定OrderPlaced和OrderBilled的状态,确保这两个消息都收到之后才能发送ShipOrder。异步
固然,我暂且理解Saga的目的是为了处理在长时间运行的任务里保证数据一致性这样的一个角色。async
saga状态主要是告诉NServiceBus在处理数据一致性的判断逻辑,这里须要继承抽象类ContainSagaData,在咱们这个业务场景中则主要是判断OrderPlaced和OrderBilled消息是否已经接收到并处理。ide
public class ShippingPolicyData:ContainSagaData { public string OrderId { get; set; } public bool IsOrderPlaced { get; set; } public bool IsOrderBilled { get; set; } }
有了状态之后,咱们还须要一个“handler”来告诉NServiceBus,在这个handler里主要用来处理消息数据一致性,我看了官方文档后,他们建议咱们这里的handler角色使用Policy后缀命名,固然我觉的也能够用Saga后缀命名,好比ShippingPolicy或者ShippingSaga。
同时这里咱们这个handler觉色还要继承Saga
public class ShipPolicy:Saga<ShippingPolicyData>, IAmStartedByMessages<OrderPlaced>, IAmStartedByMessages<OrderBilled> //均可以建立Saga实例 { private static ILog log = LogManager.GetLogger<ShipPolicy>(); protected override void ConfigureHowToFindSaga(SagaPropertyMapper<ShippingPolicyData> mapper) { mapper.ConfigureMapping<OrderPlaced>(t=>t.OrderId).ToSaga(sagaData=>sagaData.OrderId); mapper.ConfigureMapping<OrderBilled>(t=>t.OrderId).ToSaga(sagaData=>sagaData.OrderId); } public Task Handle(OrderPlaced message, IMessageHandlerContext context) { log.Info("OrderPlaced message received "); this.Data.IsOrderPlaced = true; return ProcessOrder(context); } public Task Handle(OrderBilled message, IMessageHandlerContext context) { log.Info("OrderBilled message received"); this.Data.IsOrderBilled = true; return ProcessOrder(context); } private async Task ProcessOrder(IMessageHandlerContext context) { if (Data.IsOrderBilled && Data.IsOrderPlaced) { await context.SendLocal(new ShipOrder() { OrderId = Data.OrderId }); MarkAsComplete(); } } }
这个类里你会发现还实现了接口IAmStartedByMessages
到这里也就是咱们的OrderPlaced和OrderBIlled消息都收到了,业务逻辑符合要求,能够发送ShipOrder消息了,也就是用户建立了订单,付了款,能够发货了。
code
新建ShipOrder类blog
public class ShipOrder:ICommand { public string OrderId { get; set; } }
新建ShipOrderHandler继承
public class ShipOrderHandler:IHandleMessages<ShipOrder> { private static ILog log = LogManager.GetLogger<ShipOrderHandler>(); public Task Handle(ShipOrder message, IMessageHandlerContext context) { log.Info($"Order [{message.OrderId}] - Successfully shipped"); return Task.CompletedTask; } }
运行Shipping项目,看到下图,则说明程序运行成功,咱们这个业务场景里OrderPlaced消息确定先接受到,OrderBilled消息后接受到。
https://docs.particular.net/tutorials/nservicebus-sagas/1-getting-started/
https://docs.particular.net/nservicebus/sagas/