本篇是“事件驱动的微服务”系列的第二篇,主要讲述事件驱动设计。若是想要了解整体设计,请看第一篇"事件驱动的微服务-整体设计"git
咱们经过一个具体的例子来说解事件驱动设计。 本文中的程序有两个微服务,一个是订单服务(Order Service), 另外一个是支付服务(Payment Service)。用户调用订单服务的用例createOrder()来建立订单,建立以后的订单暂时尚未支付信息,订单服务而后发布命令(Command)给支付服务,支付服务完成支付,发送支付完成(Payment Created)消息。订单服务收到消息(Event),在Order表里增长Payment_Id并修改订单状态为“已付款”。
下面就是组件图:
github
事件分红内部事件和外部事件,内部事件是存在于一个微服务内部的事件,不与其余微服务共享。若是用DDD的语言来描述就是在有界上下文(Bounded Context)内的域事件(Domain Event)。外部事件是从一个微服务发布,而被其余微服务接收的事件。若是用DDD的语言来描述就是在不一样有界上下文(Bounded Context)之间传送的域事件(Domain Event)。这两种事件的处理方式不一样。编程
对于内部事件的处理早已有了成熟的方法,它的基本思路是建立一个事件总线(Event Bus),由它来监听事件。而后注册不一样的事件处理器(Event Handler)来处理事件。这种思路被普遍地用于各类领域。架构
下面就是事件总线(Event Bus)的接口,它有两个函数,一个是发布事件(Publish Event),另外一个是添加事件处理器(Event Handler)。一个事件能够有一个或多个事件处理器。框架
type EventBus interface { PublishEvent(EventMessage) AddHandler(EventHandler, ...interface{}) }
事件总线的代码的关键部分是加载事件处理器。咱们以订单服务为例,下面就是加载事件处理器(Event Handler)的代码,它是初始化容器代码的一部分。在这段代码中,它只注册了一个事件,支付完成事件(PaymentCreateEvent),和与之相对应的事件处理器-支付完成事件处理器(PaymentCreatedEventHandler)。函数
func loadEventHandler(c servicecontainer.ServiceContainer) error { var value interface{} var found bool rluf, err := containerhelper.BuildModifyOrderUseCase(&c) if err != nil { return err } pceh := event.PaymentCreatedEventHandler{rluf} if value, found = c.Get(container.EVENT_BUS); !found { message := "can't find key=" + container.EVENT_BUS + " in container " return errors.New(message) } eb := value.(ycq.EventBus) eb.AddHandler(pceh,&event.PaymentCreatedEvent{}) return nil }
因为在处理事件时要调用相应的用例,所以须要把用例注入到事件处理器中。在上段代码中,首先从容器中得到用例,而后建立事件处理器,最后把事件和与之对应的处理器加入到事件总线中。微服务
事件的发布是经过调用事件总线的PublishEvent()来实现的。下面的例子就是在订单服务中经过消息中间件来监听来自外部的支付完成事件(PaymentCreatedEvent),收到后,把它转化成内部事件,而后发送到事件总线上,这样已经注册的事件处理器就能处理它了。ui
eb := value.(ycq.EventBus) subject := config.SUBJECT_PAYMENT_CREATED _, err := ms.Subscribe(subject, func(pce event.PaymentCreatedEvent) { cpm := pce.NewPaymentCreatedDescriptor() logger.Log.Debug("payload:",pce) eb.PublishEvent(cpm) })
那么事件是怎样被处理的呢?关键就在PublishEvent函数。当一个事件发布时,事件总线会把全部注册到该事件的事件处理器的Handle()函数依次调用一遍, 下面就是PublishEvent()的代码。这样每一个事件处理器只要实现Handle()函数就能够了。spa
func (b *InternalEventBus) PublishEvent(event EventMessage) { if handlers, ok := b.eventHandlers[event.EventType()]; ok { for handler := range handlers { handler.Handle(event) } } }
下面就是PaymentCreatedEventHandler的代码。它的逻辑比较简单,就是从Event里得到须要的支付信息,而后调用相应的用例来完成UpdatePayment()功能。.net
type PaymentCreatedEventHandler struct { Mouc usecase.ModifyOrderUseCaseInterface } func(pc PaymentCreatedEventHandler) Handle (message ycq.EventMessage) { switch event := message.Event().(type) { case *PaymentCreatedEvent: status := model.ORDER_STATUS_PAID err := pc.Mouc.UpdatePayment(event.OrderNumber, event.Id,status) if err != nil { logger.Log.Errorf("error in PaymentCreatedEventHandler:", err) } default: logger.Log.Errorf("event type mismatch in PaymentCreatedEventHandler:") } }
我在这里用到了一个第三方库"jetbasrawi/go.cqrs"来处理Eventbus。Jetbasrawi是一个事件溯源(Event Sourcing)的库。事件溯源与事件驱动很容易搞混,它们看起来有点像,但其实是彻底不一样的两个东西。事件驱动是微服务之间的一种调用方式,存在于微服务之间,与RPC的调用方式相对应;而事件溯源是一种编程模式,你能够在微服务内部使用它或不使用它。但我一时找不到事件驱动的库,就先找一个事件溯源的库来用。其实本身写一个也很简单,但我不以为能写的比jetbasrawi更好,那就仍是先用它把。不过事件溯源要比事件驱动复杂,所以用Jetbasrawi可能有点大材小用了。
外部事件的不一样之处是它要在微服务之间进行传送,所以须要消息中间件。我定义了一个通用接口,这样能够支持不一样的消息中间件。它的最重要的两个函数是publish()和Subscribe()。
package gmessaging type MessagingInterface interface { Publish(subject string, i interface{}) error Subscribe(subject string, cb interface{} ) (interface{}, error) Flush() error // Close will close the decorated connection (For example, it could be the coded connection) Close() // CloseConnection will close the connection to the messaging server. If the connection is not decorated, then it is // the same with Close(), otherwise, it is different CloseConnection() }
因为定义了通用接口,它能够支持多种消息中间件,我这里选的是"NATS"消息中间件。当初选它是由于它是云原生计算基金会("CNCF")的项目,并且功能强大,速度也快。若是想了解云原生概念,请参见"云原生的不一样解释及正确含义"
下面的代码就是NATS的实现,若是你想换用别的消息中间件,能够参考下面的代码。
type Nat struct { Ec *nats.EncodedConn } func (n Nat) Publish(subject string, i interface{}) error { return n.Ec.Publish(subject,i) } func (n Nat) Subscribe(subject string, i interface{} ) (interface{}, error) { h := i.(nats.Handler) subscription, err :=n.Ec.Subscribe(subject, h) return subscription, err } func (n Nat) Flush() error { return n.Ec.Flush() } func (n Nat) Close() { n.Ec.Close() } func (n Nat) CloseConnection() { n.Ec.Conn.Close() }
“Publish(subject string, i interface{})”有两个参数,“subject”是消息中间件的队列(Queue)或者是主题(Topic)。第二个参数是要发送的信息,它通常是JSON格式。使用消息中间件时须要一个连接(Connection),这里用的是“*nats.EncodedConn”, 它是一个封装以后的连接,它里面含有一个JSON解码器,能够支持在结构(struct)和JSON之间进行转换。当你调用发布函数时,发送的是结构(struct),解码器自动把它转换成JSON文本再发送出去。“Subscribe(subject string, i interface{} )”也有两个参数,第一个与Publish()的同样,第二个是事件驱动器。当接收到JSON文本后,解码器自动把它转换成结构(struct),而后调用事件处理器。
我把与消息中间件有关的代码写成了一个单独的第三方库,这样不论你是否使用本框架均可以使用这个库。详细信息参见"jfeng45/gmessaging"
命令(Command)在代码实现上和事件(Event)很是类似,但他们在概念上彻底不一样。例如支付申请(Make Payment)是命令,是你主动要求第三方(支付服务)去作一件事情,并且你知道这个第三方是谁。支付完成(Payment Created)是事件,是你在汇报一件事情已经作完,而其余第三方程序可能会根据它的结果来决定是否要作下一步的动做,例如订单服务当收到支付完成这个事件时,就能够更改本身的订单状态为“已支付”。这里,事件的发送方并不知道谁会对这条消息感兴趣,所以这个发送是广播式发送。并且这个动做(支付)已经完成,而命令是还没有完成的动做,所以接收方能够选择拒绝执行一条命令。咱们日常常常讲的事件驱动是松耦合,而RPC是紧耦合,这里指的是事件方式,而不是命令方式。采用命令方式时,因为你已经知道了要发给谁,所以是紧耦合的。
在实际应用中,咱们所看到的大部分的命令都是在一个微服务内部使用,不多有在微服务之间发送命令的,微服务之间传递的主要是事件。但因为事件和命令很容易混淆,有很多在微服务之间传递的“事件”其实是“命令”。所以并非使用事件驱动方式就能把程序变成松耦合的,而要进一步检查你是否将“命令”错用成了“事件”。在本程序中会严格区分它们。
下面就是命令总线(Dispatcher)的接口,除了函数名字不同外,其余与事件总线几乎如出一辙。
type Dispatcher interface { Dispatch(CommandMessage) error RegisterHandler(CommandHandler, ...interface{}) error }
咱们彻底能够把它定义成下面的样子,是否是就与事件总线很像了?下面的接口和上面的是等值的。
type CommandBus interface { PublishCommand(CommandMessage) error AddHandler(CommandHandler, ...interface{}) error }
事件和命令的其余方面,例如定义方式,处理流程,实现方式,传送方式也几乎如出一辙。详细的我就不讲了,你能够本身看代码进行比较。那咱们可不能够只用一个事件总线同时处理时间和命令呢?理论上来说是没有问题的。我开始的时候也是这么想的,但因为如今的接口("jetbasrawi/go.cqrs")不支持,若是要改的话须要从新定义接口,所以就暂时放弃了。另外,他们两个在概念上仍是很不一样的,因此在实现上定义不一样的接口也是有必要的。
下面来说解在设计事件驱动时应注意的问题。
事件驱动模式与RPC相比增长的部分是事件和命令。所以首先要考虑的是要对RPC的程序结构作哪些扩充和怎样扩充。“Event”和“command”从本质上来说是业务逻辑的一部分,所以应属于领域层。所以在程序结构上也增长了两个目录“Event”和“command”分别用来存放事件和命令。结构以下图所示。
如今的代码在处理外部事件时,在发送端和接收端的方式是不同的。
下面就是发送端的代码(代码在支付服务项目里),整个代码功能是建立支付,完成以后再发布“支付完成”消息。它直接经过消息中间件接口把事件发送出去。
type MakePaymentUseCase struct { PaymentDataInterface dataservice.PaymentDataInterface Mi gmessaging.MessagingInterface } func (mpu *MakePaymentUseCase) MakePayment(payment *model.Payment) (*model.Payment, error) { payment, err := mpu.PaymentDataInterface.Insert(payment) if err!= nil { return nil, errors.Wrap(err, "") } pce := event.NewPaymentCreatedEvent(*payment) err = mpu.Mi.Publish(config.SUBJECT_PAYMENT_CREATED, pce) if err != nil { return nil, err } return payment, nil }
下面就是接收端的代码例子。是它是先用消息接口接收时间,再把外部事件转化为内部事件,而后调用事件总线的接口在微服务内部发布事件。
eb := value.(ycq.EventBus) subject := config.SUBJECT_PAYMENT_CREATED _, err := ms.Subscribe(subject, func(pce event.PaymentCreatedEvent) { cpm := pce.NewPaymentCreatedDescriptor() logger.Log.Debug("payload:",pce) eb.PublishEvent(cpm) })
为何会有这种不一样?在接收时,可不能够不生成内部事件,而是直接调用用例来处理外部事件呢?在发送时,若是没有别的内部事件处理器,那么直接调用消息中间件来发送是最简单的方法(这个发送过程是轻量级的,耗时很短)。而接收时可能须要处理比较复杂的业务逻辑。所以你但愿把这个过程分红接收和处理两个部分,让复杂的业务逻辑在另一个过程里处理,这样能够尽可能缩短接收时间,提升接收效率。
如今的设计是每一个事件和事件驱动器都有一个单独的文件。我见过有些人只用一个文件例如PaymentEvent来存放全部与Payment相关的事件,事件驱动器也是同样。这两种办法都是可行的。如今看来,生成单独的文件比较清晰,但若是之后事件很是多,也许一个文件存放多个事件会比较容易管理,不过到那时再改也不迟。
对于一个好的设计来说,全部的业务逻辑都应该集中在一块儿,这样便于管理。在如今的架构里,业务逻辑是放在用例(Use Case)里的,但事件处理器里也须要有业务逻辑,应该怎么办?支付事件处理器的主要功能是修改订单中的付款信息,这部分的业务逻辑已经体如今修改订单(Modify Order)用例里,所以支付事件处理器只要调用修改订单的MakePayment()函数就能够了。实际上全部的事件处理器都应该这样设计,它们自己不该包含业务逻辑,而只是一个简单的封装,去调用用例里的业务逻辑。那么可不能够直接在用例里定义Handle()函数,这样用例就变成了事件处理器?这样的设计确实可行,但我以为把事件处理器作成一个单独的文件,这样逻辑上更清晰。由于修改订单付款功能你是必定要有的,但事件处理器只有在事件驱动模式下才有,它们是属于两个不一样层面的东西,只有分开放置才层次清晰。
完整的源程序连接:
3 "CNCF"
5 "NATS"