过去在学Actor模型的时候,就认为异步消息是至关的重要,在华为的时候,也深扒了一下当时产品用的消息模型,简单实用,支撑起了不少模块和业务,但也有一个缺点是和其余的框架有耦合,最近看到以太坊的事件框架,一样简单简洁,理念很适合初步接触事件框架的同窗,写文介绍一下。小程序
以太坊的事件框架是一个单独的基础模块,存在于目录go-ethereum/event
中,它有2中独立的事件框架实现,老点的叫TypeMux
,已经基本弃用,新的叫Feed
,当前正在普遍使用。缓存
TypeMux
和Feed
还只是简单的事件框架,与Kafka、RocketMQ等消息系统相比,是很是的传统和简单,可是TypeMux
和Feed
的简单简洁,已经很好的支撑以太坊的上层模块,这是当下最好的选择。app
TypeMux
和Feed
各有优劣,最优秀的共同特色是,他们只依赖于Golang原始的包,彻底与以太坊的其余模块隔离开来,也就是说,你彻底能够把这两个事件框架用在本身的项目中。框架
TypeMux
的特色是,你把全部的订阅塞给它就好,事件来了它自会通知你,但有可能会阻塞,通知你不是那么及时,甚至过了一段挺长的时间。异步
Feed
的特色是,它一般不存在阻塞的状况,会及时的把事件通知给你,但须要你为每类事件都创建一个Feed,而后不一样的事件去不一样的Feed上订阅和发送,这其实挺烦人的,若是你用错了Feed,会致使panic。函数
接下来,介绍下这种简单事件框架的抽象模型,而后再回归到以太坊,介绍下TypeMux
和Feed
。post
!<--more-->ui
如上图,轻量级的事件框架会把全部的被订阅的事件收集起来,而后把每一个订阅者组合成一个列表,当事件框架收到某个事件的时候,就把订阅该事件的全部订阅者找出来,而后把这个事件发给他们。this
它须要具备2个功能:spa
若是作成完善的消息系统,就还得考虑这些特性:可用性、吞吐量、传输延迟、有序消息、消息存储、过滤、重发,这和事件框架相比就复杂上去了,咱们专一的介绍下以太坊的事件模型怎么完成上述3个功能的。
TypeMux
是一个以太坊不太满意的事件框架,因此以太坊就搞了Feed
出来,它解决了TypeMux
效率低下,延迟交付的问题。接下来就先看下这个TypeMux
。
TypeMux是一个同步事件框架。它的实现和上面讲的事件框架的抽象结构是彻底同样的,它维护了一个订阅表,表里维护了每一个事件的订阅者列表。它的特色:
看下它2个功能的实现:
TypeMux.Subscribe()
,入参为要订阅的事件类型,会返回TypeMuxSubscription
给订阅者,订阅者可经过此控制订阅,经过TypeMuxSubscription.Unsubscribe()
能够取消订阅。TypeMux.Post()
,入参为事件类型,根据订阅表找出该事件的订阅者列表,遍历列表,依次向每一个订阅者传递事件,若是前一个没有传递完成进入阻塞,会致使后边的订阅者不能及时收到事件。TypeMux
的精简组成:
// A TypeMux dispatches events to registered receivers. Receivers can be // registered to handle events of certain type. Any operation // called after mux is stopped will return ErrMuxClosed. // // The zero value is ready to use. // // Deprecated: use Feed // 本质:哈希列表,每一个事件的订阅者都存到对于的列表里 type TypeMux struct { mutex sync.RWMutex // 锁 subm map[reflect.Type][]*TypeMuxSubscription // 订阅表:全部事件类型的全部订阅者 stopped bool }
订阅:
// Subscribe creates a subscription for events of the given types. The // subscription's channel is closed when it is unsubscribed // or the mux is closed. // 订阅者只传入订阅的事件类型,而后TypeMux会返回给它一个订阅对象 func (mux *TypeMux) Subscribe(types ...interface{}) *TypeMuxSubscription { sub := newsub(mux) mux.mutex.Lock() defer mux.mutex.Unlock() if mux.stopped { // set the status to closed so that calling Unsubscribe after this // call will short circuit. sub.closed = true close(sub.postC) } else { if mux.subm == nil { mux.subm = make(map[reflect.Type][]*TypeMuxSubscription) } for _, t := range types { rtyp := reflect.TypeOf(t) // 在同一次订阅中,不要重复订阅同一个类型的事件 oldsubs := mux.subm[rtyp] if find(oldsubs, sub) != -1 { panic(fmt.Sprintf("event: duplicate type %s in Subscribe", rtyp)) } subs := make([]*TypeMuxSubscription, len(oldsubs)+1) copy(subs, oldsubs) subs[len(oldsubs)] = sub mux.subm[rtyp] = subs } } return sub }
取消订阅:
func (s *TypeMuxSubscription) Unsubscribe() { s.mux.del(s) s.closewait() }
发布事件和传递事件:
// Post sends an event to all receivers registered for the given type. // It returns ErrMuxClosed if the mux has been stopped. // 遍历map,找到全部订阅的人,向它们传递event,同一个event对象,非拷贝,运行在调用者goroutine func (mux *TypeMux) Post(ev interface{}) error { event := &TypeMuxEvent{ Time: time.Now(), Data: ev, } rtyp := reflect.TypeOf(ev) mux.mutex.RLock() if mux.stopped { mux.mutex.RUnlock() return ErrMuxClosed } subs := mux.subm[rtyp] mux.mutex.RUnlock() for _, sub := range subs { sub.deliver(event) } return nil } func (s *TypeMuxSubscription) deliver(event *TypeMuxEvent) { // Short circuit delivery if stale event // 不发送过早(老)的消息 if s.created.After(event.Time) { return } // Otherwise deliver the event s.postMu.RLock() defer s.postMu.RUnlock() select { case s.postC <- event: case <-s.closing: } }
我上面指出了发送事件可能阻塞,阻塞在哪?关键就在下面这里:建立TypeMuxSubscription
时,通道使用的是无缓存通道,读写是同步的,这里注定了TypeMux是一个同步事件框架,这是以太坊改用Feed的最大缘由。
func newsub(mux *TypeMux) *TypeMuxSubscription { c := make(chan *TypeMuxEvent) // 无缓冲通道,同步读写 return &TypeMuxSubscription{ mux: mux, created: time.Now(), readC: c, postC: c, closing: make(chan struct{}), } }
Feed是一个流式事件框架。上文强调了TypeMux是一个同步框架,也正是由于此以太坊丢弃了它,难道Feed
就是一个异步框架?不必定是的,这取决于订阅者是否采用有缓存的通道,采用有缓存的通道,则Feed就是异步的,采用无缓存的通道,Feed就是同步的,把同步仍是异步的选择交给使用者。
本节强调Feed的流式特色。事件本质是一个数据,接二连三的事件就组成了一个数据流,这些数据流不停的流向它的订阅者那里,而且不会阻塞在任何一个订阅者那里。
举几个不是十分恰当的例子。
Feed和TypeMux相同的是,它们都是推模式,不一样的是Feed是异步的,若是有些订阅者阻塞了,不要紧,它会继续向后面的订阅者发送事件/消息。
Feed是一个一对多的事件流框架。每一个类型的事件都须要一个与之对应的Feed,订阅者经过这个Feed进行订阅事件,发布者经过这个Feed发布事件。
看下Feed是如何实现2个功能的:
Feed.Subscribe()
,入参是一个通道,一般是有缓冲的,就算是无缓存也不会形成Feed阻塞,Feed会校验这个通道的类型和本Feed管理的事件类型是否一致,而后把通道保存下来,返回给订阅者一个Subscription
,能够经过它取消订阅和读取通道错误。Feed.Send()
入参是一个事件,加锁确保本类型事件只有一个发送协程正在进行,而后校验事件类型是否匹配,Feed会尝试给每一个订阅者发送事件,若是订阅者阻塞,Feed就继续尝试给下一个订阅者发送,直到给每一个订阅者发送事件,返回发送该事件的数量。Feed定义:
// Feed implements one-to-many subscriptions where the carrier of events is a channel. // Values sent to a Feed are delivered to all subscribed channels simultaneously. // // Feeds can only be used with a single type. The type is determined by the first Send or // Subscribe operation. Subsequent calls to these methods panic if the type does not // match. // // The zero value is ready to use. // 一对多的事件订阅管理:每一个feed对象,当别人调用send的时候,会发送给全部订阅者 // 每种事件类型都有一个本身的feed,一个feed内订阅的是同一种类型的事件,得用某个事件的feed才能订阅该事件 type Feed struct { once sync.Once // ensures that init only runs once sendLock chan struct{} // sendLock has a one-element buffer and is empty when held.It protects sendCases. 这个锁确保了只有一个协程在使用go routine removeSub chan interface{} // interrupts Send sendCases caseList // the active set of select cases used by Send,订阅的channel列表,这些channel是活跃的 // The inbox holds newly subscribed channels until they are added to sendCases. mu sync.Mutex inbox caseList // 不活跃的在这里 etype reflect.Type closed bool }
订阅事件:
// Subscribe adds a channel to the feed. Future sends will be delivered on the channel // until the subscription is canceled. All channels added must have the same element type. // // The channel should have ample buffer space to avoid blocking other subscribers. // Slow subscribers are not dropped. // 订阅者传入接收事件的通道,feed将通道保存为case,而后返回给订阅者订阅对象 func (f *Feed) Subscribe(channel interface{}) Subscription { f.once.Do(f.init) // 通道和通道类型检查 chanval := reflect.ValueOf(channel) chantyp := chanval.Type() if chantyp.Kind() != reflect.Chan || chantyp.ChanDir()&reflect.SendDir == 0 { panic(errBadChannel) } sub := &feedSub{feed: f, channel: chanval, err: make(chan error, 1)} f.mu.Lock() defer f.mu.Unlock() if !f.typecheck(chantyp.Elem()) { panic(feedTypeError{op: "Subscribe", got: chantyp, want: reflect.ChanOf(reflect.SendDir, f.etype)}) } // 把通道保存到case // Add the select case to the inbox. // The next Send will add it to f.sendCases. cas := reflect.SelectCase{Dir: reflect.SelectSend, Chan: chanval} f.inbox = append(f.inbox, cas) return sub }
发送和传递事件:这个发送是比较绕一点的,要想真正掌握其中的运行,最好写个小程序练习下。
// Send delivers to all subscribed channels simultaneously. // It returns the number of subscribers that the value was sent to. // 同时向全部的订阅者发送事件,返回订阅者的数量 func (f *Feed) Send(value interface{}) (nsent int) { rvalue := reflect.ValueOf(value) f.once.Do(f.init) <-f.sendLock // 获取发送锁 // Add new cases from the inbox after taking the send lock. // 从inbox加入到sendCases,不能订阅的时候直接加入到sendCases,由于可能其余协程在调用发送 f.mu.Lock() f.sendCases = append(f.sendCases, f.inbox...) f.inbox = nil // 类型检查:若是该feed不是要发送的值的类型,释放锁,而且执行panic if !f.typecheck(rvalue.Type()) { f.sendLock <- struct{}{} panic(feedTypeError{op: "Send", got: rvalue.Type(), want: f.etype}) } f.mu.Unlock() // Set the sent value on all channels. // 把发送的值关联到每一个case/channel,每个事件都有一个feed,因此这里全是同一个事件的 for i := firstSubSendCase; i < len(f.sendCases); i++ { f.sendCases[i].Send = rvalue } // Send until all channels except removeSub have been chosen. 'cases' tracks a prefix // of sendCases. When a send succeeds, the corresponding case moves to the end of // 'cases' and it shrinks by one element. // 全部case仍然保留在sendCases,只是用过的会移动到最后面 cases := f.sendCases for { // Fast path: try sending without blocking before adding to the select set. // This should usually succeed if subscribers are fast enough and have free // buffer space. // 使用非阻塞式发送,若是不能发送就及时返回 for i := firstSubSendCase; i < len(cases); i++ { // 若是发送成功,把这个case移动到末尾,因此i这个位置就是没处理过的,而后大小减1 if cases[i].Chan.TrySend(rvalue) { nsent++ cases = cases.deactivate(i) i-- } } // 若是这个地方成立,表明全部订阅者都不阻塞,都发送完了 if len(cases) == firstSubSendCase { break } // Select on all the receivers, waiting for them to unblock. // 返回一个可用的,直到不阻塞。 chosen, recv, _ := reflect.Select(cases) if chosen == 0 /* <-f.removeSub */ { // 这个接收方要删除了,删除并缩小sendCases index := f.sendCases.find(recv.Interface()) f.sendCases = f.sendCases.delete(index) if index >= 0 && index < len(cases) { // Shrink 'cases' too because the removed case was still active. cases = f.sendCases[:len(cases)-1] } } else { // reflect已经确保数据已经发送,无需再尝试发送 cases = cases.deactivate(chosen) nsent++ } } // 把sendCases中的send都标记为空 // Forget about the sent value and hand off the send lock. for i := firstSubSendCase; i < len(f.sendCases); i++ { f.sendCases[i].Send = reflect.Value{} } f.sendLock <- struct{}{} return nsent }