使用EventNext实现基于事件驱动的业务处理

事件驱动模型相信对你们来讲并不陌生,由于这是一套很是高效的逻辑处理模型,经过事件来驱动接下来须要完成的工做,而不像传统同步模型等待任务完成后再继续!虽然事件驱动有着这样的好处,但在传统设计上基于消息回调的处理方式在业务处理中相对比较麻烦总体设计成本也比较高,因此落地也不容易。EventNext是一个事件驱动的应用框架,它的事件驱动支持接口调用,在一系列的业务接口调用过程当中经过事件驱动调用来完成;简单来讲组件驱动的接口行为是由上一接口行为完成而触发执行,接下来介绍详细介绍一下EventNext和使用。git

NextQueue

EventNext组件有一个核心的事件驱动队列NextQueue,NextQueue和传统的线程队列有着很大的区别;传统队列都是线程不停的执行消息,下一个消息都是线程等待上一个消息完成后再继续。但NextQueue的设计则不是,它的全部消息都基于上一个消息完成来驱动(无论上一个消息的逻辑是同步仍是异步)。实际状况是NextQueue触发任务的消息是启用线程工做外,后面的消息都是基于上一个消息回调执行;NextQueue上的消息执行线程是不肯定性也不须要等待,虽然队列里的消息执行线程不是惟一的,但执行顺序是一致的这也是NextQueue所带来的好处,在有序的状况下确保线程的利用率更高。github

组件使用

在使用组前须要引用组件,Nuget安装以下api

Install-Package EventNext

经过组件制定的业务必须以接口的方式来描述,而业务的调用也是经过接口的方式进行;虽然组件支持以消息回调的方式便不建议这样作,毕竟面向业务接口有着更好的易用性和可维护性。为了确保业务接口方式 的行为知足事件驱动队列的要求 ,全部业务行为方法必须以Task做为返回值;非Task返回值的行为方法都不能被组件注册和调用。安全

接口定义和实现

接口的定义有必定的规则,除了方法返回值是Task外,也不支持同一名称的函数进行重载,若是有须要可使用特定的Attribute来标记对应的名称(out类型参数不被支持)。如下是一个简单的接口定义:多线程

 1     public interface IUserService
 2     {
 3 
 4         Task<int> Income(int value);
 5 
 6         Task<int> Payout(int value);
 7 
 8         Task<int> Amount();
 9 
10     }

业务实现:并发

 1     [Service(typeof(IUserService))]
 2     public class UserService :  IUserService
 3     {
 4         private int mAmount;
 5 
 6         public Task<int> Amount()
 7         {
 8             return Task.FromResult(mAmount);
 9         }
10 
11         public Task<int> Income(int value)
12         {
13             mAmount += value;
14             return Task.FromResult(mAmount);
15         }
16 
17         public Task<int> Payout(int value)
18         {
19             mAmount -= value;
20             return Task.FromResult(mAmount);
21         }
22 
23     }

须要经过ServiceAttribute来描这个类提供那些事件驱动的接口行为。框架

使用

组件经过一个EventCenter的对象来进行逻辑调用,建立该对象并注册相应业务功能的程序集便可:异步

EventCenter eventCenter = new EventCenter();
eventCenter.Register(typeof(Program).Assembly);

定义EventCenter加载逻辑后就能够建立代理接口调用async

var service=EventCenter.Create<IUserService>();
await server.Payout(10);
await server.Income(10);

事件驱动队列分配

组件针对不一样状况的须要,能够给接口实例或方法定义不一样的事件队列配置,主要为如下几种状况函数

默认

由组件内部队列组进行负载状况进行配置,这种分配方式会致使同一接口的方法有可能分配在不一样的队列上;在默认分配下接口实例的方法会存在多线程中同时的运行,所以这种模式的应用并非线程安全。

Actor

Actor相信你们也很熟悉,一种高性能一致性的调度模型;组件支持这种模型的接口实例建立,只须要在建立接口代理的时候指定Actor名称便可

henry = EventCenter.Create<IUserService>("henry");

当指定Actor名称后,这个接口的全部方法调用都会一致性到对应实例的队列中,即全部功能方法线程调用的惟一性;在接口调用返回的时候也会再次切入到其余事件驱动队列,确保Actor内部的工做队列不受响后的应逻辑影响;当使用这种方式时整个Actor实例都是线程安全的。

ThreadPool

这种配置只适用于接口方法,描述方法不管什么状况都从线程池中执行相关代码,此行为的方法非线程安全

1 [ThreadInvoke(ThreadType.ThreadPool)]
2 public Task<int> ThreadInvoke()
3 {
4             mCount++;
5             return mCount.ToTask();
6 }

SingleQueue

这种配置只适用于接口方法,用于描述方法无论那个实例都一致性到一个队列中,此行为的方法内线程安全,不保证对应实例是线程安全.

 1         [ThreadInvoke(ThreadType.SingleQueue)]
 2         public Task<int> GetID([ThreadUniqueID]string name)
 3         {
 4             if (!mValues.TryGetValue(name, out int value))
 5             {
 6                 value = 1;
 7             }
 8             else
 9             {
10                 value++;
11             }
12             mValues[name] = value;
13             return value.ToTask();
14         }

在这配置下还能够再细分,如上面的[ThreadUniqueID]对不一样参数作一致性对列,这个时候name的不一样值会一致性到不一样的事件队列中。

Actor性能对比

组件默认集成了Actor模型,能够经过它实现高并发无锁业务集成,EventNext最大的特色是以接口的方式集成应用,相对于akka.net基于消息接收的模式来讲有着明显的应用优点。在性能上EventNext基于接口的ask机制也比akka.net基于消息receive的ask机制要高,如下是一个简单的对比测试

akak.net

 1  public class UserActor : ReceiveActor
 2     {
 3         public UserActor()
 4         {
 5             Receive<Income>(Income =>
 6             {
 7                 mAmount += Income.Memory;
 8                 this.Sender.Tell(mAmount);
 9             });
10             Receive<Payout>(Outlay =>
11             {
12                 mAmount -= Outlay.Memory;
13                 this.Sender.Tell(mAmount);
14             });
15             Receive<Get>(Outlay =>
16             {
17                 this.Sender.Tell(mAmount);
18             });
19         }
20         private decimal mAmount;
21     }
22     //invoke
23     Income income = new Income { Memory = i };
24     var result = await nbActor.Ask<decimal>(income);
25     Payout payout = new Payout { Memory = i };
26     var result = await nbActor.Ask<decimal>(payout);

Event Next

 1     [Service(typeof(IUserService))]
 2     public class UserService : IUserService
 3     {
 4         private int mAmount;   
 5 
 6         public Task<int> Amount()
 7         {
 8             return Task.FromResult(mAmount);
 9         }
10 
11         public Task<int> Income(int value)
12         {
13             mAmount += value;
14             return Task.FromResult(mAmount);
15         }
16 
17         public Task<int> Payout(int value)
18         {
19             mAmount -= value;
20             return Task.FromResult(mAmount);
21         }
22     }
23     //invoke
24     var result = await nb.Income(i);
25     var result = await nb.Payout(i);

详细测试代码https://github.com/IKende/EventNext/tree/master/samples/EventNext_AkkaNet 在默认配置下不一样并发下的测试结果

 

Event Sourcing

因为事件驱动提倡的业务处理都是异步,这样就带来一个业务事务性的问题,如何确保不一样接口方法业务处理一致性就比较关键了。因为不一样的逻辑在不一样线程中异步进行,因此相对比较好解决的就是在业务处理时引入Event Sourcing.如下就简单介绍一下组件这方面的应用,就不详细介绍了。毕竟 Event Sourcing设计和业务还有着一些关系

 1         public async Task<long> Income(int amount)
 2         {
 3             await EventCenter.WriteEvent(this, null, null, new { History = user.Amount, Change = amount, Value = user.Amount + amount });
 4             user.Amount += amount;
 5             return user.Amount;
 6         }
 7 
 8         public async Task<long> Pay(int amount)
 9         {
10             await EventCenter.WriteEvent(this, null, null, new { History = user.Amount, Change = -amount, Value = user.Amount - amount });
11             user.Amount -= amount;
12             return user.Amount;
13         }

组件提供事件信息的读写接口IEventLogHandler能够经过实现这个接口扩展本身的事件源处理。

使用注意事项

适应async/await

其实整个事件队列都是使用async/await,经过它大大简化了消息和回调函数间不一样数据状态整合的难度。.Net也现有所异步API都支持async/wait

异步化设计你的逻辑

在实现接口逻辑的状况尽量使和异步逻辑方法,在逻辑实施过程当中禁用Task.Wait或一些线程相关Wait的方法,特别不带超时的Wait由于这种操做极容易致使事件驱动队列逻辑被挂起,致使队列没法正常工做;更糟糕的状况可能引发事件队列假死的状况。

传统异步API

因为各类缘由,可能还存在旧的异步API不支持async/wait,出现这状况能够经过TaskCompletionSource来扩展已经有的异步方法支持async/wait

项目地址

https://github.com/IKende/EventNext

相关文章
相关标签/搜索