上两篇帖子主要说了一下nsq的拓扑结构,如何进行故障处理和横向扩展,保证了客户端和服务端的长链接,链接保持了,就要传输数据了,nsq
如何保证消息被订阅者消费,如何保证消息不丢失,就是今天要阐述的内容。
html
nsq
topic、channel、和消费我客户端的结构如上图,一个topic
下有多个channel
每一个channel
能够被多个客户端订阅。
消息处理的大概流程:当一个消息被nsq
接收后,传给相应的topic
,topic
把消息传递给全部的channel
,channel
根据算法选择一个订阅客户端,把消息发送给客户端进行处理。
看上去这个流程是没有问题的,咱们来思考几个问题git
nsq
服务端从新启动时消息不丢失;以前的帖子说过客户端和服务端进行链接后,会启动一个gorouting
来发送信息给客户端github
go p.messagePump(client, messagePumpStartedChan)
而后会监听客户端发过来的命令client.Reader.ReadSlice('\n')
服务端会定时检查client端的链接状态,读取客户端发过来的各类命令,发送心跳等。每个链接最终的目的就是监听channel
的消息,发送给客户端进行消费。
当有消息发送给订阅客户端的时候,固然选择哪一个client
也是有无则的,这个之后讲,redis
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) { // ... for { // ... case b := <-backendMsgChan: if sampleRate > 0 && rand.Int31n(100) > sampleRate { continue } msg, err := decodeMessage(b) if err != nil { p.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err) continue } msg.Attempts++ subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout) client.SendingMessage() err = p.SendMessage(client, msg) if err != nil { goto exit } flushed = false case msg := <-memoryMsgChan: if sampleRate > 0 && rand.Int31n(100) > sampleRate { continue } msg.Attempts++ subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout) client.SendingMessage() err = p.SendMessage(client, msg) if err != nil { goto exit } flushed = false case <-client.ExitChan: goto exit } } // ... }
看一下这个方法调用subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
,在发送给客户端以前,把这个消息设置为在飞翔中,算法
// pushInFlightMessage atomically adds a message to the in-flight dictionary func (c *Channel) pushInFlightMessage(msg *Message) error { c.inFlightMutex.Lock() _, ok := c.inFlightMessages[msg.ID] if ok { c.inFlightMutex.Unlock() return errors.New("ID already in flight") } c.inFlightMessages[msg.ID] = msg c.inFlightMutex.Unlock() return nil }
而后发送给客户端进行处理。
在发送中的数据,存在的各类不肯定性,nsq
的处理方式是:对发送给客户端信息设置为在飞翔中,若是在若是处理成功就把这个消息从飞翔中的状态中去掉,若是在规定的时间内没有收到客户端的反馈,则认为这个消息超时,而后从新归队,两次进行处理。因此不管是哪一种特殊状况,nsq
统一认为消息为超时。服务器
nsq
对超时消息的处理,借鉴了redis
的过时算法,但也不太同样redis
的更复杂一些,由于redis是单线程的,还要处理占用cpu
时间等等,nsq
由于gorouting
的存在要很简单不少。
简单来讲,就是在nsq
启动的时候启动协程去处理channel的过时数据网络
func (n *NSQD) Main() error { // ... // 启动协程去处理channel的过时数据 n.waitGroup.Wrap(n.queueScanLoop) n.waitGroup.Wrap(n.lookupLoop) if n.getOpts().StatsdAddress != "" { n.waitGroup.Wrap(n.statsdLoop) } err := <-exitCh return err }
固然不是每个channel启动一个协程来处理过时数据,而是有一些规定,咱们看一下一些默认值,而后再展开讲算法oop
return &Options{ // ... HTTPClientConnectTimeout: 2 * time.Second, HTTPClientRequestTimeout: 5 * time.Second, // 内存最大队列数 MemQueueSize: 10000, MaxBytesPerFile: 100 * 1024 * 1024, SyncEvery: 2500, SyncTimeout: 2 * time.Second, // 扫描channel的时间间隔 QueueScanInterval: 100 * time.Millisecond, // 刷新扫描的时间间隔 QueueScanRefreshInterval: 5 * time.Second, QueueScanSelectionCount: 20, // 最大的扫描池数量 QueueScanWorkerPoolMax: 4, // 标识百分比 QueueScanDirtyPercent: 0.25, // 消息超时 MsgTimeout: 60 * time.Second, MaxMsgTimeout: 15 * time.Minute, MaxMsgSize: 1024 * 1024, MaxBodySize: 5 * 1024 * 1024, MaxReqTimeout: 1 * time.Hour, ClientTimeout: 60 * time.Second, // ... }
这些参数均可以在启动nsq
的时候根据本身须要来指定,咱们主要说一下这几个:atom
QueueScanWorkerPoolMax
就是最大协程数,默认是4
,这个数是扫描全部channel的最大协程数,固然channel
的数量小于这个参数的话,就调整协程的数量,以最小的为准,好比channel
的数量为2
个,而默认的是4个,那就调扫描的数量为2
个QueueScanSelectionCount
每次扫描最大的channel
数量,默认是20
,若是channel
的数量小于这个值,则以channel
的数量为准。QueueScanDirtyPercent
标识脏数据 channel
的百分比,默认为0.25
,eg: channel
数量为10
,则一次最多扫描10
个,查看每一个channel
是否有过时的数据,若是有,则标记为这个channel是有脏数据的,若是有脏数据的channel的数量 占此次扫描的10
个channel的比例超过这个百分比,则直接再次进行扫描一次,而不用等到下一次时间点。QueueScanInterval
扫描channel的时间间隔,默认的是每100毫秒扫描一次。QueueScanRefreshInterval
刷新扫描的时间间隔 目前的处理方式是调整channel的协程数量。nsq
处理过时数据的算法,总结一下就是,使用协程定时去扫描随机的channel
里是否有过时数据。func (n *NSQD) queueScanLoop() { workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount) responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount) closeCh := make(chan int) workTicker := time.NewTicker(n.getOpts().QueueScanInterval) refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval) channels := n.channels() n.resizePool(len(channels), workCh, responseCh, closeCh) for { select { case <-workTicker.C: if len(channels) == 0 { continue } case <-refreshTicker.C: channels = n.channels() n.resizePool(len(channels), workCh, responseCh, closeCh) continue case <-n.exitChan: goto exit } num := n.getOpts().QueueScanSelectionCount if num > len(channels) { num = len(channels) } loop: // 随机channel for _, i := range util.UniqRands(num, len(channels)) { workCh <- channels[i] } numDirty := 0 for i := 0; i < num; i++ { if <-responseCh { numDirty++ } } if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent { goto loop } } exit: n.logf(LOG_INFO, "QUEUESCAN: closing") close(closeCh) workTicker.Stop() refreshTicker.Stop() }
在扫描channel
的时候,若是发现有过时数据后,会从新放回到队列,进行重发
操做。线程
func (c *Channel) processInFlightQueue(t int64) bool { // ... for { c.inFlightMutex.Lock() msg, _ := c.inFlightPQ.PeekAndShift(t) c.inFlightMutex.Unlock() if msg == nil { goto exit } dirty = true _, err := c.popInFlightMessage(msg.clientID, msg.ID) if err != nil { goto exit } atomic.AddUint64(&c.timeoutCount, 1) c.RLock() client, ok := c.clients[msg.clientID] c.RUnlock() if ok { client.TimedOutMessage() } //从新放回队列进行消费处理。 c.put(msg) } exit: return dirty }
以前的帖子中的例子中有说过,客户端要消费消息,须要实现接口
type Handler interface { HandleMessage(message *Message) error }
在服务端发送消息给客户端后,若是在处理业务逻辑时,若是发生错误则给服务器发送Requeue
命令告诉服务器,从新发送消息进处理。若是处理成功,则发送Finish
命令
func (r *Consumer) handlerLoop(handler Handler) { r.log(LogLevelDebug, "starting Handler") for { message, ok := <-r.incomingMessages if !ok { goto exit } if r.shouldFailMessage(message, handler) { message.Finish() continue } err := handler.HandleMessage(message) if err != nil { r.log(LogLevelError, "Handler returned error (%s) for msg %s", err, message.ID) if !message.IsAutoResponseDisabled() { message.Requeue(-1) } continue } if !message.IsAutoResponseDisabled() { message.Finish() } } exit: r.log(LogLevelDebug, "stopping Handler") if atomic.AddInt32(&r.runningHandlers, -1) == 0 { r.exit() } }
服务端收到命令后,对飞翔中的消息进行处理,若是成功则去掉,若是是Requeue
则执行归队和重发操做,或者进行defer队列处理。
默认的状况下,只有内存队列不足时MemQueueSize:10000
时,才会把数据保存到文件内进行持久到硬盘。
select { case c.memoryMsgChan <- m: default: b := bufferPoolGet() err := writeMessageToBackend(b, m, c.backend) bufferPoolPut(b) c.ctx.nsqd.SetHealth(err) if err != nil { c.ctx.nsqd.logf(LOG_ERROR, "CHANNEL(%s): failed to write message to backend - %s", c.name, err) return err } } return nil
若是将 --mem-queue-size 设置为 0,全部的消息将会存储到磁盘。咱们不用担忧消息会丢失,nsq 内部机制保证在程序关闭时将队列中的数据持久化到硬盘,重启后就会恢复。
nsq
本身开发了一个库go-diskqueue来持久会消息到内存。这个库的代码量很少,理解起来也不难,代码逻辑我想下一篇再讲。
看一下保存在硬盘后的样子: