微服务实战(四):落地微服务架构到直销系统(将生产者与消费者接入消息总线)

微服务实战(四):落地微服务架构到直销系统(将生产者与消费者接入消息总线)服务器

前一篇文章咱们已经完成了基于RabbitMq实现的的消息总线,这篇文章就来看看生产者(订单微服务)与消费者(经销商微服务)如何接入消息总线实现消息的发送与消息的接收处理。微信

定义须要发送的消息:架构

下单消息要被发送到消息总线,并被经销商微服务的处理器处理。经销商微服务处理时,须要知道要对哪一个经销商处理多少的PV值与电子币余额。这些信息就是事件消息须要承载的重要信息。app

public class OrderCreatedProcessDealerEvent:BaseEventssh

{
    public decimal OrderTotalPrice { get; set; }
    public decimal OrderTotalPV { get; set; }
    public Guid DealerId { get; set; }
    public Guid OrderId { get; set; }
    public OrderCreatedProcessDealerEvent(Guid dealerid,Guid orderid,decimal ordertotalprice,decimal
        ordertotalpv)
    {
        this.OrderTotalPrice = ordertotalprice;
        this.OrderTotalPV = ordertotalpv;
        this.DealerId = dealerid;
        this.OrderId = orderid;
    }
}

生产者(订单微服务)链接到消息总线:ide

生产者-订单微服务经过Asp.net core WebApi自带的依赖注入,链接到RabbitMq消息总线。函数

services.AddSingleton<IEventHandlerExecutionContext>(new EventHandlerExecutionContext(services));
        var connectionFactory = new ConnectionFactory { HostName = "localhost" };
        services.AddSingleton<IEventBus>(sp => new RabbitMqEB(connectionFactory,
            sp.GetRequiredService<IEventHandlerExecutionContext>(), "exchange2", "direct", "ordereventqueue", 1));

从上面代码能够看出,生产者链接到了localhost的Rabbit服务器,并经过调用消息总线的构造函数,定义了发送消息的通道。构造函数具体内容能够查看上一篇文章。微服务

生产者(订单微服务)发送消息到消息总线:
ieventbus.Publish(new OrderCreatedProcessDealerEvent(orderdto.DealerId,ui

orderid, order.OrderTotalPrice.TotalPrice, order.OrderTotalPV.TotalPV));

ieventbus是注入到订单微服务的构造函数中,并传递到订单建立的用例中。 this

实现消费者(经销商微服务)的消息处理器:

消费者会链接到消息总线,接收到特定类型的消息(这里是OrderCreatedProcessDealerEvent),会交给特定的处理器进行处理,因此须要先定义并实现消息处理器。

public class OrderCreatedEventHandler : IEventHandler

{
    ServiceLocator servicelocator = new ServiceLocator();
    public Task<bool> HandleAsync<TEvent>(TEvent @event) where TEvent : IEvent
    {
        var idealercontext = servicelocator.GetService<IDealerContext>();
        var irepository =
            servicelocator.GetService<IRepository>(new ParameterOverrides { { "context", idealercontext } });
        var idealerrepository = servicelocator.GetService<IDealerRepository>(new ParameterOverrides { { "context", idealercontext } });

//先将接收到的消息转换成特定类型
var ordercreatedevent = @event as OrderCreatedProcessDealerEvent;

using (irepository)
        {
            try
            {
              //根据消息内容,处理本身的逻辑与持久化
                idealerrepository.SubParentEleMoney(ordercreatedevent.DealerId, ordercreatedevent.OrderTotalPrice);
                idealerrepository.AddDealerPV(ordercreatedevent.DealerId, ordercreatedevent.OrderTotalPV);
                irepository.Commit();
            }
            catch (EleMoneyNotEnoughException)
            {
                 //先不处理电子币余额不足的状况                   

            }
        }
        return Task.FromResult(true);
    }
}

消费者(经销商微服务)链接到消息总线:

须要在经销商微服务指定须要链接到的消息总线,并订阅哪一个类型的消息交给哪一个事件处理器进行处理。

//用于侦听订单上下文传递的消息
        services.AddSingleton<IEventHandlerExecutionContext>(new EventHandlerExecutionContext(services));
        var connectionFactory = new ConnectionFactory { HostName = "localhost" };
        services.AddSingleton<IEventBus>(sp => new RabbitMqEB(connectionFactory,
            sp.GetRequiredService<IEventHandlerExecutionContext>(), "exchange2", "direct", "ordereventqueue", 2));
        var eventbus = app.ApplicationServices.GetService<IEventBus>();

//订阅消息

eventbus.Subscribe<OrderCreatedProcessDealerEvent, OrderCreatedEventHandler>();

这样,两个微服务直接就能经过RabbitMq消息总线进行消息的发送、消息的接收与处理了,实现了解耦。

QQ讨论群:309287205 微服务实战视频请关注微信公众号:msshcj

相关文章
相关标签/搜索