咱们先来看看CQRS架构,你对下图的架构还有印象吗?每一个组件的功能都还清楚吗?若是有疑问,请查考文章《微服务实战(五):落地微服务架构到直销系统(构建高性能大并发系统)》。前端
1.前端用户调用一个下单Command WebApi,传递下单命令;下单Command WebApi接受到下单命令后,将下单命令数据投递到一个命令队列中,向前端用户返回一个信息。web
2.下单Command Handler WebApi侦听命令队列中的下单命令,而后调用领域对象逻辑,将执行的结果也就是Order对象的当前状态持久化到Event Store中。json
3.下单Command Handler WebApi将下单相关信息经过事件的方式发布到一个事件队列中。(用户事件处理器最终将订单信息更新到业务库中)api
public class CreateOrderCommand:BaseEvent
{
public OrderDTO orderdto { get; set; }
public CreateOrderCommand() { }
public CreateOrderCommand(OrderDTO orderdto)
{
this.orderdto = orderdto;
this.AggregationRootId = Guid.NewGuid();
this.AssemblyQualifiedAggreateRooType = typeof(Orders).AssemblyQualifiedName;
this.AssemblyQualifiedCommandAndEventType = this.GetType().AssemblyQualifiedName;
}
}
复制代码
[Produces("application/json")]
[Route("api/Order")]
public class OrderController : Controller
{
private readonly IEventBus commandbus;
public OrderController(IEventBus commandbus)
{
this.commandbus = commandbus;
}
[HttpPost]
[Route("CreateOrderCmd")]
public ResultEntity<bool> CreateOrderCmd([FromBody] OrderDTO orderdto)
{
var result = new ResultEntity<bool>();
try
{
var createordercommand = new CreateOrderCommand(orderdto);
//发布命令到命令总线
commandbus.Publish(createordercommand);
result.IsSuccess = true;
result.Msg = "下单处理中!";
}
catch(Exception error)
{
result.ErrorCode = 200;
result.Msg = error.Message;
}
return result;
}
}
复制代码
固然须要定义要注入的命令总线:微信
public void ConfigureServices(IServiceCollection services)
{
services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_1);
//定义要发布命令的命令总线
services.AddSingleton<IEventHandlerExecutionContext>(new EventHandlerExecutionContext(services));
var connectionFactory = new ConnectionFactory { HostName = "localhost" };
services.AddSingleton<IEventBus>(sp => new RabbitMqEB(connectionFactory,
sp.GetRequiredService<IEventHandlerExecutionContext>(), "exchange1", "direct", "ordercreatecommandqueue", 1));
}
复制代码
//侦听订单建立命令队列里的消息
services.AddSingleton<IEventHandlerExecutionContext>(new EventHandlerExecutionContext(services));
var connectionFactory = new ConnectionFactory { HostName = "localhost" };
services.AddSingleton<IEventBus>(sp => new RabbitMqEB(connectionFactory,
sp.GetRequiredService<IEventHandlerExecutionContext>(), "exchange1", "direct", "ordercreatecommandqueue", 2));
//订阅建立订单命令
var commandbuss = app.ApplicationServices.GetServices<IEventBus>();
var commandbus = commandbuss.ToList()[0];
commandbus.Subscribe<CreateOrderCommand, OrderCreateCommandHandler>();
复制代码
public class OrderCreateCommandHandler : IEventHandler
{
private readonly IServiceProvider iserviceprovider;
public OrderCreateCommandHandler()
{
var iwebhost = FindIWebHost.GetWwebHost("OrderCommandHandler.WebApi");
iserviceprovider = iwebhost.Services;
}
public Task<bool> HandleAsync<TEvent>(TEvent @event) where TEvent : IEvent
{
var orderdtocommand = @event as CreateOrderCommand;
var orderdto = orderdtocommand.orderdto;
var orderid = orderdtocommand.AggregationRootId;
Orders order = new Orders();
var productskus = new List<ProductSKU>();
for (int i = 0; i < orderdto.ProductSPUNames.Count; i++)
{
var productsku = new ProductSKU();
productsku.ProductSPUName = orderdto.ProductSPUNames[i];
productsku.DealerPrice = orderdto.ProductDealerPrices[i];
productsku.PV = orderdto.ProductPVS[i];
productsku.Id = orderdto.ProductSKUIds[i];
productsku.Spec = orderdto.ProductSepcs[i];
productskus.Add(productsku);
}
var contact = new Contact();
contact.ContactName = orderdto.ContactName;
contact.ContactTel = orderdto.ContactTel;
contact.Province = orderdto.Privence;
contact.City = orderdto.City;
contact.Zero = orderdto.Zero;
contact.Street = orderdto.Street;
//完成业务逻辑
var orders = order.CreateOrders(orderid, orderdto.DealerId, productskus, orderdto.Counts,
contact);
var ordercreateevent = new OrderCreateEvent();
ordercreateevent.AggregationRootId = orders.Id;
ordercreateevent.AssemblyQualifiedAggreateRooType = orderdtocommand.AssemblyQualifiedAggreateRooType;
ordercreateevent.AssemblyQualifiedCommandAndEventType = orderdtocommand.AssemblyQualifiedCommandAndEventType;
ordercreateevent.CreateDate = orders.OrderDateTime;
ordercreateevent.Id = orders.Id;
ordercreateevent.OrderDateTime = orders.OrderDateTime;
ordercreateevent.OrderDealerId = orders.OrderDealerId;
ordercreateevent.OrderItems = orders.OrderItems;
ordercreateevent.OrderStreet = orders.OrderStreet;
ordercreateevent.OrderTotalPrice = orders.OrderTotalPrice;
ordercreateevent.OrderTotalPV = orders.OrderTotalPV;
ordercreateevent.Code = orders.Code;
ordercreateevent.Telephone = orders.Telephone;
ordercreateevent.Version = 0;
//对建立订单事件持久化事件存储
try
{
new DomainAndEventStorage().SaveEvent(ordercreateevent);
var eventbuss = iserviceprovider.GetServices(typeof(IEventBus))
as IEnumerable<IEventBus>;
var eventbusls = eventbuss.ToList();
var eventbus = eventbusls[1];
//发布到事件队列,用于将来持久化到业务库中
eventbus.Publish(ordercreateevent);
}
catch(Exception error)
{
throw error;
}
return Task.FromResult(true);
}
}
复制代码
微服务实战视频请关注微信公众号:msshcj架构