最新文档地址:https://github.com/dotnetcore/CAP/wikihtml
不少同窗想对CAP的机制以及用法等想有一个详细的了解,因此花了将近两周时间写了这份中文的CAP文档,对 CAP 还不知道的同窗能够先看一下这篇文章。mysql
本文档为 CAP 文献(Wiki),本文献同时提供中文和英文版本,英文版本目前还在翻译中,会放到Github Wiki 中。git
CAP 是一个遵循 .NET Standard 标准库的C#库,用来处理分布式事务以及提供EventBus的功能,她具备轻量级,高性能,易使用等特色。github
目前 CAP 使用的是 .NET Standard 1.6 的标准进行开发,目前最新预览版本已经支持 .NET Standard 2.0.sql
CAP 的应用场景主要有如下两个:数据库
分布式事务是在分布式系统中不可避免的一个硬性需求,而目前的分布式事务的解决方案也无外乎就那么几种,在了解 CAP 的分布式事务方案前,能够阅读如下 这篇文章。json
CAP 没有采用两阶段提交(2PC)这种事务机制,而是采用的 本地消息表+MQ 这种经典的实现方式,这种方式又叫作 异步确保。api
CAP 实现了 EventBus 中的发布/订阅,它具备 EventBus 的全部功能。也就是说你能够像使用 EventBus 同样来使用 CAP,另外 CAP 的 EventBus 是具备高可用性的,这是什么意思呢?网络
CAP 借助于本地消息表来对 EventBus 中的消息进行了持久化,这样能够保证 EventBus 发出的消息是可靠的,当消息队列出现宕机或者链接失败的状况时,消息也不会丢失。架构
使用一下命令来引用CAP的NuGet包:
PM> Install-Package DotNetCore.CAP
根据使用的不一样类型的消息队列,来引入不一样的扩展包:
PM> Install-Package DotNetCore.CAP.RabbitMQ PM> Install-Package DotNetCore.CAP.Kafka
根据使用的不一样类型的数据库,来引入不一样的扩展包:
PM> Install-Package DotNetCore.CAP.SqlServer PM> Install-Package DotNetCore.CAP.MySql
在 ASP.NET Core 程序中,你能够在 Startup.cs
文件 ConfigureServices()
中配置 CAP 使用到的服务:
public void ConfigureServices(IServiceCollection services) { services.AddDbContext<AppDbContext>(); services.AddCap(x => { // If your SqlServer is using EF for data operations, you need to add the following configuration: // Notice: You don't need to config x.UseSqlServer(""") again! x.UseEntityFramework<AppDbContext>(); // If you are using Dapper,you need to add the config: x.UseSqlServer("Your ConnectionStrings"); // If your Message Queue is using RabbitMQ you need to add the config: x.UseRabbitMQ("localhost"); // If your Message Queue is using Kafka you need to add the config: x.UseKafka("localhost"); }); }
在 Configure()
中配置启动 CAP :
public void Configure(IApplicationBuilder app) { app.UseCap(); }
CAP 的 API 接口只有一个,就是 ICapPublisher
接口,你能够从 DI 容器中获取到该接口的实例进行调用。
你可使用 ICapPublisher
接口中的 Publish<T>
或者 PublishAsync<T>
方法来发送消息:
public class PublishController : Controller { private readonly ICapPublisher _publisher; //在构造函数中获取接口实例 public PublishController(ICapPublisher publisher) { _publisher = publisher; } [Route("~/checkAccount")] public async Task<IActionResult> PublishMessage() { await _publisher.PublishAsync("xxx.services.account.check", new Person { Name = "Foo", Age = 11 }); return Ok(); } }
下面是PublishAsync这个接口的签名:
PublishAsync<T>(string name,T object)
默认状况下,在调用此方法的时候 CAP 将在内部建立事务,而后将消息写入到 Cap.Published
这个消息表。
事务在 CAP 具备重要做用,它是保证消息可靠性的一个基石。 在发送一条消息到消息队列的过程当中,若是不使用事务,咱们是没有办法保证咱们的业务代码在执行成功后消息已经成功的发送到了消息队列,或者是消息成功的发送到了消息队列,可是业务代码确执行失败。
这里的失败缘由多是多种多样的,好比链接异常,网络故障等等。
只有业务代码和CAP的Publish代码必须在同一个事务中,才可以保证业务代码和消息代码同时成功或者失败。
如下是两种使用事务进行Publish的代码:
using (var transaction = dbContext.Database.BeginTransaction()) { await _publisher.PublishAsync("xxx.services.account.check", new Person { Name = "Foo", Age = 11 }); // 你的业务代码。 transaction.Commit(); }
你的业务代码能够位于 Publish 以前或者以后,只须要保证在同一个事务。
当CAP检测到 Publish 是在EF事务区域内的时候,将使用当前的事务上下文进行消息的存储。
其中,发送的内容会序列化为Json存储到消息表中。
var connString = "数据库链接字符串"; using (var connection = new MySqlConnection(connString)) { connection.Open(); using (var transaction = connection.BeginTransaction()) { await _publisher.PublishAsync("xxx.services.bar", new Person { Name = "Foo", Age = 11 }, connection, transaction); // 你的业务代码。 transaction.Commit(); } }
在 Dapper 中,因为不能获取到事务上下文,因此须要用户手动的传递事务上下文到CAP中。
注意:消息端在方法实现的过程当中须要实现幂等性。
使用 CapSubscribeAttribute
来订阅 CAP 发布出去的消息。
[CapSubscribe("xxx.services.bar")] public void BarMessageProcessor() { }
这里,你也可使用多个 CapSubscribe[""]
来同时订阅多个不一样的消息 :
[CapSubscribe("xxx.services.bar")] [CapSubscribe("xxx.services.foo")] public void BarAndFooMessageProcessor() { }
其中,xxx.services.bar
为订阅的消息名称,内部实现上,这个名称在不一样的消息队列具备不一样的表明。 在 Kafka 中,这个名称即为 Topic Name。 在RabbitMQ 中,为 RouteKey。
RabbitMQ 中的 RouteKey 支持绑定键表达式写法,有两种主要的绑定键:
*(星号)能够代替一个单词.
# (井号) 能够代替0个或多个单词.
好比在下面这个图中(P为发送者,X为RabbitMQ中的Exchange,C为消费者,Q为队列)
在这个示例中,咱们将发送一条关于动物描述的消息,也就是说 Name(routeKey) 字段中的内容包含 3 个单词。第一个单词是描述速度的(celerity),第二个单词是描述颜色的(colour),第三个是描述哪一种动物的(species),它们组合起来相似:“
. . ”。 而后在使用
CapSubscribe
绑定的时候,Q1绑定为CapSubscribe["*.orange.*"]
, Q2 绑定为CapSubscribe["*.*.rabbit"]
和[CapSubscribe["lazy.#]
。那么,当发送一个名为 "quick.orange.rabbit" 消息的时候,这两个队列将会同时收到该消息。一样名为
lazy.orange.elephant
的消息也会被同时收到。另外,名为 "quick.orange.fox" 的消息将仅会被发送到Q1队列,名为 "lazy.brown.fox" 的消息仅会被发送到Q2。"lazy.pink.rabbit" 仅会被发送到Q2一次,即便它被绑定了2次。"quick.brown.fox" 没有匹配到任何绑定的队列,因此它将会被丢弃。另一种状况,若是你违反约定,好比使用 4个单词进行组合,例如 "quick.orange.male.rabbit",那么它将匹配不到任何的队列,消息将会被丢弃。
可是,假如你的消息名为 "lazy.orange.male.rabbit",那么他们将会被发送到Q2,由于 #(井号)能够匹配 0 或者多个单词。
在 CAP 中,咱们把每个拥有 CapSubscribe[]
标记的方法叫作订阅者,你能够把订阅者进行分组。
组(Group),是订阅者的一个集合,每一组能够有一个或者多个消费者,可是一个订阅者只能属于某一个组。同一个组内的订阅者订阅的消息只能被消费一次。
若是你在订阅的时候没有指定组,CAP会将订阅者设置到一个默认的组 cap.default.group
。
如下是使用组进行订阅的示例:
[CapSubscribe("xxx.services.foo", Group = "moduleA")] public void FooMessageProcessor() { }
这里有几种状况可能须要知道:
① 消息发布的时候订阅方还未启动
Kafka:
当 Kafka 中,发布的消息存储于持久化的日志文件中,因此消息不会丢失,当订阅者所在的程序启动的时候会消费掉这些消息。
RabbitMQ:
在 RabbitMQ 中,应用程序首次启动会建立具备持久化的 Exchange 和 Queue,CAP 会针对每个订阅者Group会新建一个消费者队列,因为首次启动时候订阅者未启动的因此是没有队列的,消息没法进行持久化,这个时候生产者发的消息会丢失。
针对RabbitMQ的消息丢失的问题,有两种解决方式:
i. 部署应用程序以前,在RabbitMQ中手动建立具备durable特性的Exchange和Queue,默认状况他们的名字分别是(cap.default.topic, cap.default.group)。
ii. 提早运行一遍全部实例,让Exchange和Queue初始化。
咱们建议采用第 ii 种方案,由于很容易作到。
② 消息没有任何订阅者
若是你发送了一条个没有被任何订阅者订阅的消息,那么此消息将会被丢弃。
Cap 使用 Microsoft.Extensions.DependencyInjection 进行配置的注入,你也能够依赖于 DI 从json文件中读取配置。
你可使用以下方式来配置 CAP 中的一些配置项,例如
services.AddCap(capOptions => { capOptions.FailedCallback = //... });
CapOptions
提供了一下配置项:
NAME | DESCRIPTION | TYPE | DEFAULT |
---|---|---|---|
PollingDelay | 处理消息的线程默认轮询等待时间(秒) | int | 15 秒 |
QueueProcessorCount | 启动队列中消息的处理器个数 | int | 2 |
FailedMessageWaitingInterval | 轮询失败消息的间隔(秒) | int | 180 秒 |
FailedCallback | 执行失败消息时的回调函数,详情见下文 | Action | NULL |
CapOptions 提供了 FailedCallback
为处理失败的消息时的回调函数。当消息屡次发送失败后,CAP会将消息状态标记为Failed
,CAP有一个专门的处理者用来处理这种失败的消息,针对失败的消息会从新放入到队列中发送到MQ,在这以前若是FailedCallback
具备值,那么将首先调用此回调函数来告诉客户端。
FailedCallback 的类型为 Action<MessageType,string,string>
,第一个参数为消息类型(发送的仍是接收到),第二个参数为消息的名称(name),第三个参数为消息的内容(content)。
CAP 采用的是针对 CapOptions 进行扩展来实现RabbitMQ的配置功能,因此针对 RabbitMQ 的配置用法以下:
services.AddCap(capOptions => { capOptions.UseRabbitMQ(rabbitMQOption=>{ // rabbitmq options. }); });
RabbitMQOptions
提供了有关RabbitMQ相关的配置:
NAME | DESCRIPTION | TYPE | DEFAULT |
---|---|---|---|
HostName | 宿主地址 | string | localhost |
UserName | 用户名 | string | guest |
Password | 密码 | string | guest |
VirtualHost | 虚拟主机 | string | / |
Port | 端口号 | int | -1 |
TopicExchangeName | CAP默认Exchange名称 | string | cap.default.topic |
RequestedConnectionTimeout | RabbitMQ链接超时时间 | int | 30,000 毫秒 |
SocketReadTimeout | RabbitMQ消息读取超时时间 | int | 30,000 毫秒 |
SocketWriteTimeout | RabbitMQ消息写入超时时间 | int | 30,000 毫秒 |
QueueMessageExpires | 队列中消息自动删除时间 | int | (10天) 毫秒 |
CAP 采用的是针对 CapOptions 进行扩展来实现 Kafka 的配置功能,因此针对 Kafka 的配置用法以下:
services.AddCap(capOptions => { capOptions.UseKafka(kafkaOption=>{ // kafka options. // kafkaOptions.MainConfig.Add("", ""); }); });
KafkaOptions
提供了有关 Kafka 相关的配置,因为Kafka的配置比较多,因此此处使用的是提供的 MainConfig 字典来支持进行自定义配置,你能够查看这里来获取对配置项的支持信息。
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
若是你使用的是 EntityFramewrok,你用不到该配置项下的内容。
CAP 采用的是针对 CapOptions 进行扩展来实现 SqlServer 的配置功能,因此针对 SqlServer 的配置用法以下:
services.AddCap(capOptions => { capOptions.UseSqlServer(sqlserverOptions => { // sqlserverOptions.ConnectionString }); });
NAME | DESCRIPTION | TYPE | DEFAULT |
---|---|---|---|
Schema | Cap表架构 | string | Cap |
ConnectionString | 数据库链接字符串 | string | null |
若是你使用的是 EntityFramewrok,你用不到该配置项下的内容。
CAP 采用的是针对 CapOptions 进行扩展来实现 MySql 的配置功能,因此针对 MySql 的配置用法以下:
services.AddCap(capOptions => { capOptions.UseMySql(mysqlOptions => { // mysqlOptions.ConnectionString }); });
NAME | DESCRIPTION | TYPE | DEFAULT |
---|---|---|---|
TableNamePrefix | Cap表名前缀 | string | cap |
ConnectionString | 数据库链接字符串 | string | null |
随着微服务架构的流行,愈来愈多的人在尝试使用微服务来架构他们的系统,而在这其中咱们会遇到例如分布式事务的问题,为了解决这些问题,我没有发现简单而且易于使用的解决方案,因此我决定来打造这样一个库来解决这个问题。
最初 CAP 是为了解决分布式系统中的事务问题,她采用的是 异步确保 这种机制实现了分布式事务的最终一致性,更多这方面的信息能够查看第6节。
如今 CAP 除了解决分布式事务的问题外,她另一个重要的功能就是做为 EventBus 来使用,她具备 EventBus 的全部功能,而且提供了更加简化的方式来处理EventBus中的发布/订阅。
CAP 依靠本地数据库实现消息的持久化,CAP 使用这种方式来应对一切环境或者网络异常致使消息丢失的状况,消息的可靠性是分布式事务的基石,因此在任何状况下消息都不能丢失。
对于消息的持久化分为两种:
① 消息进入消息队列以前的持久化
在消息进入到消息队列以前,CAP使用本地数据库表对消息进行持久化,这样能够保证当消息队列出现异常或者网络错误时候消息是没有丢失的。
为了保证这种机制的可靠性,CAP使用和业务代码相同的数据库事务来保证业务操做和CAP的消息在持久化的过程当中是强一致的。也就是说在进行消息持久化的过程当中,任何一方发生异常状况数据库都会进行回滚操做。
② 消息进入到消息队列以后的持久化
消息进入到消息队列以后,CAP会启动消息队列的持久化功能,咱们须要说明一下在 RabbitMQ 和 Kafka 中CAP的消息是如何持久化的。
针对于 RabbitMQ 中的消息持久化,CAP 使用的是具备消息持久化功能的消费者队列,可是这里面可能有例外状况,参加 2.2.1 章节。
因为 Kafka 天生设计的就是使用文件进行的消息持久化,在因此在消息进入到Kafka以后,Kafka会保证消息可以正确被持久化而不丢失。
CAP 中消息的流转过程大体以下:
“ P ” 表明消息发送者(生产者)。 “ C ” 表明消息消费者(订阅者)。
CAP 采用最终一致性做为的一致性方案,此方案是遵循 CAP 理论,如下是CAP理论的描述。
C(一致性)一致性是指数据的原子性,在经典的数据库中经过事务来保障,事务完成时,不管成功或回滚,数据都会处于一致的状态,在分布式环境下,一致性是指多个节点数据是否一致;
A(可用性)服务一直保持可用的状态,当用户发出一个请求,服务能在必定的时间内返回结果;
P(分区容忍性)在分布式应用中,可能由于一些分布式的缘由致使系统没法运转,好的分区容忍性,使应用虽然是一个分布式系统,可是好像一个能够正常运转的总体
根据 “CAP”分布式理论, 在一个分布式系统中,咱们每每为了可用性和分区容错性,忍痛放弃强一致支持,转而追求最终一致性。大部分业务场景下,咱们是能够接受短暂的不一致的。
第 6 节将对此作进一步介绍。
CAP 封装了在 ASP.NET Core 中的使用依赖注入来获取 Publisher (ICapPublisher
)的接口。而启动方式相似于 “中间件” 的形式,经过在 Startup.cs 配置 ConfigureServices
和 Configure
进行启动。
当系统引入CAP以后并首次启动后,CAP会在客户端生成 3 个表,分别是 Cap.Published, Cap.Received, Cap.Queue。注意表名可能在不一样的数据库具备不一样的大小写区分,若是你在运行项目的时候没有显式的指定数据库生成架构(SQL Server)或者表名前缀(MySql)的话,默认状况下就是以上3个名字。
Cap.Published:这个表主要是用来存储 CAP 发送到MQ(Message Queue)的客户端消息,也就是说你使用 ICapPublisher
接口 Publish 的消息内容。
Cap.Received:这个表主要是用来存储 CAP 接收到 MQ(Message Queue) 的客户端订阅的消息,也就是使用 CapSubscribe[]
订阅的那些消息。
Cap.Queue: 这个表主要是CAP内部用来处理发送和接收消息的一个临时表,一般状况下,若是系统不出现问题,这个表将是空的。
Published
和 Received
表具备 StatusName 字段,这个字段用来标识当前消息的状态。目前共有 Scheduled,Enqueued,Processing,Successed,Failed 等几个状态。CAP 在处理消息的过程当中会依次从 Scheduled 到 Successed 来改变这些消息状态的值。若是是状态值为 Successed,表明该消息已经成功的发送到了 MQ 中。若是为 Failed 则表明消息发送失败,消息发送失败后 CAP 会对消息进行重试,直到成功。
关于数据清理: CAP 默认状况下会每隔一个小时将消息表的数据进行清理删除,避免数据量过多致使性能的下降。清理规则为 ExpiresAt 不为空而且小于当前时间的数据。
CAP 采用 JSON 格式进行消息传输,如下是消息的对象模型:
NAME | DESCRIPTION | TYPE |
---|---|---|
Id | 消息编号 | int |
Name | 消息名称 | string |
Content | 内容 | string |
Group | 所属消费组 | string |
Added | 建立时间 | DateTime |
ExpiresAt | 过时时间 | DateTime |
Retries | 重试次数 | int |
StatusName | 状态 | string |
对于 Cap.Received 中的消息,会多一个
Group
字段来标记所属的消费者组。
EventBus 采用 发布-订阅 风格进行组件之间的通信,它不须要显式在组件中进行注册。
上图是EventBus的一个Event的流程,关于 EventBus 的更多信息就不在这里介绍了...
在 CAP 中,为何说 CAP 实现了 EventBus 中的所有特性,由于 EventBus 具备的两个大功能就是发布和订阅, 在 CAP 中 使用了另一种优雅的方式来实现的,另一个 CAP 提供的强大功能就是消息的持久化,以及在任何异常状况下消息的可靠性,这是EventBus不具备的功能。
CAP 里面发送一个消息能够看作是一个 “Event”,一个使用了CAP的ASP.NET Core 应用程序既能够进行发送也能够进行订阅接收。
重试在实现分布式事务中具备重要做用,CAP 中会针对发送失败或者执行失败的消息进行重试。在整个 CAP 的设计过程当中有如下几处采用的重试策略。
① 消息发送重试
在消息发送过程当中,当出现 Broker 宕机或者链接失败的状况亦或者出现异常的状况下,这个时候 CAP 会对发送的重试,重试策略为默认 15 次失败重试,当15次事后仍然失败时,CAP会将此消息状态标记为失败。
② 消息消费重试
当 Consumer 接收到消息时,会执行消费者方法,在执行消费者方法出现异常时,会进行重试。这个重试策略和 ① 是相同的。
③ 失败消息重试
CAP 会按期针对 ① 和 ② 中状态为“失败的”消息进行重试,CAP会对他们进行从新“入队(Enqueue)”,入队时会将消息中的重试次数标记为0,状态置为 Enqueued。
针对于分布式事务的处理,CAP 采用的是“异步确保”这种方案。
异步确保这种方案又叫作本地消息表,这是一种经典的方案,方案最初来源于 eBay,参考资料见段末连接。这种方案目前也是企业中使用最多的方案之一。
相对于 TCC 或者 2PC/3PC 来讲,这个方案对于分布式事务来讲是最简单的,并且它是去中心化的。在TCC 或者 2PC 的方案中,必须具备事务协调器来处理每一个不一样服务之间的状态,而此种方案不须要事务协调器。
另外 2PC/TCC 这种方案若是服务依赖过多,会带来管理复杂性增长和稳定性风险增大的问题。试想若是咱们强依赖 10 个服务,9 个都执行成功了,最后一个执行失败了,那么是否是前面 9 个都要回滚掉?这个成本仍是很是高的。
可是,并非说 2PC 或者 TCC 这种方案很差,由于每一种方案都有其相对优点的使用场景和优缺点,这里就不作过多介绍了。
中文:http://www.cnblogs.com/savorboard/p/base-an-acid-alternative.html
英文:http://queue.acm.org/detail.cfm?id=1394128
暂无
本文地址:http://www.cnblogs.com/savorboard/p/cap-document.html
做者博客:Savorboard 欢迎转载,请在明显位置给出出处及连接