ASP.NET Core Web API下事件驱动型架构的实现(一):一个简单的实现

很长一段时间以来,我都在思考如何在ASP.NET Core的框架下,实现一套完整的事件驱动型架构。这个问题看上去有点大,其实主要目标是为了实现一个基于ASP.NET Core的微服务,它可以很是简单地订阅来自于某个渠道的事件消息,并对接收到的消息进行处理,于此同时,它还可以向该渠道发送事件消息,以便订阅该事件消息的消费者可以对消息数据作进一步处理。让咱们回顾一下微服务之间通讯的几种方式,分为同步和异步两种。同步通讯最多见的就是RESTful API,并且很是简单轻量,一个Request/Response回环就结束了;异步通讯最多见的就是经过消息渠道,将载有特殊意义的数据的事件消息发送到消息渠道,而对某种类型消息感兴趣的消费者,就能够获取消息中所带信息并执行相应操做,这也是咱们比较熟知的事件驱动架构的一种表现形式。虽然事件驱动型架构看起来很是复杂,从微服务的实现来看显得有些繁重,但它的应用范围确实很广,也为服务间通讯提供了新的思路。了解DDD的朋友相信必定知道CQRS体系结构模式,它就是一种事件驱动型架构。事实上,实现一套完整的、安全的、稳定的、正确的事件驱动架构并不简单,因为异步特性带来的一致性问题会很是棘手,甚至须要借助一些基础结构层工具(好比关系型数据库,不错!只能是关系型数据库)来解决一些特殊问题。本文就打算带领你们一块儿探探路,基于ASP.NET Core Web API实现一个相对比较简单的事件驱动架构,而后引出一些有待深刻思考的问题,留在从此的文章中继续讨论。或许,本文所引入的源代码没法直接用于生产环境,但我但愿本文介绍的内容可以给到读者一些启发,并可以帮助解决实际中遇到的问题。git

术语约定

本文会涉及一些相关的专业术语,在此先做约定:github

  • 事件:在某一特定时刻发生在某件事物上的一件事情,例如:在我撰写本文的时候,电话铃响了
  • 消息:承载事件数据的实体。事件的序列化/反序列化和传输都以消息的形式进行
  • 消息通讯渠道:一种带有消息路由功能的数据传输机制,用以在消息的派发器和订阅器之间进行数据传输

注意:为了迎合描述的须要,在下文中可能会混用事件和消息两个概念。sql

一个简单的设计

先从简单的设计开始,基本上事件驱动型架构会有事件消息(Events)、事件订阅器(Event Subscriber)、事件派发器(Event Publisher)、事件处理器(Event Handler)以及事件总线(Event Bus)等主要组件,它们之间的关系大体以下:数据库

class_diagram

首先,IEvent接口定义了事件消息(更确切地说,数据)的基本结构,几乎全部的事件都会有一个惟一标识符(Id)和一个事件发生的时间(Timestamp),这个时间一般使用UTC时间做为标准。IEventHandler定义了事件处理器接口,显而易见,它包含两个方法:CanHandle方法,用以肯定传入的事件对象是否可被当前处理器所处理,以及Handle方法,它定义了事件的处理过程。IEvent和IEventHandler构成了事件处理的基本元素。编程

而后就是IEventSubscriber与IEventPublisher接口。前者表示实现该接口的类型为事件订阅器,它负责事件处理器的注册,并侦听来自事件通讯渠道上的消息,一旦所得到的消息可以被某个处理器处理,它就会指派该处理器对接收到的消息进行处理。所以,IEventSubscriber会保持着对事件处理器的引用;而对于实现了IEventPublisher接口的事件派发器而言,它的主要任务就是将事件消息发送到消息通讯渠道,以便订阅端可以得到消息并进行处理。api

IEventBus接口表示消息通讯渠道,也就是你们所熟知的消息总线的概念。它不只具备消息订阅的功能,并且还具备消息派发的能力,所以,它会同时继承于IEventSubscriber和IEventPublisher接口。在上面的设计中,经过接口分离消息总线的订阅器和派发器的角色是颇有必要的,由于两种角色的各自职责不同,这样的设计同时知足SOLID中的SRP和ISP两个准则。安全

基于以上基础模型,咱们能够很快地将这个对象关系模型转换为C#代码:数据结构

public interface IEvent
{
    Guid Id { get; }
    DateTime Timestamp { get; }
}

public interface IEventHandler
{
    Task<bool> HandleAsync(IEvent @event, CancellationToken cancellationToken = default);
    bool CanHandle(IEvent @event);
}

public interface IEventHandler<in T> : IEventHandler
    where T : IEvent
{
    Task<bool> HandleAsync(T @event, CancellationToken cancellationToken = default);
}

public interface IEventPublisher : IDisposable
{
    Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default)
        where TEvent : IEvent;
}

public interface IEventSubscriber : IDisposable
{
    void Subscribe();
}

public interface IEventBus : IEventPublisher, IEventSubscriber { }

短短30行代码,就把咱们的基本对象关系描述清楚了。对于上面的代码咱们须要注意如下几点:架构

  1. 这段代码使用了C# 7.1的新特性(default关键字)
  2. Publish以及Handle方法被替换为支持异步调用的PublishAsync和HandleAsync方法,它们会返回Task对象,这样能够方便使用C#中async/await的编程模型
  3. 因为咱们的这个模型能够做为实现消息系统的通用模型,而且会须要用到ASP.NET Core的项目中,所以,建议将这些接口的定义放在一个独立的NetStandard的Class Library中,方便从此重用和扩展

OK,接口定义好了。实现呢?下面,咱们实现一个很是简单的消息总线:PassThroughEventBus。在从此的文章中,我还会介绍如何基于RabbitMQ和Azure Service Bus实现不同的消息总线。app

PassThroughEventBus

顾名思义,PassThroughEventBus表示当有消息被派发到消息总线时,消息总线将不作任何处理与路由,而是直接将消息推送到订阅方。在订阅方的事件监听函数中,会经过已经注册的事件处理器对接收到的消息进行处理。整个过程并不会依赖于任何外部组件,不须要引用额外的开发库,只是利用现有的.NET数据结构来模拟消息的派发和订阅过程。所以,PassThroughEventBus不具有容错和消息重发功能,不具有消息存储和路由功能,咱们先实现这样一个简单的消息总线,来体验事件驱动型架构的设计过程。

咱们可使用.NET中的Queue或者ConcurrentQueue等基本数据结构来做为消息队列的实现,与这些基本的数据结构相比,消息队列自己有它本身的职责,它须要在消息被推送进队列的同时通知调用方。固然,PassThroughEventBus不须要依赖于Queue或者ConcurrentQueue,它所要作的事情就是模拟一个消息队列,当消息推送进来的时候,马上通知订阅方进行处理。一样,为了分离职责,咱们能够引入一个EventQueue的实现(以下),从而将消息推送和路由的职责(基础结构层的职责)从消息总线中分离出来。

internal sealed class EventQueue
{
    public event System.EventHandler<EventProcessedEventArgs> EventPushed;

    public EventQueue() { }

    public void Push(IEvent @event)
    {
        OnMessagePushed(new EventProcessedEventArgs(@event));
    }

    private void OnMessagePushed(EventProcessedEventArgs e) => this.EventPushed?.Invoke(this, e);
}

EventQueue中最主要的方法就是Push方法,从上面的代码能够看到,当EventQueue的Push方法被调用时,它将马上触发EventPushed事件,它是一个.NET事件,用以通知EventQueue对象的订阅者,消息已经被派发。整个EventQueue的实现很是简单,咱们仅专一于事件的路由,彻底没有考虑任何额外的事情。

接下来,就是利用EventQueue来实现PassThroughEventBus。毫无悬念,PassThroughEventBus须要实现IEventBus接口,它的两个基本操做分别是Publish和Subscribe。在Publish方法中,会将传入的事件消息转发到EventQueue上,而Subscribe方法则会订阅EventQueue.EventPushed事件(.NET事件),而在EventPushed事件处理过程当中,会从全部已注册的事件处理器(Event Handlers)中找到可以处理所接收到的事件,并对其进行处理。整个流程仍是很是清晰的。如下即是PassThroughEventBus的实现代码:

public sealed class PassThroughEventBus : IEventBus
{
    private readonly EventQueue eventQueue = new EventQueue();
    private readonly IEnumerable<IEventHandler> eventHandlers;

    public PassThroughEventBus(IEnumerable<IEventHandler> eventHandlers)
    {
        this.eventHandlers = eventHandlers;
    }

    private void EventQueue_EventPushed(object sender, EventProcessedEventArgs e)
        => (from eh in this.eventHandlers
            where eh.CanHandle(e.Event)
            select eh).ToList().ForEach(async eh => await eh.HandleAsync(e.Event));

    public Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default)
        where TEvent : IEvent
            => Task.Factory.StartNew(() => eventQueue.Push(@event));

    public void Subscribe()
        => eventQueue.EventPushed += EventQueue_EventPushed;


    #region IDisposable Support
    private bool disposedValue = false; // To detect redundant calls
    void Dispose(bool disposing)
    {
        if (!disposedValue)
        {
            if (disposing)
            {
                this.eventQueue.EventPushed -= EventQueue_EventPushed;
            }

            disposedValue = true;
        }
    }
    public void Dispose() => Dispose(true);
    #endregion
}

实现过程很是简单,固然,从这些代码也能够更清楚地了解到,PassThroughEventBus不作任何路由处理,更不会依赖于一个基础结构设施(好比实现了AMQP的消息队列),所以,不要期望可以在生产环境中使用它。不过,目前来看,它对于咱们接下来要讨论的事情仍是会颇有帮助的,至少在咱们引入基于RabbitMQ等实现的消息总线以前。

一样地,请将PassThroughEventBus实如今另外一个NetStandard的Class Library中,虽然它不须要额外的依赖,但它毕竟是众多消息总线中的一种,将它从接口定义的程序集中剥离开来,好处有两点:第一,保证了定义接口的程序集的纯净度,使得该程序集不须要依赖任何外部组件,并确保了该程序集的职责单一性,即为消息系统的实现提供基础类库;第二,将PassThroughEventBus置于独立的程序集中,有利于调用方针对IEventBus进行技术选择,好比,若是开发者选择使用基于RabbitMQ的实现,那么,只须要引用基于RabbitMQ实现IEventBus接口的程序集就能够了,而无需引用包含了PassThroughEventBus的程序集。这一点我以为能够概括为框架设计中“隔离依赖关系(Dependency Segregation)”的准则。

好了,基本组件都定义好了,接下来,让咱们一块儿基于ASP.NET Core Web API来作一个RESTful服务,并接入上面的消息总线机制,实现消息的派发和订阅。

Customer RESTful API

咱们仍然以客户管理的RESTful API为例子,不过,咱们不会过多地讨论如何去实现管理客户信息的RESTful服务,那并非本文的重点。做为一个案例,我使用ASP.NET Core 2.0 Web API创建了这个服务,使用Visual Studio 2017 15.5作开发,并在CustomersController中使用Dapper来对客户信息CRUD。后台基于SQL Server 2017 Express Edition,使用SQL Server Management Studio可以让我方便地查看数据库操做的结果。

RESTful API的实现

假设咱们的客户信息只包含客户ID和名称,下面的CustomersController代码展现了咱们的RESTful服务是如何保存并读取客户信息的。固然,我已经将本文的代码经过Github开源,开源协议为MIT,虽然商业友好,但毕竟是案例代码没有通过测试,因此请谨慎使用。本文源代码的使用我会在文末介绍。

[Route("api/[controller]")]
public class CustomersController : Controller
{
    private readonly IConfiguration configuration;
    private readonly string connectionString;

    public CustomersController(IConfiguration configuration)
    {
        this.configuration = configuration;
        this.connectionString = configuration["mssql:connectionString"];
    }


    // 获取指定ID的客户信息
    [HttpGet("{id}")]
    public async Task<IActionResult> Get(Guid id)
    {
        const string sql = "SELECT [CustomerId] AS Id, [CustomerName] AS Name FROM [dbo].[Customers] WHERE [CustomerId]=@id";
        using (var connection = new SqlConnection(connectionString))
        {
            var customer = await connection.QueryFirstOrDefaultAsync<Model.Customer>(sql, new { id });
            if (customer == null)
            {
                return NotFound();
            }

            return Ok(customer);
        }
    }

    // 建立新的客户信息
    [HttpPost]
    public async Task<IActionResult> Create([FromBody] dynamic model)
    {
        var name = (string)model.Name;
        if (string.IsNullOrEmpty(name))
        {
            return BadRequest();
        }

        const string sql = "INSERT INTO [dbo].[Customers] ([CustomerId], [CustomerName]) VALUES (@Id, @Name)";
        using (var connection = new SqlConnection(connectionString))
        {
            var customer = new Model.Customer(name);
            await connection.ExecuteAsync(sql, customer);

            return Created(Url.Action("Get", new { id = customer.Id }), customer.Id);
        }
    }
}

代码一如既往的简单,Web API控制器经过Dapper简单地实现了客户信息的建立和返回。咱们不妨测试一下,使用下面的Invoke-RestMethod PowerShell指令,发送Post请求,经过上面的Create方法建立一个用户:

image

能够看到,response中已经返回了新建客户的ID号。接下来,继续使用Invoke-RestMethod来获取新建客户的详细信息:

image

OK,API调试彻底没有问题。下面,咱们将这个案例再扩充一下,咱们但愿这个API在完成客户信息建立的同时,向外界发送一条“客户信息已建立”的事件,并设置一个事件处理器,负责将该事件的详细内容保存到数据库中。

加入事件总线和消息处理机制

首先,咱们在ASP.NET Core Web API项目上,添加对以上两个程序集的引用,而后,按常规作法,在ConfigureServices方法中,将PassThroughEventBus添加到IoC容器中:

public void ConfigureServices(IServiceCollection services)
{
    services.AddMvc();
    services.AddSingleton<IEventBus, PassThroughEventBus>();
}

在此,将事件总线注册为单例(Singleton)服务,是由于它不保存状态。理论上讲,使用单例服务时,须要特别注意服务实例对象的生命周期管理,由于它的生命周期是整个应用程序级别,在程序运行的过程当中,由其引用的对象资源将没法释放,所以,当程序结束运行时,须要合理地将这些资源dispose掉。好在ASP.NET Core的依赖注入框架中已经帮咱们处理过了,所以,对于上面的PassThroughEventBus单例注册,咱们不须要过多担忧,程序执行结束并正常退出时,依赖注入框架会自动帮咱们dispose掉PassThroughEventBus的单例实例。那么对于单例实例来讲,咱们是否只须要经过AddSingleton方法进行注册就能够了,而无需关注它是否真的被dispose了呢?答案是否认的,有兴趣的读者能够参考微软的官方文档,在下一篇文章中我会对这部份内容作些介绍。

接下来,咱们须要定义一个CustomerCreatedEvent对象,表示“客户信息已经建立”这一事件信息,同时,再定义一个CustomerCreatedEventHandler事件处理器,用来处理从PassThroughEventBus接收到的事件消息。代码以下,固然也很简单:

public class CustomerCreatedEvent : IEvent
{
    public CustomerCreatedEvent(string customerName)
    {
        this.Id = Guid.NewGuid();
        this.Timestamp = DateTime.UtcNow;
        this.CustomerName = customerName;
    }

    public Guid Id { get; }

    public DateTime Timestamp { get; }

    public string CustomerName { get; }
}

public class CustomerCreatedEventHandler : IEventHandler<CustomerCreatedEvent>
{
    public bool CanHandle(IEvent @event)
        => @event.GetType().Equals(typeof(CustomerCreatedEvent));

    public Task<bool> HandleAsync(CustomerCreatedEvent @event, CancellationToken cancellationToken = default)
    {
        return Task.FromResult(true);
    

    public Task<bool> HandleAsync(IEvent @event, CancellationToken cancellationToken = default)
        => CanHandle(@event) ? HandleAsync((CustomerCreatedEvent)@event, cancellationToken) : Task.FromResult(false);
}

二者分别实现了咱们最开始定义好的IEvent和IEventHandler接口。在CustomerCreatedEventHandler类的第一个HandleAsync重载方法中,咱们暂且让它简单地返回一个true值,表示事件处理成功。下面要作的事情就是,在客户信息建立成功后,向事件总线发送CustomerCreatedEvent事件,以及在ASP.NET Core Web API程序启动的时候,注册CustomerCreatedEventHandler实例,并调用事件总线的Subscribe方法,使其开始侦听事件的派发行为。

因而,CustomerController须要依赖IEventBus,而且在CustomerController.Create方法中,须要经过调用IEventBus的Publish方法将事件发送出去。现对CustomerController的实现作一些调整,调整后代码以下:

[Route("api/[controller]")]
public class CustomersController : Controller
{
    private readonly IConfiguration configuration;
    private readonly string connectionString;
    private readonly IEventBus eventBus;

    public CustomersController(IConfiguration configuration,
        IEventBus eventBus)
    {
        this.configuration = configuration;
        this.connectionString = configuration["mssql:connectionString"];
        this.eventBus = eventBus;
    }

    // 建立新的客户信息
    [HttpPost]
    public async Task<IActionResult> Create([FromBody] dynamic model)
    {
        var name = (string)model.Name;
        if (string.IsNullOrEmpty(name))
        {
            return BadRequest();
        }

        const string sql = "INSERT INTO [dbo].[Customers] ([CustomerId], [CustomerName]) VALUES (@Id, @Name)";
        using (var connection = new SqlConnection(connectionString))
        {
            var customer = new Model.Customer(name);
            await connection.ExecuteAsync(sql, customer);

            await this.eventBus.PublishAsync(new CustomerCreatedEvent(name));

            return Created(Url.Action("Get", new { id = customer.Id }), customer.Id);
        }
    }
    
    // Get方法暂且省略
}

而后,修改Startup.cs中的ConfigureServices方法,将CustomerCreatedEventHandler注册进来:

public void ConfigureServices(IServiceCollection services)
{
    services.AddMvc();

    services.AddTransient<IEventHandler, CustomerCreatedEventHandler>();
    services.AddSingleton<IEventBus, PassThroughEventBus>();
}

而且调用Subscribe方法,开始侦听消息总线:

public void Configure(IApplicationBuilder app, IHostingEnvironment env)
{
    var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();
    eventBus.Subscribe();

    if (env.IsDevelopment())
    {
        app.UseDeveloperExceptionPage();
    }

    app.UseMvc();
}

OK,如今让咱们在CustomerCreatedEventHandler的HandleAsync方法上设置个断点,按下F5启用Visual Studio 2017调试,而后从新使用Invoke-RestMethod命令发送一个Post请求,能够看到,HandleAsync方法上的断点被命中,同时事件已被正确派发:

image

数据库中的数据也被正确更新:

image

目前还差最后一小步,就是在HandleAsync中,将CustomerCreatedEvent对象的数据序列化并保存到数据库中。固然这也不难,一样能够考虑使用Dapper,或者直接使用ADO.NET,甚至使用比较重量级的Entity Framework Core,均可以实现。那就在此将这个问题留给感兴趣的读者朋友本身搞定啦。

小结

到这里基本上本文的内容也就告一段落了,回顾一下,本文一开始就提出了一种相对简单的消息系统和事件驱动型架构的设计模型,并实现了一个最简单的事件总线:PassThroughEventBus。随后,结合一个实际的ASP.NET Core Web API案例,了解了在RESTful API中实现事件消息派发和订阅的过程,并实现了在事件处理器中,对得到的事件消息进行处理。

然而,咱们还有不少问题须要更深刻地思考,好比:

  • 若是事件处理器须要依赖基础结构层组件,依赖关系如何管理?组件生命周期如何管理?
  • 如何实现基于RabbitMQ或者Azure Service Bus的事件总线?
  • 若是在数据库更新成功后,事件发送失败怎么办?
  • 如何保证事件处理的顺序?

等等。。。在接下来的文章中,我会尽力作更详细的介绍。

源代码的使用

本系列文章的源代码在https://github.com/daxnet/edasample这个Github Repo里,经过不一样的release tag来区分针对不一样章节的源代码。本文的源代码请参考chapter_1这个tag,以下:

image

接下来还将会有chapter_二、chapter_3等这些tag,对应本系列文章的第二部分、第三部分等等。敬请期待。

相关文章
相关标签/搜索