The tcp handler processes each TCP connection:
```go
type tcpServer struct {
	ctx *context
}

func (p *tcpServer) Handle(clientConn net.Conn) {
	p.ctx.nsqd.logf("TCP: new client(%s)", clientConn.RemoteAddr())

	// The client should initialize itself by sending a 4 byte sequence indicating
	// the version of the protocol that it intends to communicate, this will allow us
	// to gracefully upgrade the protocol away from text/line oriented to whatever...
	// ztd: the first thing every client sends after establishing a connection is the
	// protocol version; judging from the code, only V2 is currently supported
	buf := make([]byte, 4)
	_, err := io.ReadFull(clientConn, buf)
	if err != nil {
		p.ctx.nsqd.logf("ERROR: failed to read protocol version - %s", err)
		return
	}
	protocolMagic := string(buf)

	p.ctx.nsqd.logf("CLIENT(%s): desired protocol magic '%s'",
		clientConn.RemoteAddr(), protocolMagic)

	var prot protocol.Protocol
	switch protocolMagic {
	case "  V2":
		prot = &protocolV2{ctx: p.ctx}
	default:
		protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
		clientConn.Close()
		p.ctx.nsqd.logf("ERROR: client(%s) bad protocol magic '%s'",
			clientConn.RemoteAddr(), protocolMagic)
		return
	}

	// ztd: hand the connection over to IOLoop
	err = prot.IOLoop(clientConn)
	if err != nil {
		p.ctx.nsqd.logf("ERROR: client(%s) - %s", clientConn.RemoteAddr(), err)
		return
	}
}
```
IOLoop's handling of the TCP connection:
```go
func (p *protocolV2) IOLoop(conn net.Conn) error {
	var err error
	var line []byte
	var zeroTime time.Time

	clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1)
	client := newClientV2(clientID, conn, p.ctx)

	// synchronize the startup of messagePump in order
	// to guarantee that it gets a chance to initialize
	// goroutine local state derived from client attributes
	// and avoid a potential race with IDENTIFY (where a client
	// could have changed or disabled said attributes)
	messagePumpStartedChan := make(chan bool)
	go p.messagePump(client, messagePumpStartedChan)
	<-messagePumpStartedChan

	for {
		if client.HeartbeatInterval > 0 {
			client.SetReadDeadline(time.Now().Add(client.HeartbeatInterval * 2))
		} else {
			client.SetReadDeadline(zeroTime)
		}

		// ReadSlice does not allocate new space for the data each request
		// ie. the returned slice is only valid until the next call to it
		line, err = client.Reader.ReadSlice('\n')
		if err != nil {
			if err == io.EOF {
				err = nil
			} else {
				err = fmt.Errorf("failed to read command - %s", err)
			}
			break
		}

		// trim the '\n'
		line = line[:len(line)-1]
		// optionally trim the '\r'
		if len(line) > 0 && line[len(line)-1] == '\r' {
			line = line[:len(line)-1]
		}
		// ztd: commands are separated by spaces
		params := bytes.Split(line, separatorBytes)

		if p.ctx.nsqd.getOpts().Verbose {
			p.ctx.nsqd.logf("PROTOCOL(V2): [%s] %s", client, params)
		}

		var response []byte
		// ztd: execute the command; different commands run different handlers,
		// discussed later against a typical client
		response, err = p.Exec(client, params)
		if err != nil {
			ctx := ""
			if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil {
				ctx = " - " + parentErr.Error()
			}
			p.ctx.nsqd.logf("ERROR: [%s] - %s%s", client, err, ctx)

			sendErr := p.Send(client, frameTypeError, []byte(err.Error()))
			if sendErr != nil {
				p.ctx.nsqd.logf("ERROR: [%s] - %s%s", client, sendErr, ctx)
				break
			}

			// errors of type FatalClientErr should forceably close the connection
			if _, ok := err.(*protocol.FatalClientErr); ok {
				break
			}
			continue
		}

		if response != nil {
			err = p.Send(client, frameTypeResponse, response)
			if err != nil {
				err = fmt.Errorf("failed to send response - %s", err)
				break
			}
		}
	}

	// ztd: EOF means the client closed the connection
	p.ctx.nsqd.logf("PROTOCOL(V2): [%s] exiting ioloop", client)
	conn.Close()
	close(client.ExitChan)
	if client.Channel != nil {
		client.Channel.RemoveClient(client.ID)
	}

	return err
}
```
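The ztd comment above points at `p.Exec`, which fans each command out to its own handler (SUB, PUB, FIN, RDY, and so on). To give a feel for the shape of that dispatch, here is a tiny self-contained sketch in the same spirit; the command set and the canned responses are simplified stand-ins, not nsqd's actual Exec:

```go
package main

import (
	"bytes"
	"fmt"
)

// exec is a sketch of the dispatch pattern used by protocolV2.Exec: the first
// whitespace-separated token of each line selects the handler. The real nsqd
// handlers take a clientV2 and return framed responses.
func exec(params [][]byte) ([]byte, error) {
	switch {
	case bytes.Equal(params[0], []byte("SUB")):
		return []byte("OK"), nil // real nsqd: p.SUB(client, params)
	case bytes.Equal(params[0], []byte("PUB")):
		return []byte("OK"), nil // real nsqd: p.PUB(client, params)
	case bytes.Equal(params[0], []byte("FIN")):
		return nil, nil // real nsqd: p.FIN(client, params)
	case bytes.Equal(params[0], []byte("NOP")):
		return nil, nil
	}
	return nil, fmt.Errorf("E_INVALID invalid command %s", params[0])
}

func main() {
	line := []byte("SUB nsq consumer") // what IOLoop reads, minus the trailing '\n'
	resp, err := exec(bytes.Split(line, []byte(" ")))
	fmt.Println(string(resp), err)
}
```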
Following the consumer example on the official site, I wrote a simple client whose only job is to subscribe to a topic and channel and print every message a producer sends to that channel. Hopefully walking through the interaction will give us a better understanding of the nsqd server. Here it is:
```go
package main

import (
	"fmt"

	nsq "github.com/nsqio/go-nsq"
)

func main() {
	config := nsq.NewConfig()
	c, err := nsq.NewConsumer("nsq", "consumer", config)
	if err != nil {
		fmt.Println("Failed to init consumer: ", err.Error())
		return
	}

	c.AddHandler(nsq.HandlerFunc(func(m *nsq.Message) error {
		fmt.Println("received message: ", string(m.Body))
		m.Finish()
		return nil
	}))

	err = c.ConnectToNSQD("127.0.0.1:4150")
	if err != nil {
		fmt.Println("Failed to connect to nsqd: ", err.Error())
		return
	}

	<-c.StopChan
}
```
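To drive this consumer while experimenting, the publishing side can be just as small. Below is a sketch using go-nsq's Producer (NewProducer and Publish); the address and topic simply mirror the consumer above:

```go
package main

import (
	"fmt"

	nsq "github.com/nsqio/go-nsq"
)

func main() {
	// Sketch: publish one message to the same topic the consumer above subscribes to.
	p, err := nsq.NewProducer("127.0.0.1:4150", nsq.NewConfig())
	if err != nil {
		fmt.Println("Failed to init producer: ", err.Error())
		return
	}
	defer p.Stop()

	if err := p.Publish("nsq", []byte("hello nsq")); err != nil {
		fmt.Println("Failed to publish: ", err.Error())
	}
}
```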
During ConnectToNSQD there are two interactions with the server. The first:
```go
resp, err := conn.Connect()
```
Inside Connect:
```go
conn, err := dialer.Dial("tcp", c.addr)
if err != nil {
	return nil, err
}

c.conn = conn.(*net.TCPConn)
c.r = conn
c.w = conn

_, err = c.Write(MagicV2)
```
Once the TCP connection is established, the client sends the protocol version to the server, exactly as we saw in the tcp handler: every new connection starts by sending the protocol magic.
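The magic is nothing more than four bytes: two spaces followed by `V2`. As a sketch (assuming nsqd is listening on its default TCP port 4150), the handshake can be reproduced with just the standard library, without go-nsq:

```go
package main

import (
	"fmt"
	"net"
)

func main() {
	// Sketch: speak the first step of the nsqd TCP protocol by hand.
	conn, err := net.Dial("tcp", "127.0.0.1:4150")
	if err != nil {
		fmt.Println("dial failed:", err)
		return
	}
	defer conn.Close()

	// The 4-byte protocol magic: two spaces followed by "V2".
	if _, err := conn.Write([]byte("  V2")); err != nil {
		fmt.Println("write magic failed:", err)
		return
	}
	fmt.Println("magic sent; nsqd will now expect V2 commands on this connection")
}
```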
The second interaction:
```go
cmd := Subscribe(r.topic, r.channel)
err = conn.WriteCommand(cmd)
```
The client sends the server a command of the form "SUB topic channel".
Now let's look at how the server handles this command.
In the method `func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error)`, skipping over the various validation checks:
```go
topic := p.ctx.nsqd.GetTopic(topicName)
channel := topic.GetChannel(channelName)
channel.AddClient(client.ID, client)

atomic.StoreInt32(&client.State, stateSubscribed)
client.Channel = channel
// update message pump
client.SubEventChan <- channel
```
Besides adding the client to the channel and assigning the channel to the client, this also notifies the message pump. Now is a good time to look at what this message pump actually does.
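Before reading messagePump itself, it helps to see the pattern that `client.SubEventChan <- channel` relies on. The sketch below uses hypothetical names, not nsqd code: the pump selects on a subscription-event channel, and after the first event it sets that channel to nil, which silences that select case forever; that is exactly how messagePump implements "you can't SUB anymore".

```go
package main

import "fmt"

// pump is a stripped-down sketch of the SubEventChan pattern: it owns no
// message channel until a subscription event arrives; after the first event
// it sets the event channel to nil so that case can never fire again.
func pump(subEvent chan chan string, deliveries int) {
	var msgs chan string // nil until subscribed; receiving from a nil channel blocks forever
	for deliveries > 0 {
		select {
		case ch := <-subEvent:
			msgs = ch
			subEvent = nil // "you can't SUB anymore"
		case m := <-msgs:
			fmt.Println("deliver:", m)
			deliveries--
		}
	}
}

func main() {
	subEvent := make(chan chan string, 1)
	ch := make(chan string, 1)

	subEvent <- ch      // what SUB does via client.SubEventChan
	ch <- "hello topic" // stands in for a message arriving on the channel's memoryMsgChan
	pump(subEvent, 1)   // run the pump inline for the sketch; nsqd runs it as a goroutine
}
```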
In the tcp handler, there is this code:
```go
messagePumpStartedChan := make(chan bool)
go p.messagePump(client, messagePumpStartedChan)
<-messagePumpStartedChan
```
Stepping into protocol_v2's messagePump:
```go
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
	var err error
	var buf bytes.Buffer
	var memoryMsgChan chan *Message
	var backendMsgChan chan []byte
	var subChannel *Channel
	// NOTE: `flusherChan` is used to bound message latency for
	// the pathological case of a channel on a low volume topic
	// with >1 clients having >1 RDY counts
	var flusherChan <-chan time.Time
	var sampleRate int32

	subEventChan := client.SubEventChan
	identifyEventChan := client.IdentifyEventChan
	outputBufferTicker := time.NewTicker(client.OutputBufferTimeout)
	heartbeatTicker := time.NewTicker(client.HeartbeatInterval)
	heartbeatChan := heartbeatTicker.C
	msgTimeout := client.MsgTimeout

	// v2 opportunistically buffers data to clients to reduce write system calls
	// we force flush in two cases:
	//    1. when the client is not ready to receive messages
	//    2. we're buffered and the channel has nothing left to send us
	//       (ie. we would block in this loop anyway)
	//
	flushed := true

	// signal to the goroutine that started the messagePump
	// that we've started up
	close(startedChan)

	for {
		if subChannel == nil || !client.IsReadyForMessages() {
			// the client is not ready to receive messages...
			memoryMsgChan = nil
			backendMsgChan = nil
			flusherChan = nil
			// force flush
			client.writeLock.Lock()
			err = client.Flush()
			client.writeLock.Unlock()
			if err != nil {
				goto exit
			}
			flushed = true
			// ztd: as soon as subChannel is non-nil, memoryMsgChan and
			// backendMsgChan get assigned below
		} else if flushed {
			// last iteration we flushed...
			// do not select on the flusher ticker channel
			memoryMsgChan = subChannel.memoryMsgChan
			backendMsgChan = subChannel.backend.ReadChan()
			flusherChan = nil
		} else {
			// we're buffered (if there isn't any more data we should flush)...
			// select on the flusher ticker channel, too
			memoryMsgChan = subChannel.memoryMsgChan
			backendMsgChan = subChannel.backend.ReadChan()
			flusherChan = outputBufferTicker.C
		}

		select {
		case <-flusherChan:
			// if this case wins, we're either starved
			// or we won the race between other channels...
			// in either case, force flush
			client.writeLock.Lock()
			err = client.Flush()
			client.writeLock.Unlock()
			if err != nil {
				goto exit
			}
			flushed = true
		case <-client.ReadyStateChan:
		// ztd: the `client.SubEventChan <- channel` line in SUB is what
		// gives subChannel its value here
		case subChannel = <-subEventChan:
			// you can't SUB anymore
			subEventChan = nil
		case identifyData := <-identifyEventChan:
			// you can't IDENTIFY anymore
			identifyEventChan = nil

			outputBufferTicker.Stop()
			if identifyData.OutputBufferTimeout > 0 {
				outputBufferTicker = time.NewTicker(identifyData.OutputBufferTimeout)
			}

			heartbeatTicker.Stop()
			heartbeatChan = nil
			if identifyData.HeartbeatInterval > 0 {
				heartbeatTicker = time.NewTicker(identifyData.HeartbeatInterval)
				heartbeatChan = heartbeatTicker.C
			}

			if identifyData.SampleRate > 0 {
				sampleRate = identifyData.SampleRate
			}

			msgTimeout = identifyData.MsgTimeout
		case <-heartbeatChan:
			err = p.Send(client, frameTypeResponse, heartbeatBytes)
			if err != nil {
				goto exit
			}
		case b := <-backendMsgChan:
			if sampleRate > 0 && rand.Int31n(100) > sampleRate {
				continue
			}

			msg, err := decodeMessage(b)
			if err != nil {
				p.ctx.nsqd.logf("ERROR: failed to decode message - %s", err)
				continue
			}
			msg.Attempts++

			subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
			client.SendingMessage()
			err = p.SendMessage(client, msg, &buf)
			if err != nil {
				goto exit
			}
			flushed = false
		case msg := <-memoryMsgChan:
			if sampleRate > 0 && rand.Int31n(100) > sampleRate {
				continue
			}
			msg.Attempts++

			subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
			client.SendingMessage()
			err = p.SendMessage(client, msg, &buf)
			if err != nil {
				goto exit
			}
			flushed = false
		case <-client.ExitChan:
			goto exit
		}
	}

exit:
	p.ctx.nsqd.logf("PROTOCOL(V2): [%s] exiting messagePump", client)
	heartbeatTicker.Stop()
	outputBufferTicker.Stop()
	if err != nil {
		p.ctx.nsqd.logf("PROTOCOL(V2): [%s] messagePump error - %s", client, err)
	}
}
```

In this code, as soon as a client subscribes to a channel, the pump starts listening on memoryMsgChan, waiting for producers to send messages. Next, let's look at the PUB path:
```go
topic := p.ctx.nsqd.GetTopic(topicName)
msg := NewMessage(topic.GenerateID(), messageBody)
err = topic.PutMessage(msg)
if err != nil {
	return nil, protocol.NewFatalClientErr(err, "E_PUB_FAILED", "PUB failed "+err.Error())
}
```
After a series of checks, PUB puts a message onto the topic. Every time a new topic is constructed, a topic-level messagePump is started:
```go
// Topic constructor
func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic {
	t := &Topic{
		name:              topicName,
		channelMap:        make(map[string]*Channel),
		memoryMsgChan:     make(chan *Message, ctx.nsqd.getOpts().MemQueueSize),
		exitChan:          make(chan int),
		channelUpdateChan: make(chan int),
		ctx:               ctx,
		pauseChan:         make(chan bool),
		deleteCallback:    deleteCallback,
		idFactory:         NewGUIDFactory(ctx.nsqd.getOpts().ID),
	}

	if strings.HasSuffix(topicName, "#ephemeral") {
		t.ephemeral = true
		t.backend = newDummyBackendQueue()
	} else {
		t.backend = diskqueue.New(topicName,
			ctx.nsqd.getOpts().DataPath,
			ctx.nsqd.getOpts().MaxBytesPerFile,
			int32(minValidMsgLength),
			int32(ctx.nsqd.getOpts().MaxMsgSize)+minValidMsgLength,
			ctx.nsqd.getOpts().SyncEvery,
			ctx.nsqd.getOpts().SyncTimeout,
			ctx.nsqd.getOpts().Logger)
	}

	t.waitGroup.Wrap(func() { t.messagePump() })

	t.ctx.nsqd.Notify(t)

	return t
}
```
In that messagePump, the topic's memoryMsgChan is monitored:
```go
for {
	select {
	case msg = <-memoryMsgChan:
```
And every time a message is received, it is broadcast to all of the topic's channels:
```go
for i, channel := range chans {
	chanMsg := msg
	// copy the message because each channel
	// needs a unique instance but...
	// fastpath to avoid copy if its the first channel
	// (the topic already created the first copy)
	if i > 0 {
		chanMsg = NewMessage(msg.ID, msg.Body)
		chanMsg.Timestamp = msg.Timestamp
		chanMsg.deferred = msg.deferred
	}
	if chanMsg.deferred != 0 {
		channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
		continue
	}
	err := channel.PutMessage(chanMsg)
	if err != nil {
		t.ctx.nsqd.logf(
			"TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
			t.name, msg.ID, channel.name, err)
	}
}
```
If you know nsq, you know that every topic broadcasts each msg to all of its channels; this is where that logic is implemented. The channel's PutMessage:
```go
func (c *Channel) put(m *Message) error {
	select {
	case c.memoryMsgChan <- m:
```
This pushes the message into the channel's memoryMsgChan. At this point the code comes back around to protocol_v2's messagePump:
```go
case msg := <-memoryMsgChan:
	if sampleRate > 0 && rand.Int31n(100) > sampleRate {
		continue
	}
	msg.Attempts++

	// ztd: for reliability, sending the message to the subscriber does not
	// actually delete it; instead it is kept in flight with an expiry time
	subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
	client.SendingMessage()
	// ztd: send the message to the client
	err = p.SendMessage(client, msg, &buf)
	if err != nil {
		goto exit
	}
	flushed = false
```
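StartInFlightTimeout is what gives nsqd its at-least-once behavior: before the message goes out on the wire, it is recorded together with a deadline, and only a later FIN (or a timeout) takes it back out. Here is a self-contained sketch of that bookkeeping with made-up names; the real channel keeps a map plus a priority queue, which we get to below:

```go
package main

import (
	"fmt"
	"time"
)

type message struct {
	id   string
	body string
}

// inFlight is a sketch of the bookkeeping behind StartInFlightTimeout and
// FinishMessage (hypothetical structure; nsqd additionally keeps a priority
// queue so it never scans messages that cannot have expired yet).
type inFlight struct {
	msgs      map[string]message
	deadlines map[string]time.Time
}

func newInFlight() *inFlight {
	return &inFlight{msgs: map[string]message{}, deadlines: map[string]time.Time{}}
}

// start records the message before it is sent to the client.
func (f *inFlight) start(m message, timeout time.Duration) {
	f.msgs[m.id] = m
	f.deadlines[m.id] = time.Now().Add(timeout)
}

// finish is what a FIN command ultimately does: drop the message for good.
func (f *inFlight) finish(id string) {
	delete(f.msgs, id)
	delete(f.deadlines, id)
}

// expired returns messages whose deadline has passed; nsqd would requeue these.
func (f *inFlight) expired(now time.Time) []message {
	var out []message
	for id, d := range f.deadlines {
		if now.After(d) {
			out = append(out, f.msgs[id])
			f.finish(id)
		}
	}
	return out
}

func main() {
	f := newInFlight()
	f.start(message{id: "1", body: "finished in time"}, time.Minute)
	f.start(message{id: "2", body: "never finished"}, -time.Millisecond)

	f.finish("1") // the client sent FIN for message 1
	fmt.Println("timed out:", f.expired(time.Now()))
}
```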
Notice the line `m.Finish()` in the client code: it tells the server that this message has been consumed and can be discarded. Under the hood it sends a `FIN` command to the server. On the server side:
```go
// FinishMessage successfully discards an in-flight message
func (c *Channel) FinishMessage(clientID int64, id MessageID) error {
	msg, err := c.popInFlightMessage(clientID, id)
	if err != nil {
		return err
	}
	c.removeFromInFlightPQ(msg)
	if c.e2eProcessingLatencyStream != nil {
		c.e2eProcessingLatencyStream.Insert(msg.Timestamp)
	}
	return nil
}
```
Back in StartInFlightTimeout, the author added the msg to two queues: one is the in-flight message queue (its data structure is actually a map), the other is the inFlightPQ; the role of the inFlightPQ is covered below. But if a message is never Finished in time, how does the server handle the timeout? In NSQD's Main function, a queueScanLoop is started:
```go
n.waitGroup.Wrap(func() { n.queueScanLoop() })
```
In this loop, tickers are set up, and at a fixed interval resizePool is executed:
```go
func (n *NSQD) queueScanLoop() {
	workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount)
	responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount)
	closeCh := make(chan int)

	workTicker := time.NewTicker(n.getOpts().QueueScanInterval)
	refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)

	channels := n.channels()
	n.resizePool(len(channels), workCh, responseCh, closeCh)

	for {
		select {
		case <-workTicker.C:
			if len(channels) == 0 {
				continue
			}
		case <-refreshTicker.C:
			channels = n.channels()
			n.resizePool(len(channels), workCh, responseCh, closeCh)
			continue
		case <-n.exitChan:
			goto exit
		}
```
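resizePool does not scan channels itself; it grows or shrinks a pool of queueScanWorker goroutines that receive channels on workCh and report whether a channel was "dirty" on responseCh (if I recall correctly, the real nsqd targets a pool size of roughly a quarter of the channel count). The sketch below shows only the resize pattern, with simplified names and no dirty-percentage logic:

```go
package main

import "fmt"

// worker is a sketch in the spirit of nsqd's queueScanWorker: it receives
// channel names on workCh; a send on closeCh retires one worker.
func worker(id int, workCh chan string, closeCh chan struct{}) {
	for {
		select {
		case ch := <-workCh:
			fmt.Printf("worker %d scanning channel %s\n", id, ch)
		case <-closeCh:
			return
		}
	}
}

// resizePool grows or shrinks the pool until it has exactly target workers.
func resizePool(target int, poolSize *int, workCh chan string, closeCh chan struct{}) {
	for *poolSize < target { // grow: start another worker
		*poolSize++
		go worker(*poolSize, workCh, closeCh)
	}
	for *poolSize > target { // shrink: ask one worker to exit
		closeCh <- struct{}{}
		*poolSize--
	}
}

func main() {
	workCh := make(chan string)
	closeCh := make(chan struct{})
	poolSize := 0

	resizePool(2, &poolSize, workCh, closeCh) // scale up to two workers
	workCh <- "orders#consumer"               // hand channels to whichever worker is free
	workCh <- "orders#audit"
	resizePool(0, &poolSize, workCh, closeCh) // scale back down; both workers exit
}
```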
The scan workers that resizePool starts end up running this function on each channel:
```go
func (c *Channel) processInFlightQueue(t int64) bool {
	c.exitMutex.RLock()
	defer c.exitMutex.RUnlock()

	if c.Exiting() {
		return false
	}

	dirty := false
	for {
		c.inFlightMutex.Lock()
		// ztd: take one expired message off the queue
		msg, _ := c.inFlightPQ.PeekAndShift(t)
		c.inFlightMutex.Unlock()

		if msg == nil {
			goto exit
		}
		dirty = true

		_, err := c.popInFlightMessage(msg.clientID, msg.ID)
		if err != nil {
			goto exit
		}
		atomic.AddUint64(&c.timeoutCount, 1)
		c.RLock()
		client, ok := c.clients[msg.clientID]
		c.RUnlock()
		if ok {
			client.TimedOutMessage()
		}
		c.put(msg)
	}

exit:
	return dirty
}
```
This function repeatedly takes an expired message off the inFlightPqueue and removes it from the in-flight message map. PeekAndShift:
```go
func (pq *inFlightPqueue) PeekAndShift(max int64) (*Message, int64) {
	if len(*pq) == 0 {
		return nil, 0
	}

	x := (*pq)[0]
	if x.pri > max {
		return nil, x.pri - max
	}
	pq.Pop()

	return x, 0
}
```
The inFlightPqueue is a min-heap on `pri`. Each time a new message is pushed, it sifts up:
```go
func (pq *inFlightPqueue) up(j int) {
	for {
		i := (j - 1) / 2 // parent
		if i == j || (*pq)[j].pri >= (*pq)[i].pri {
			break
		}
		pq.Swap(i, j)
		j = i
	}
}
```
So the message at the top of the heap is the one that expires soonest; if even that message has not expired yet, nothing in the queue has. If it has expired, it is popped. The for loop keeps popping expired messages this way until none are left.
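To make the PeekAndShift contract concrete, here is a small self-contained example. It is built on container/heap rather than nsqd's hand-rolled inFlightPqueue, but it uses the same min-heap-on-pri idea: the root is always the earliest deadline, and popping stops as soon as the root is still in the future.

```go
package main

import (
	"container/heap"
	"fmt"
)

// item mirrors the relevant part of nsqd's Message: pri is the expiry time.
type item struct {
	id  string
	pri int64
}

type pqueue []*item

func (pq pqueue) Len() int            { return len(pq) }
func (pq pqueue) Less(i, j int) bool  { return pq[i].pri < pq[j].pri }
func (pq pqueue) Swap(i, j int)       { pq[i], pq[j] = pq[j], pq[i] }
func (pq *pqueue) Push(x interface{}) { *pq = append(*pq, x.(*item)) }
func (pq *pqueue) Pop() interface{} {
	old := *pq
	n := len(old)
	it := old[n-1]
	*pq = old[:n-1]
	return it
}

// peekAndShift follows the same contract as inFlightPqueue.PeekAndShift:
// return the root if it has expired (pri <= max), otherwise report how long
// until the next expiry and leave the queue untouched.
func peekAndShift(pq *pqueue, max int64) (*item, int64) {
	if pq.Len() == 0 {
		return nil, 0
	}
	root := (*pq)[0]
	if root.pri > max {
		return nil, root.pri - max
	}
	heap.Pop(pq)
	return root, 0
}

func main() {
	pq := &pqueue{}
	heap.Init(pq)
	heap.Push(pq, &item{id: "a", pri: 100}) // expires at t=100
	heap.Push(pq, &item{id: "b", pri: 50})  // expires at t=50
	heap.Push(pq, &item{id: "c", pri: 200}) // expires at t=200

	now := int64(120)
	for {
		it, wait := peekAndShift(pq, now)
		if it == nil {
			fmt.Println("nothing else expired; next expiry in", wait)
			break
		}
		fmt.Println("expired:", it.id)
	}
}
```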