上一篇【.Net Core微服务入门全纪录(五)——Ocelot-API网关(下)】中已经完成了Ocelot + Consul的搭建,这一篇简单说一下EventBus。html
贴一段引用:前端
事件总线是对观察者(发布-订阅)模式的一种实现。它是一种集中式事件处理机制,容许不一样的组件之间进行彼此通讯而又不须要相互依赖,达到一种解耦的目的。android
若是没有接触过EventBus,可能不太好理解。其实EventBus在客户端开发中应用很是普遍(android,ios,web前端等),用于多个组件(或者界面)之间的相互通讯,懂的人都懂。。。ios
就拿当前的项目举例,咱们有一个订单服务,一个产品服务。客户端有一个下单功能,当用户下单时,调用订单服务的下单接口,那么下单接口须要调用产品服务的减库存接口,这涉及到服务与服务之间的调用。那么服务之间又怎么调用呢?直接RESTAPI?或者效率更高的gRPC?可能这二者各有各的使用场景,可是他们都存在一个服务之间的耦合问题,或者难以作到异步调用。git
试想一下:假设咱们下单时调用订单服务,订单服务须要调用产品服务,产品服务又要调用物流服务,物流服务再去调用xx服务 等等。。。若是每一个服务处理时间须要2s,不使用异步的话,那这种体验可想而知。github
若是使用EventBus的话,那么订单服务只须要向EventBus发一个“下单事件”就能够了。产品服务会订阅“下单事件”,当产品服务收到下单事件时,本身去减库存就行了。这样就避免了两个服务之间直接调用的耦合性,而且真正作到了异步调用。web
既然涉及到多个服务之间的异步调用,那么就不得不提分布式事务。分布式事务并非微服务独有的问题,而是全部的分布式系统都会存在的问题。
关于分布式事务,能够查一下“CAP原则”和“BASE理论”了解更多。当今的分布式系统更多的会追求事务的最终一致性。sql
下面使用国人开发的优秀项目“CAP”,来演示一下EventBus的基本使用。之因此使用“CAP”是由于它既能解决分布式系统的最终一致性,同时又是一个EventBus,它具有EventBus的全部功能!
做者介绍:https://www.cnblogs.com/savorboard/p/cap.htmldocker
在Docker中准备一下须要的环境,首先是数据库,数据库我使用PostgreSQL,用别的也行。CAP支持:SqlServer,MySql,PostgreSql,MongoDB。
关于在Docker中运行PostgreSQL能够看个人另外一篇博客:http://www.javashuo.com/article/p-kiulcfug-ma.html数据库
而后是MQ,这里我使用RabbitMQ,Kafka也能够。
Docker运行RabbitMQ:
docker pull rabbitmq:management docker run -d -p 15672:15672 -p 5672:5672 --name rabbitmq rabbitmq:management
默认用户:guest,密码:guest
环境准备就完成了,Docker就是这么方便。。。
为了模拟以上业务,须要修改大量代码,下面代码若有遗漏的直接去github找。
NuGet安装:
Microsoft.EntityFrameworkCore Microsoft.EntityFrameworkCore.Tools Npgsql.EntityFrameworkCore.PostgreSQL
CAP相关:
DotNetCore.CAP DotNetCore.CAP.RabbitMQ DotNetCore.CAP.PostgreSql
Order.API/Controllers/OrdersController.cs增长下单接口:
[Route("[controller]")] [ApiController] public class OrdersController : ControllerBase { private readonly ILogger<OrdersController> _logger; private readonly IConfiguration _configuration; private readonly ICapPublisher _capBus; private readonly OrderContext _context; public OrdersController(ILogger<OrdersController> logger, IConfiguration configuration, ICapPublisher capPublisher, OrderContext context) { _logger = logger; _configuration = configuration; _capBus = capPublisher; _context = context; } [HttpGet] public IActionResult Get() { string result = $"【订单服务】{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}——" + $"{Request.HttpContext.Connection.LocalIpAddress}:{_configuration["ConsulSetting:ServicePort"]}"; return Ok(result); } /// <summary> /// 下单 发布下单事件 /// </summary> /// <param name="order"></param> /// <returns></returns> [Route("Create")] [HttpPost] public async Task<IActionResult> CreateOrder(Models.Order order) { using (var trans = _context.Database.BeginTransaction(_capBus, autoCommit: true)) { //业务代码 order.CreateTime = DateTime.Now; _context.Orders.Add(order); var r = await _context.SaveChangesAsync() > 0; if (r) { //发布下单事件 await _capBus.PublishAsync("order.services.createorder", new CreateOrderMessageDto() { Count = order.Count, ProductID = order.ProductID }); return Ok(); } return BadRequest(); } } }
Order.API/MessageDto/CreateOrderMessageDto.cs:
/// <summary> /// 下单事件消息 /// </summary> public class CreateOrderMessageDto { /// <summary> /// 产品ID /// </summary> public int ProductID { get; set; } /// <summary> /// 购买数量 /// </summary> public int Count { get; set; } }
Order.API/Models/Order.cs订单实体类:
public class Order { [Key] [DatabaseGenerated(DatabaseGeneratedOption.Identity)] public int ID { get; set; } /// <summary> /// 下单时间 /// </summary> [Required] public DateTime CreateTime { get; set; } /// <summary> /// 产品ID /// </summary> [Required] public int ProductID { get; set; } /// <summary> /// 购买数量 /// </summary> [Required] public int Count { get; set; } }
Order.API/Models/OrderContext.cs数据库Context:
public class OrderContext : DbContext { public OrderContext(DbContextOptions<OrderContext> options) : base(options) { } public DbSet<Order> Orders { get; set; } protected override void OnModelCreating(ModelBuilder modelBuilder) { } }
Order.API/appsettings.json增长数据库链接字符串:
"ConnectionStrings": { "OrderContext": "User ID=postgres;Password=pg123456;Host=host.docker.internal;Port=5432;Database=Order;Pooling=true;" }
Order.API/Startup.cs修改ConfigureServices方法,添加Cap配置:
public void ConfigureServices(IServiceCollection services) { services.AddControllers(); services.AddDbContext<OrderContext>(opt => opt.UseNpgsql(Configuration.GetConnectionString("OrderContext"))); //CAP services.AddCap(x => { x.UseEntityFramework<OrderContext>(); x.UseRabbitMQ("host.docker.internal"); }); }
以上是订单服务的修改。
Product.API/Controllers/ProductsController.cs增长减库存接口:
[Route("[controller]")] [ApiController] public class ProductsController : ControllerBase { private readonly ILogger<ProductsController> _logger; private readonly IConfiguration _configuration; private readonly ICapPublisher _capBus; private readonly ProductContext _context; public ProductsController(ILogger<ProductsController> logger, IConfiguration configuration, ICapPublisher capPublisher, ProductContext context) { _logger = logger; _configuration = configuration; _capBus = capPublisher; _context = context; } [HttpGet] public IActionResult Get() { string result = $"【产品服务】{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}——" + $"{Request.HttpContext.Connection.LocalIpAddress}:{_configuration["ConsulSetting:ServicePort"]}"; return Ok(result); } /// <summary> /// 减库存 订阅下单事件 /// </summary> /// <param name="message"></param> /// <returns></returns> [NonAction] [CapSubscribe("order.services.createorder")] public async Task ReduceStock(CreateOrderMessageDto message) { //业务代码 var product = await _context.Products.FirstOrDefaultAsync(p => p.ID == message.ProductID); product.Stock -= message.Count; await _context.SaveChangesAsync(); } }
Product.API/MessageDto/CreateOrderMessageDto.cs:
/// <summary> /// 下单事件消息 /// </summary> public class CreateOrderMessageDto { /// <summary> /// 产品ID /// </summary> public int ProductID { get; set; } /// <summary> /// 购买数量 /// </summary> public int Count { get; set; } }
Product.API/Models/Product.cs产品实体类:
public class Product { [Key] [DatabaseGenerated(DatabaseGeneratedOption.Identity)] public int ID { get; set; } /// <summary> /// 产品名称 /// </summary> [Required] [Column(TypeName = "VARCHAR(16)")] public string Name { get; set; } /// <summary> /// 库存 /// </summary> [Required] public int Stock { get; set; } }
Product.API/Models/ProductContext.cs数据库Context:
public class ProductContext : DbContext { public ProductContext(DbContextOptions<ProductContext> options) : base(options) { } public DbSet<Product> Products { get; set; } protected override void OnModelCreating(ModelBuilder modelBuilder) { base.OnModelCreating(modelBuilder); //初始化种子数据 modelBuilder.Entity<Product>().HasData(new Product { ID = 1, Name = "产品1", Stock = 100 }, new Product { ID = 2, Name = "产品2", Stock = 100 }); } }
Product.API/appsettings.json增长数据库链接字符串:
"ConnectionStrings": { "ProductContext": "User ID=postgres;Password=pg123456;Host=host.docker.internal;Port=5432;Database=Product;Pooling=true;" }
Product.API/Startup.cs修改ConfigureServices方法,添加Cap配置:
public void ConfigureServices(IServiceCollection services) { services.AddControllers(); services.AddDbContext<ProductContext>(opt => opt.UseNpgsql(Configuration.GetConnectionString("ProductContext"))); //CAP services.AddCap(x => { x.UseEntityFramework<ProductContext>(); x.UseRabbitMQ("host.docker.internal"); }); }
以上是产品服务的修改。
订单服务和产品服务的修改到此就完成了,看着修改不少,其实功能很简单。就是各自增长了本身的数据库表,而后订单服务增长了下单接口,下单接口会发出“下单事件”。产品服务增长了减库存接口,减库存接口会订阅“下单事件”。而后客户端调用下单接口下单时,产品服务会减去相应的库存,功能就这么简单。
关于EF数据库迁移之类的基本使用就不介绍了。使用Docker从新构建镜像,运行订单服务,产品服务:
docker build -t orderapi:1.1 -f ./Order.API/Dockerfile . docker run -d -p 9060:80 --name orderservice orderapi:1.1 --ConsulSetting:ServicePort="9060" docker run -d -p 9061:80 --name orderservice1 orderapi:1.1 --ConsulSetting:ServicePort="9061" docker run -d -p 9062:80 --name orderservice2 orderapi:1.1 --ConsulSetting:ServicePort="9062" docker build -t productapi:1.1 -f ./Product.API/Dockerfile . docker run -d -p 9050:80 --name productservice productapi:1.1 --ConsulSetting:ServicePort="9050" docker run -d -p 9051:80 --name productservice1 productapi:1.1 --ConsulSetting:ServicePort="9051" docker run -d -p 9052:80 --name productservice2 productapi:1.1 --ConsulSetting:ServicePort="9052"
最后 Ocelot.APIGateway/ocelot.json 增长一条路由配置:
好了,进行到这里,整个环境就有点复杂了。确保咱们的PostgreSQL,RabbitMQ,Consul,Gateway,服务实例都正常运行。
服务实例运行成功后,数据库应该是这样的:
产品表种子数据:
cap.published表和cap.received表是由CAP自动生成的,它内部是使用本地消息表+MQ来实现异步确保。
此次使用Postman做为客户端调用下单接口(9070是以前的Ocelot网关端口):
订单库published表:
订单库order表:
产品库received表:
产品库product表:
再试一下:
OK,完成。虽然功能很简单,可是咱们实现了服务的解耦,异步调用,和最终一致性。
注意,上面的例子纯粹是为了说明EventBus的使用,实际中的下单流程绝对不会这么作的!但愿你们不要较真。。。
可能有人会说若是下单成功,可是库存不足致使减库存失败了怎么办,是否是要回滚订单表的数据?若是产生这种想法,说明尚未真正理解最终一致性的思想。首先下单前确定会检查一下库存数量,既然容许下单那么必然是库存充足的。这里的事务是指:订单保存到数据库,和下单事件保存到cap.published表(保存到cap.published表理论上就可以发送到MQ)这两件事情,要么一同成功,要么一同失败。若是这个事务成功,那么就能够认为这个业务流程是成功的,至于产品服务的减库存是否成功那就是产品服务的事情了(理论上也应该是成功的,由于消息已经确保发到了MQ,产品服务必然会收到消息),CAP也提供了失败重试,和失败回调机制。
若是非要数据回滚也是能实现的,CAP的ICapPublisher.Publish方法提供一个callbackName参数,当减库存时,能够触发这个回调。其本质也是经过发布订阅完成,这是不推荐的作法,就不详细说了,有兴趣本身研究一下。
另外,CAP没法保证消息不重复,实际使用中须要本身考虑一下消息的重复过滤和幂等性。
这一篇内容有点多,不知道有没有表达清楚,有问题欢迎评论交流,若有不对之处还望你们指出。
下一篇计划写一下受权认证相关的内容。
代码放在:https://github.com/xiajingren/NetCoreMicroserviceDemo
未完待续...