CAP是由咱们园子里的杨晓东大神开发出来的一套分布式事务的决绝方案,是.Net Core Community中的第一个千星项目(目前已经1656 Star),具备轻量级、易使用、高性能等特色。html
https://github.com/dotnetcore/CAPgit
本博客主要针对易用性这一点,展开叙述,一块儿看看CAP如何结合EF Core和RabbitMQ带领小白轻松走入分布式消息队列的世界。github
首先,你须要搭建一套RabbitMQ系统,搭建过程在此再也不叙述,若是你们以为麻烦,能够用我搭好的。sql
HostName: coderayu.cn UserName:guest Password:guest (仅仅可用做实验,数据丢失不负责)数据库
建立Asp.Net Core 项目,并引入Nuget包服务器
你能够运行如下下命令在你的项目中安装 CAP。网络
PM> Install-Package DotNetCore.CAP
若是你的消息队列使用的是 Kafka 的话,你能够:app
PM> Install-Package DotNetCore.CAP.Kafka
若是你的消息队列使用的是 RabbitMQ 的话,你能够:分布式
PM> Install-Package DotNetCore.CAP.RabbitMQ
CAP 提供了 Sql Server, MySql, PostgreSQL 的扩展做为数据库存储:函数
// 按需选择安装你正在使用的数据库 PM> Install-Package DotNetCore.CAP.SqlServer PM> Install-Package DotNetCore.CAP.MySql PM> Install-Package DotNetCore.CAP.PostgreSql
由于我采用的是EF Core,因此首先要建立一个DbContext上下文,代码以下:
public class CapDbContext:DbContext { public CapDbContext(DbContextOptions options) : base(options) { } }
首先须要在ConfigureServices函数中进行相关服务的注入,对应的操做和功能解释以下:
public void ConfigureServices(IServiceCollection services) { //注入DbContext上下文,若是用的是Mysql可能还须要添加Pomelo.EntityFrameworkCore.MySql这个Nuget包 services.AddDbContext<CapDbContext>(options => options.UseMySql("Server=127.0.0.1;Database=testcap;UserId=root;Password=123456;")); //配置CAP services.AddCap(x => { x.UseEntityFramework<CapDbContext>(); //启用操做面板 x.UseDashboard(); //使用RabbitMQ x.UseRabbitMQ(rb => { //rabbitmq服务器地址 rb.HostName = "coderayu.cn"; rb.UserName = "guest"; rb.Password = "guest"; //指定Topic exchange名称,不指定的话会用默认的 rb.ExchangeName = "cap.text.exchange"; }); //设置处理成功的数据在数据库中保存的时间(秒),为保证系统新能,数据会按期清理。 x.SucceedMessageExpiredAfter = 24 * 3600; //设置失败重试次数 x.FailedRetryCount = 5; }); services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_1); }
最后还要再Congiure中启用CAP中间件
public void Configure(IApplicationBuilder app, IHostingEnvironment env) { if (env.IsDevelopment()) { app.UseDeveloperExceptionPage(); } //启用cap中间件 app.UseCap(); app.UseMvc(); }
再程序包管理控制台中依此输入如下命令行
PM> Add-Migration Init
PM> update-database
若是成成功执行,那么打开数据库,就能够看到用来存储CAP发送和接收数据的表格了。
表格中每列的含义以下:
咱们直接在ValuesController的基础上进行改造。
在 Controller 中注入 ICapPublisher
而后使用 ICapPublisher
进行消息发送
private readonly ICapPublisher _publisher; public ValuesController(ICapPublisher publisher) { _publisher = publisher; }
发送消息
[HttpGet] public string Get(string message) { //"cap.test.queue"是发送的消息RouteKey,能够理解为消息管道的名称 _publisher.Publish("cap.test.queue",message); return "发送成功"; }
订阅消息
//"cap.test.queue"为发送消息时的RauteKey,也能够模糊匹配 //详情https://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html [CapSubscribe("cap.test.queue")] public void HandleMessage(string message) { Console.Write(DateTime.Now.ToString()+"收到消息:"+message); }
启动程序后,首先看到CAP启动成功
紧随其后,消费者也就是咱们的订阅方法在RabbitMQ服务器上注册成功。
发送消息,发送成功,以下
发送后,当即在控制台看到了订阅方法输出的结果。
在订阅方法中,若是抛出异常,那么CAP就会认为该条消息处理失败,会自动进行重试,重试次数在前方已经进行了配置。
咱们把订阅方法作一个改动,打印接收的信息到控制台中,并抛出异常
//"cap.test.queue"为发送消息时的RauteKey,也能够模糊匹配 //详情https://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html [CapSubscribe("cap.test.queue")] public void HandleMessage(string message) { Console.WriteLine(DateTime.Now.ToString()+"收到消息:"+message); throw new Exception("测试失败重试"); }
能够看到,当即进行了三次重试
但是在前面,咱们设置的失败重试次数是5次,为何这里只重试三次吗?是否是要叫晓东过来改BUG了呢?固然不是。
观察发现,CAP重试的前三次是当即进行的,然后面的重试,是每隔一段时间进行的,当在分布式通信的过程当中,可能出现了问题确实不会当即修复解决,可能过了必定时间,系统就自动恢复了,如网络抖动。
发送成功了五条消息,成功接收处理了三条,两条处理失败,处理失败的任务,咱们能够直接在面板中进行从新消费,可谓很是方便。
同时,处理失败的消息,点击消息的编号后,能够查看到消息的内容和异常缘由。
CAP如此强大,让消息队列这种高大上产品操做So Easy,学会了CAP,也能够吹牛说,我也懂分布式任务处理啦。
感谢晓东开发出如此强大的项目,同时感谢.Net Core Community。
参考 CAP Github wiki
https://github.com/dotnetcore/CAP/wiki
本博客Demo代码