Tip: 此篇已加入.NET Core微服务基础系列文章索引html
下面的文字来自CAP的Wiki文档:https://github.com/dotnetcore/CAP/wikigit
CAP 是一个在分布式系统中(SOA,MicroService)实现事件总线及最终一致性(分布式事务)的一个开源的 C# 库,她具备轻量级,高性能,易使用等特色。咱们能够轻松的在基于 .NET Core 技术的分布式系统中引入CAP,包括但限于 ASP.NET Core 和 ASP.NET Core on .NET Framework。github
CAP 的应用场景主要有如下两个:sql
CAP 同时支持使用 RabbitMQ 或 Kafka 进行底层之间的消息发送,咱们不须要具有 RabbitMQ 或者 Kafka 的使用经验,仍然能够轻松的将CAP集成到项目中。数据库
CAP 目前支持使用 Sql Server,MySql,PostgreSql 数据库的项目;api
CAP 同时支持使用 EntityFrameworkCore 和 Dapper 的项目,能够根据须要选择不一样的配置方式;网络
CAP的做者为园友savorboard(杨晓东),成都地区的.NET社区领导者,棒棒哒!app
这次试验仍然和上一篇基于MassTransit的案例同样(实际上是我懒得再改,直接拿来复用),共有四个MicroService应用程序,当用户下订单时会经过CAP做为事件总线发布消息,做为订阅者的库存和配送服务会接收到消息并消费消息。这次试验会采用RabbitMQ做为消息队列,采用MSSQL做为关系型数据库(同时CAP也是支持MSSQL的)。异步
准备工做:为全部服务经过NuGet安装CAP及其相关包async
PM> Install-Package DotNetCore.CAP下面是RabbitMQ的支持包
PM> Install-Package DotNetCore.CAP.RabbitMQ下面是MSSQL的支持包
PM> Install-Package DotNetCore.CAP.SqlServer
(1)启动配置:这里主要须要给CAP指定数据库(它会在这个数据库中建立本地消息表Published和Received)以及使用到的消息队列(这里是RabbitMQ)
public void ConfigureServices(IServiceCollection services) { services.AddMvc(); // Repository services.AddScoped<IOrderRepository, OrderRepository>(); // EF DbContext services.AddDbContext<OrderDbContext>(); // Dapper-ConnString services.AddSingleton(Configuration["DB:OrderDB"]); // CAP services.AddCap(x => { x.UseEntityFramework<OrderDbContext>(); // EF x.UseSqlServer(Configuration["DB:OrderDB"]); // SQL Server x.UseRabbitMQ(cfg => { cfg.HostName = Configuration["MQ:Host"]; cfg.VirtualHost = Configuration["MQ:VirtualHost"]; cfg.Port = Convert.ToInt32(Configuration["MQ:Port"]); cfg.UserName = Configuration["MQ:UserName"]; cfg.Password = Configuration["MQ:Password"]; }); // RabbitMQ // Below settings is just for demo x.FailedRetryCount = 2; x.FailedRetryInterval = 5; }); ...... } // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. public void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime lifetime) { ...... app.UseMvc(); // CAP app.UseCap(); ...... }
(2)Controller:这里会调用Repository去实现业务逻辑和发送消息
[Route("api/Order")] public class OrderController : Controller { public IOrderRepository OrderRepository { get; } public OrderController(IOrderRepository OrderRepository) { this.OrderRepository = OrderRepository; } [HttpPost] public string Post([FromBody]OrderDTO orderDTO) { var result = OrderRepository.CreateOrderByDapper(orderDTO).GetAwaiter().GetResult(); return result ? "Post Order Success" : "Post Order Failed"; } }
(3)Repository:这里实现了两种方式:EF和Dapper(基于ADO.NET),其中EF方式中不须要传transaction(当CAP检测到 Publish 是在EF事务区域内的时候,将使用当前的事务上下文进行消息的存储),而基于ADO.NET方式中须要传transaction(因为不能获取到事务上下文,因此须要用户手动的传递事务上下文到CAP中)。
public class OrderRepository : IOrderRepository { public OrderDbContext DbContext { get; } public ICapPublisher CapPublisher { get; } public string ConnStr { get; } // For Dapper use public OrderRepository(OrderDbContext DbContext, ICapPublisher CapPublisher, string ConnStr) { this.DbContext = DbContext; this.CapPublisher = CapPublisher; this.ConnStr = ConnStr; } public async Task<bool> CreateOrderByEF(IOrder order) { using (var trans = DbContext.Database.BeginTransaction()) { var orderEntity = new Order() { ID = GenerateOrderID(), OrderUserID = order.OrderUserID, OrderTime = order.OrderTime, OrderItems = null, ProductID = order.ProductID // For demo use }; DbContext.Orders.Add(orderEntity); await DbContext.SaveChangesAsync(); // When using EF, no need to pass transaction var orderMessage = new OrderMessage() { ID = orderEntity.ID, OrderUserID = orderEntity.OrderUserID, OrderTime = orderEntity.OrderTime, OrderItems = null, ProductID = orderEntity.ProductID // For demo use }; await CapPublisher.PublishAsync(EventConstants.EVENT_NAME_CREATE_ORDER, orderMessage); trans.Commit(); } return true; } public async Task<bool> CreateOrderByDapper(IOrder order) { using (var conn = new SqlConnection(ConnStr)) { conn.Open(); using (var trans = conn.BeginTransaction()) { // business code here string sqlCommand = @"INSERT INTO [dbo].[Orders](OrderID, OrderTime, OrderUserID, ProductID) VALUES(@OrderID, @OrderTime, @OrderUserID, @ProductID)"; order.ID = GenerateOrderID(); await conn.ExecuteAsync(sqlCommand, param: new { OrderID = order.ID, OrderTime = DateTime.Now, OrderUserID = order.OrderUserID, ProductID = order.ProductID }, transaction: trans); // For Dapper/ADO.NET, need to pass transaction var orderMessage = new OrderMessage() { ID = order.ID, OrderUserID = order.OrderUserID, OrderTime = order.OrderTime, OrderItems = null, MessageTime = DateTime.Now, ProductID = order.ProductID // For demo use }; await CapPublisher.PublishAsync(EventConstants.EVENT_NAME_CREATE_ORDER, orderMessage, trans); trans.Commit(); } } return true; } private string GenerateOrderID() { // TODO: Some business logic to generate Order ID return Guid.NewGuid().ToString(); } private string GenerateEventID() { // TODO: Some business logic to generate Order ID return Guid.NewGuid().ToString(); } }
这里摘抄一段CAP wiki中关于事务的一段介绍:
事务在 CAP 具备重要做用,它是保证消息可靠性的一个基石。 在发送一条消息到消息队列的过程当中,若是不使用事务,咱们是没有办法保证咱们的业务代码在执行成功后消息已经成功的发送到了消息队列,或者是消息成功的发送到了消息队列,可是业务代码确执行失败。
这里的失败缘由多是多种多样的,好比链接异常,网络故障等等。
只有业务代码和CAP的Publish代码必须在同一个事务中,才可以保证业务代码和消息代码同时成功或者失败。
换句话说,CAP会确保咱们这段逻辑中业务代码和消息代码都成功了,才会真正让事务commit。
(1)启动配置:这里主要是指定Subscriber
public void ConfigureServices(IServiceCollection services) { services.AddMvc(); // EF DbContext services.AddDbContext<StorageDbContext>(); // Dapper-ConnString services.AddSingleton(Configuration["DB:StorageDB"]); // Subscriber services.AddTransient<IOrderSubscriberService, OrderSubscriberService>(); // CAP services.AddCap(x => { x.UseEntityFramework<StorageDbContext>(); // EF x.UseSqlServer(Configuration["DB:StorageDB"]); // SQL Server x.UseRabbitMQ(cfg => { cfg.HostName = Configuration["MQ:Host"]; cfg.VirtualHost = Configuration["MQ:VirtualHost"]; cfg.Port = Convert.ToInt32(Configuration["MQ:Port"]); cfg.UserName = Configuration["MQ:UserName"]; cfg.Password = Configuration["MQ:Password"]; }); // RabbitMQ // Below settings is just for demo x.FailedRetryCount = 2; x.FailedRetryInterval = 5; }); ...... } // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. public void Configure(IApplicationBuilder app, IServiceProvider serviceProvider, IHostingEnvironment env, IApplicationLifetime lifetime) { ...... app.UseMvc(); // CAP app.UseCap(); ...... }
(2)实现Subscriber
首先定义一个接口,建议放到公共类库中
public interface IOrderSubscriberService { Task ConsumeOrderMessage(OrderMessage message); }
而后实现这个接口,记得让其实现ICapSubscribe接口,而后咱们就可使用 CapSubscribeAttribute
来订阅 CAP 发布出来的消息。
public class OrderSubscriberService : IOrderSubscriberService, ICapSubscribe { private readonly string _connStr; public OrderSubscriberService(string connStr) { _connStr = connStr; } [CapSubscribe(EventConstants.EVENT_NAME_CREATE_ORDER)] public async Task ConsumeOrderMessage(OrderMessage message) { await Console.Out.WriteLineAsync($"[StorageService] Received message : {JsonHelper.SerializeObject(message)}"); await UpdateStorageNumberAsync(message); } private async Task<bool> UpdateStorageNumberAsync(OrderMessage order) { //throw new Exception("test"); // just for demo use using (var conn = new SqlConnection(_connStr)) { string sqlCommand = @"UPDATE [dbo].[Storages] SET StorageNumber = StorageNumber - 1 WHERE StorageID = @ProductID"; int count = await conn.ExecuteAsync(sqlCommand, param: new { ProductID = order.ProductID }); return count > 0; } } }
*.CAP约定消息端在方法实现的过程当中须要实现幂等性,所谓幂等性就是指用户对于同一操做发起的一次请求或者屡次请求的结果是一致的,不会由于屡次点击而产生了反作用。这里我没有考虑,实际中须要首先进行验证,避免二次更新。
(1)启动配置:与StorageService高度相似,只是使用的不是同一个数据库
(2)实现Subscriber
public class OrderSubscriberService : IOrderSubscriberService, ICapSubscribe { private readonly string _connStr; public OrderSubscriberService(string connStr) { _connStr = connStr; } [CapSubscribe(EventConstants.EVENT_NAME_CREATE_ORDER)] public async Task ConsumeOrderMessage(OrderMessage message) { await Console.Out.WriteLineAsync($"[DeliveryService] Received message : {JsonHelper.SerializeObject(message)}"); await AddDeliveryRecordAsync(message); } private async Task<bool> AddDeliveryRecordAsync(OrderMessage order) { //throw new Exception("test"); // just for demo use using (var conn = new SqlConnection(_connStr)) { string sqlCommand = @"INSERT INTO [dbo].[Deliveries] (DeliveryID, OrderID, ProductID, OrderUserID, CreatedTime) VALUES (@DeliveryID, @OrderID, @ProductID, @OrderUserID, @CreatedTime)"; int count = await conn.ExecuteAsync(sqlCommand, param: new { DeliveryID = Guid.NewGuid().ToString(), OrderID = order.ID, OrderUserID = order.OrderUserID, ProductID = order.ProductID, CreatedTime = DateTime.Now }); return count > 0; } } }
(1)启动3个微服务,Check 数据库表状态
首先会看到在各个数据库中均建立了本地消息表,这两个表的含义以下:
Cap.Published:这个表主要是用来存储 CAP 发送到MQ(Message Queue)的客户端消息,也就是说你使用 ICapPublisher
接口 Publish 的消息内容。
Cap.Received:这个表主要是用来存储 CAP 接收到 MQ(Message Queue) 的客户端订阅的消息,也就是使用 CapSubscribe[]
订阅的那些消息。
而后看看各个表的数据,目前只有库存表有数据,由于咱们要作的只是更新。
(2)经过Postman发一个Post请求
(3)Check控制台输出的日志信息
(4)Check数据库中的业务表和消息表数据:能够看到发送者和接收者都执行成功了,若是其中任何一个参与者发生了异常或者链接不上,CAP会有默认的重试机制(默认是50次最大重试次数,每次重试间隔60s),当失败总次数达到默认失败总次数后,就不会进行重试了,咱们能够在 Dashboard 中查看消息失败的缘由,而后进行人工重试处理。
另外,因为CAP会在数据库中建立消息表,所以不免会考虑到其性能。CAP提供了一个数据清理的机制,默认状况下会每隔一个小时将消息表的数据进行清理删除,避免数据量过多致使性能的下降。清理规则为 ExpiresAt (字段名)不为空而且小于当前时间的数据。
本篇首先简单介绍了一下CAP这个开源项目,而后基于上一篇中的下订单的小案例来进行了基于CAP的改造,并经过一个实例的运行来看到告终果。固然,这个实例并不完美,不少点都没有考虑(好比消息端消费时的幂等性)和失败重试的场景实践等等等等。因为时间和精力的关系,目前只使用到这儿,之后有机会可以应用上会研究下CAP的源码,最后感谢杨晓东为.NET社区带来了一个优秀的开源项目!
Click Here => 点我点我
CAP - GitHub : https://github.com/dotnetcore/CAP
CAP - Wiki : https://github.com/dotnetcore/CAP/wiki
杨晓东,《BASE:一种ACID的替代方案》