Saga单词翻译过来是指尤指古代挪威或冰岛讲述冒险经历和英雄业绩的长篇故事,对,这里强调长篇故事。许多系统都存在长时间运行的业务流程,NServiceBus使用基于事件驱动的体系结构将容错性和可伸缩性融入这些业务处理过程当中。
固然一个单一接口调用则算不上一个长时间运行的业务场景,那么若是在给定的用例中有两个或多个调用,则应该考虑数据一致性的问题,这里有可能第一个接口调用成功,第二次调用则可能失败或者超时,Saga的设计以简单而健壮的方式处理这样的业务用例。mysql
先来经过一段代码简单认识一下Saga,在NServiceBus里,使用Saga的话则须要实现抽象类Saga
public class Saga:Saga<State>, IAmStartedByMessages<StartOrder>, IHandleMessages<CompleteOrder> { protected override void ConfigureHowToFindSaga(SagaPropertyMapper<State> mapper) { mapper.ConfigureMapping<StartOrder>(message=>message.OrderId).ToSaga(saga=>saga.OrderId); mapper.ConfigureMapping<CompleteOrder>(message=>message.OrderId).ToSaga(saga=>saga.OrderId); } public Task Handle(StartOrder message, IMessageHandlerContext context) { return Task.CompletedTask; } public Task Handle(CompleteOrder message, IMessageHandlerContext context) { MarkAsComplete(); return Task.CompletedTask; } }
长时间运行则意味着有状态,任何涉及多个网络调用的进程都须要一个临时状态,这个临时状态能够存储在内存中,序列化在磁盘中,也能够存储在分布式缓存中。在NServiceBus中咱们定义实体,继承抽象类ContainSagaData便可,默认状况下,全部公开访问的属性都会被持久化。数据库
public class State:ContainSagaData { public Guid OrderId { get; set; } }
在NServiceBus里,处理消息的有两种接口:IHandlerMessages
在前面的代码片断里,咱们看到已经实现了接口IAmStartedByMessages
若是你的业务用例中确实存在无序消息的状况,则还须要业务流程正常轮转,那么则须要多个messaeg都要事先接口IAmStartedByMessages接口,也就是说多个message均可以建立Saga实例。网络
在处理无序消息和多个消息类型的时候,就存在消息丢失的可能,必须在你的Saga状态完成之后,这个Saga实例又收到一条消息,但这时Saga状态已是完结状态,这条消息则仍然须要处理,这里则实现NServiceBus的IHandleSagaNotFound接口。app
public class SagaNotFoundHandler:IHandleSagaNotFound { public Task Handle(object message, IMessageProcessingContext context) { return context.Reply(new SagaNotFoundMessage()); } } public class SagaNotFoundMessage { }
当你的业务用例再也不须要Saga实例时,则调用MarkComplete()来结束Saga实例。这个方法在前面的代码片断中也能够看到,其实本质也就是设置Saga.Complete属性,这是个bool值,你在业务用例中也能够用此值来判断Saga流程是否结束。async
namespace NServiceBus { using System; using System.Threading.Tasks; using Extensibility; public abstract class Saga { /// <summary> /// The saga's typed data. /// </summary> public IContainSagaData Entity { get; set; } public bool Completed { get; private set; } internal protected abstract void ConfigureHowToFindSaga(IConfigureHowToFindSagaWithMessage sagaMessageFindingConfiguration); protected Task RequestTimeout<TTimeoutMessageType>(IMessageHandlerContext context, DateTime at) where TTimeoutMessageType : new() { return RequestTimeout(context, at, new TTimeoutMessageType()); } protected Task RequestTimeout<TTimeoutMessageType>(IMessageHandlerContext context, DateTime at, TTimeoutMessageType timeoutMessage) { if (at.Kind == DateTimeKind.Unspecified) { throw new InvalidOperationException("Kind property of DateTime 'at' must be specified."); } VerifySagaCanHandleTimeout(timeoutMessage); var options = new SendOptions(); options.DoNotDeliverBefore(at); options.RouteToThisEndpoint(); SetTimeoutHeaders(options); return context.Send(timeoutMessage, options); } protected Task RequestTimeout<TTimeoutMessageType>(IMessageHandlerContext context, TimeSpan within) where TTimeoutMessageType : new() { return RequestTimeout(context, within, new TTimeoutMessageType()); } protected Task RequestTimeout<TTimeoutMessageType>(IMessageHandlerContext context, TimeSpan within, TTimeoutMessageType timeoutMessage) { VerifySagaCanHandleTimeout(timeoutMessage); var sendOptions = new SendOptions(); sendOptions.DelayDeliveryWith(within); sendOptions.RouteToThisEndpoint(); SetTimeoutHeaders(sendOptions); return context.Send(timeoutMessage, sendOptions); } protected Task ReplyToOriginator(IMessageHandlerContext context, object message) { if (string.IsNullOrEmpty(Entity.Originator)) { throw new Exception("Entity.Originator cannot be null. Perhaps the sender is a SendOnly endpoint."); } var options = new ReplyOptions(); options.SetDestination(Entity.Originator); context.Extensions.Set(new AttachCorrelationIdBehavior.State { CustomCorrelationId = Entity.OriginalMessageId }); options.Context.Set(new PopulateAutoCorrelationHeadersForRepliesBehavior.State { SagaTypeToUse = null, SagaIdToUse = null }); return context.Reply(message, options); } //这个方法结束saga流程,标记Completed属性 protected void MarkAsComplete() { Completed = true; } void VerifySagaCanHandleTimeout<TTimeoutMessageType>(TTimeoutMessageType timeoutMessage) { var canHandleTimeoutMessage = this is IHandleTimeouts<TTimeoutMessageType>; if (!canHandleTimeoutMessage) { var message = $"The type '{GetType().Name}' cannot request timeouts for '{timeoutMessage}' because it does not implement 'IHandleTimeouts<{typeof(TTimeoutMessageType).FullName}>'"; throw new Exception(message); } } void SetTimeoutHeaders(ExtendableOptions options) { options.SetHeader(Headers.SagaId, Entity.Id.ToString()); options.SetHeader(Headers.IsSagaTimeoutMessage, bool.TrueString); options.SetHeader(Headers.SagaType, GetType().AssemblyQualifiedName); } } }
本机开发环境咱们使用LearningPersistence,可是投产的话则须要使用数据库持久化,这里咱们基于MySQL,SQL持久化须要引入NServiceBus.Persistence.Sql。SQL Persistence会生成几种关系型数据库的sql scripts,而后会根据你的断言配置选择所需数据库,好比SQL Server、MySQL、PostgreSQL、Oracle。
持久化Saga自动建立所需表结构,你只需手动配置便可,配置后编译成功后项目执行目录下会生成sql脚本,文件夹名称是NServiceBus.Persistence.Sql,下面会有Saga子目录。分布式
/* TableNameVariable */ set @tableNameQuoted = concat('`', @tablePrefix, 'Saga`'); set @tableNameNonQuoted = concat(@tablePrefix, 'Saga'); /* Initialize */ drop procedure if exists sqlpersistence_raiseerror; create procedure sqlpersistence_raiseerror(message varchar(256)) begin signal sqlstate 'ERROR' set message_text = message, mysql_errno = '45000'; end; /* CreateTable */ set @createTable = concat(' create table if not exists ', @tableNameQuoted, '( Id varchar(38) not null, Metadata json not null, Data json not null, PersistenceVersion varchar(23) not null, SagaTypeVersion varchar(23) not null, Concurrency int not null, primary key (Id) ) default charset=ascii; '); prepare script from @createTable; execute script; deallocate prepare script; /* AddProperty OrderId */ select count(*) into @exist from information_schema.columns where table_schema = database() and column_name = 'Correlation_OrderId' and table_name = @tableNameNonQuoted; set @query = IF( @exist <= 0, concat('alter table ', @tableNameQuoted, ' add column Correlation_OrderId varchar(38) character set ascii'), 'select \'Column Exists\' status'); prepare script from @query; execute script; deallocate prepare script; /* VerifyColumnType Guid */ set @column_type_OrderId = ( select concat(column_type,' character set ', character_set_name) from information_schema.columns where table_schema = database() and table_name = @tableNameNonQuoted and column_name = 'Correlation_OrderId' ); set @query = IF( @column_type_OrderId <> 'varchar(38) character set ascii', 'call sqlpersistence_raiseerror(concat(\'Incorrect data type for Correlation_OrderId. Expected varchar(38) character set ascii got \', @column_type_OrderId, \'.\'));', 'select \'Column Type OK\' status'); prepare script from @query; execute script; deallocate prepare script; /* WriteCreateIndex OrderId */ select count(*) into @exist from information_schema.statistics where table_schema = database() and index_name = 'Index_Correlation_OrderId' and table_name = @tableNameNonQuoted; set @query = IF( @exist <= 0, concat('create unique index Index_Correlation_OrderId on ', @tableNameQuoted, '(Correlation_OrderId)'), 'select \'Index Exists\' status'); prepare script from @query; execute script; deallocate prepare script; /* PurgeObsoleteIndex */ select concat('drop index ', index_name, ' on ', @tableNameQuoted, ';') from information_schema.statistics where table_schema = database() and table_name = @tableNameNonQuoted and index_name like 'Index_Correlation_%' and index_name <> 'Index_Correlation_OrderId' and table_schema = database() into @dropIndexQuery; select if ( @dropIndexQuery is not null, @dropIndexQuery, 'select ''no index to delete'';') into @dropIndexQuery; prepare script from @dropIndexQuery; execute script; deallocate prepare script; /* PurgeObsoleteProperties */ select concat('alter table ', table_name, ' drop column ', column_name, ';') from information_schema.columns where table_schema = database() and table_name = @tableNameNonQuoted and column_name like 'Correlation_%' and column_name <> 'Correlation_OrderId' into @dropPropertiesQuery; select if ( @dropPropertiesQuery is not null, @dropPropertiesQuery, 'select ''no property to delete'';') into @dropPropertiesQuery; prepare script from @dropPropertiesQuery; execute script; deallocate prepare script; /* CompleteSagaScript */
生成的表结构:
ide
Saga持久化须要依赖NServiceBus.Persistence.Sql。引入后须要实现SqlSaga抽象类,抽象类须要重写ConfigureMapping,配置Saga工做流程业务主键。
public class Saga:SqlSaga<State>, IAmStartedByMessages<StartOrder> { protected override void ConfigureMapping(IMessagePropertyMapper mapper) { mapper.ConfigureMapping<StartOrder>(message=>message.OrderId); } protected override string CorrelationPropertyName => nameof(StartOrder.OrderId); public Task Handle(StartOrder message, IMessageHandlerContext context) { Console.WriteLine($"Receive message with OrderId:{message.OrderId}"); MarkAsComplete(); return Task.CompletedTask; } } static async Task MainAsync() { Console.Title = "Client-UI"; var configuration = new EndpointConfiguration("Client-UI"); //这个方法开启自动建表、自动建立RabbitMQ队列 configuration.EnableInstallers(); configuration.UseSerialization<NewtonsoftSerializer>(); configuration.UseTransport<LearningTransport>(); string connectionString = "server=127.0.0.1;uid=root;pwd=000000;database=nservicebus;port=3306"; var persistence = configuration.UsePersistence<SqlPersistence>(); persistence.SqlDialect<SqlDialect.MySql>(); //配置mysql链接串 persistence.ConnectionBuilder(()=>new MySqlConnection(connectionString)); var instance = await Endpoint.Start(configuration).ConfigureAwait(false); var command = new StartOrder() { OrderId = Guid.NewGuid() }; await instance.SendLocal(command).ConfigureAwait(false); Console.ReadKey(); await instance.Stop().ConfigureAwait(false); }
在消息驱动类型的环境中,虽然传递的无链接特性能够防止在线等待过程当中消耗资源,可是毕竟等待时间须要有一个上线。在NServiceBus里已经提供了Timeout方法,咱们只需订阅便可,能够在你的Handle方法中根据须要订阅Timeout,可参考以下代码:
public class Saga:Saga<State>, IAmStartedByMessages<StartOrder>, IHandleMessages<CompleteOrder>, IHandleTimeouts<TimeOutMessage> { public Task Handle(StartOrder message, IMessageHandlerContext context) { var model=new TimeOutMessage(); //订阅超时消息 return RequestTimeout(context,TimeSpan.FromMinutes(10)); } public Task Handle(CompleteOrder message, IMessageHandlerContext context) { MarkAsComplete(); return Task.CompletedTask; } protected override string CorrelationPropertyName => nameof(StartOrder.OrderId); public Task Timeout(TimeOutMessage state, IMessageHandlerContext context) { //处理超时消息 } protected override void ConfigureHowToFindSaga(SagaPropertyMapper<State> mapper) { mapper.ConfigureMapping<StartOrder>(message=>message.OrderId).ToSaga(saga=>saga.OrderId); mapper.ConfigureMapping<CompleteOrder>(message=>message.OrderId).ToSaga(saga=>saga.OrderId); } }
//从Timeout的源码看,这个方法是经过设置SendOptions,而后再把当前这个消息发送给本身来实现 protected Task RequestTimeout<TTimeoutMessageType>(IMessageHandlerContext context, TimeSpan within, TTimeoutMessageType timeoutMessage) { VerifySagaCanHandleTimeout(timeoutMessage); var sendOptions = new SendOptions(); sendOptions.DelayDeliveryWith(within); sendOptions.RouteToThisEndpoint(); SetTimeoutHeaders(sendOptions); return context.Send(timeoutMessage, sendOptions); }
NServiceBus由于是商业产品,对分布式消息系统所涉及到的东西都作了实现,包括分布式事务(Outbox)、DTC都有,还有心跳检测,监控都有,全而大,目前咱们用到的也只是NServiceBus里很小的一部分功能。