微服务实战(二):落地微服务架构到直销系统(构建消息总线框架接口)

微服务实战(二):落地微服务架构到直销系统(构建消息总线框架接口)前端

从上一篇文章你们能够看出,实现一个本身的消息总线框架是很是重要的内容,消息总线能够将界限上下文之间进行解耦,也能够为大并发访问提供必要的支持。
图片描述
消息总线的做用:
1.界限上下文解耦:在DDD第一波文章中,当更新了订单信息后,咱们经过调用经销商界限上下文的领域模型和仓储,进行了经销商信息的更新,这形成了耦合。经过一个消息总线,能够在订单界限上下文的WebApi服务(来源微服务-生产者)更新了订单信息后,发布一个事件消息到消息总线的某个队列中,经销商界限上下文的WebApi服务(消费者)订阅这个事件消息,而后交给本身的Handler进行消息处理,更新本身的经销商信息。这样就实现了订单界限上下文与经销商界限上下文解耦。数据库

2.大并发支持:能够经过消息总线进一步提高下单的性能。咱们能够将用户下单的操做直接交给一个下单命令WebApi接收,下单命令WebApi接收到命令后,直接丢给一个消息总线的队列,而后当即给前端返回下单结果。这样用户就不用等待后续的复杂订单业务逻辑,加快速度。后续订单的一系列处理交给消息的Handler进行后续的处理与消息的进一步投递。微信

消息总线设计重点:
1.定义消息(事件)的接口:全部须要投递与处理的消息,都从这个消息接口继承,由于须要约束消息中必须包含的内容,好比消息的ID、消息产生的时间等。
public interface IEvent架构

{
    Guid Id { get; set; }
    DateTime CreateDate { get; set; }
}

2.定义消息(事件)处理器接口:当消息投递到消息总线队列中后,必定有消费者WebApi接收并处理这个消息,具体的处理方法逻辑在订阅方处理器中实现,这里先须要定义处理器的接口,便于在消息总线框架中使用。
public interface IEventHandler并发

{
    Task<bool> HandleAsync<TEvent>(TEvent @event) where TEvent : IEvent;
}

从上面代码能够看出,消息(事件)处理器处理的类型就是从IEvent接口继承的消息类。框架

3.定义消息(事件)与消息(事件)处理器关联接口:一种类型的消息被投递后,必定要在订阅方找到这种消息的处理器进行处理,因此必定要定义两者的关联接口,这样才能将消息与消息处理器对应起来,才能实现消息被订阅后的处理。ssh

public interface IEventHandlerExecutionContextasync

{
    void RegisterEventHandler<TEvent, TEventHandler>() where TEvent : IEvent
        where TEventHandler : IEventHandler;
    bool IsRegisterEventHandler<TEvent, TEventHandler>() where TEvent : IEvent
        where TEventHandler : IEventHandler;
    Task HandleAsync<TEvent>(TEvent @event) where TEvent : IEvent;
}

RegisterEventHandler方法就是创建消息与消息处理器的关联,这个方法实际上是在订阅方使用,订阅方告诉消息总线,什么样的消息应该交给个人哪一个处理器进行处理。
IsRegisterEventHandler方法是判断消息与处理器之间是否已经存在关联。
HandleAsync方法是经过查找到消息对应的处理器后,而后调用处理器本身的Handle方法进行消息的处理.ide

4.定义消息发布、订阅与消息总线接口:消息总线至少要支持两个功能,一个是生产者可以发布消息到个人消息总线,另外一个是订阅方须要可以从我这个消息总线订阅消息。
public interface IEventPublisher微服务

{
    void Publish<TEvent>(TEvent @event) where TEvent : IEvent;
}

从上面代码能够看出,生产者发布的消息仍然要从IEvent继承的类型。
public interface IEventSubscriber

{
    void Subscribe<TEvent, TEventHandler>() where TEvent : IEvent
        where TEventHandler : IEventHandler;
}

上面代码是订阅方用于从消息总线订阅消息,从代码中能够看出,它的最终的实现其实就是创建消息与处理器之间的关联。
public interface IEventBus:IEventPublisher,IEventSubscriber

{
}

消息(事件)总线从两个接口继承下来,同时支持消息的发布与消息的订阅。

5.实现事件基类:上面已经订阅了消息(事件)的接口,这里来实现事件的基类,其实就是实现消息ID与产生的时间:

public class BaseEvent : IEvent

{
    public Guid Id { get; set; }
    public DateTime CreateDate { get; set; }
    public BaseEvent()
    {
        this.Id = Guid.NewGuid();
        this.CreateDate = DateTime.Now;
    }
}

6.实现消息总线基类:消息总线底层的依赖能够是各类消息代理产品,好比RabbitMq、Kafaka或第三方云平台提供的消息代理产品,一般咱们要封装这些消息代理产品。在封装以前,咱们须要定义顶层的消息总线基类实现,主要的目的是将来依赖于它的具体实现可替换,另外也将消息与消息处理器的关联接口传递进来,便于订阅方使用。

public abstract class BaseEventBus : IEventBus

{
    protected readonly IEventHandlerExecutionContext eventHandlerExecutionContext;
    protected BaseEventBus(IEventHandlerExecutionContext eventHandlerExecutionContext)
    {
        this.eventHandlerExecutionContext = eventHandlerExecutionContext;
    }
    public abstract void Publish<TEvent>(TEvent @event)
        where TEvent : IEvent;
    public abstract void Subscribe<TEvent, TEventHandler>()
        where TEvent : IEvent
        where TEventHandler : IEventHandler;
}

7.实现消息与处理器关联:消息必须与处理器关联,订阅方收到特定类型的消息后,才知道交给哪一个处理器处理。

public class EventHandlerExecutionContext : IEventHandlerExecutionContext

{
    private readonly IServiceCollection registry;
    private readonly IServiceProvider serviceprovider;
    private Dictionary<Type, List<Type>> registrations = new Dictionary<Type, List<Type>>();
    public EventHandlerExecutionContext(IServiceCollection registry,Func<IServiceCollection,
        IServiceProvider> serviceProviderFactory = null)
    {
        this.registry = registry;
        this.serviceprovider = this.registry.BuildServiceProvider();
    }

   //查找消息关联的处理器,而后调用处理器的处理方法
    public async Task HandleAsync<TEvent>(TEvent @event) where TEvent : IEvent
    {
        var eventtype = @event.GetType();
        if(registrations.TryGetValue(eventtype,out List<Type> handlertypes) && handlertypes.Count > 0)
        {
            using(var childscope = this.serviceprovider.CreateScope())
            {
                foreach(var handlertype in handlertypes)
                {
                    var handler = Activator.CreateInstance(handlertype) as IEventHandler;
                    await handler.HandleAsync(@event);
                }
            }
        }
    }

   //判断消息与处理器之间是否有关联
    public bool IsRegisterEventHandler<TEvent, TEventHandler>()
        where TEvent : IEvent
        where TEventHandler : IEventHandler
    {
        if(registrations.TryGetValue(typeof(TEvent),out List<Type> handlertypelist))
        {
            return handlertypelist != null && handlertypelist.Contains(typeof(IEventHandler));
        }
        return false;
    }
   
  //将消息与处理器关联起来,能够在内存中创建关联,也能够创建在数据库单独表中
    public void RegisterEventHandler<TEvent, TEventHandler>()
        where TEvent : IEvent
        where TEventHandler : IEventHandler
    {
        Utils.DictionaryRegister(typeof(TEvent), typeof(TEventHandler), registrations);
    }
}

上面咱们基本上就将消息总线的架子搭建起来了,也实现了基本的功能,下一章咱们基于它来实现RabbitMq的消息总线。

QQ讨论群:309287205 微服务实战视频请关注微信公众号:msshcj

相关文章
相关标签/搜索