在上一篇文章中对 nsq 进行了简单的介绍,并从 nsq 的 golang 客户端代码分析了 nsq 的使用,这篇文章会分析 nsqd 的代码。
nsqd 做了什么
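nsqadmin 是一个简单的管理界面,通过它可以查询 topic、channel、消费者等一些基本信息。nsqadmin 是从 nsqlookup 中获取信息的;通过 nsqadmin 也可以创建 topic、channel,它们会被创建到 nsqlookup 中,由 nsqlookup 在内存中维护着。nsqd 会在某个合适的时刻将这些信息拉回本地,然后在本地创建:例如某个 topic 第一次在 nsqd 中被创建时,nsqd 会向 nsqlookup 查询该 topic 的 channel 并在本地创建它们(见下文的 GetTopic)。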
nsqd 启动
func (n *NSQD) Main() error { ctx := &context{n} exitCh := make(chan error) var once sync.Once exitFunc := func(err error) { once.Do(func() { if err != nil { n.logf(LOG_FATAL, "%s", err) } exitCh <- err }) } n.tcpServer.ctx = ctx // 启动 tcp监听 n.waitGroup.Wrap(func() { exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf)) }) // 启动http监听 httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired) n.waitGroup.Wrap(func() { exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf)) }) if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" { httpsServer := newHTTPServer(ctx, true, true) n.waitGroup.Wrap(func() { exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf)) }) } // 队列扫描,处理超时、延迟等信息 n.waitGroup.Wrap(n.queueScanLoop) // 向nsqlookup注册本身的元数据信息 n.waitGroup.Wrap(n.lookupLoop) if n.getOpts().StatsdAddress != "" { n.waitGroup.Wrap(n.statsdLoop) } err := <-exitCh return err }
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error { logf(lg.INFO, "TCP: listening on %s", listener.Addr()) var wg sync.WaitGroup for { //等待请求的到来 clientConn, err := listener.Accept() if err != nil { if nerr, ok := err.(net.Error); ok && nerr.Temporary() { logf(lg.WARN, "temporary Accept() failure - %s", err) runtime.Gosched() continue } // theres no direct way to detect this error because it is not exposed if !strings.Contains(err.Error(), "use of closed network connection") { return fmt.Errorf("listener.Accept() error - %s", err) } break } wg.Add(1) // 每当到来一个请求都启动一个goroutine进行处理 go func() { handler.Handle(clientConn) wg.Done() }() } // wait to return until all handler goroutines complete wg.Wait() logf(lg.INFO, "TCP: closing %s", listener.Addr()) return nil }
unc (p *tcpServer) Handle(clientConn net.Conn) { p.ctx.nsqd.logf(LOG_INFO, "TCP: new client(%s)", clientConn.RemoteAddr()) // The client should initialize itself by sending a 4 byte sequence indicating // the version of the protocol that it intends to communicate, this will allow us // to gracefully upgrade the protocol away from text/line oriented to whatever... buf := make([]byte, 4) _, err := io.ReadFull(clientConn, buf) if err != nil { p.ctx.nsqd.logf(LOG_ERROR, "failed to read protocol version - %s", err) clientConn.Close() return } //协商协议版本 protocolMagic := string(buf) p.ctx.nsqd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'", clientConn.RemoteAddr(), protocolMagic) var prot protocol.Protocol switch protocolMagic { case " V2": prot = &protocolV2{ctx: p.ctx} default: protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL")) clientConn.Close() p.ctx.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'", clientConn.RemoteAddr(), protocolMagic) return } p.conns.Store(clientConn.RemoteAddr(), clientConn) // 开始一个死循环 err = prot.IOLoop(clientConn) if err != nil { p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err) } p.conns.Delete(clientConn.RemoteAddr()) }
func (p *protocolV2) IOLoop(conn net.Conn) error { var err error var line []byte var zeroTime time.Time clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1) client := newClientV2(clientID, conn, p.ctx) p.ctx.nsqd.AddClient(client.ID, client) // synchronize the startup of messagePump in order // to guarantee that it gets a chance to initialize // goroutine local state derived from client attributes // and avoid a potential race with IDENTIFY (where a client // could have changed or disabled said attributes) messagePumpStartedChan := make(chan bool) go p.messagePump(client, messagePumpStartedChan) // 消息分发,向消费者发送消息 <-messagePumpStartedChan for { // 设置socket读取超时,若是consumer未在指定的时间内发送过来,那么会断开链接,致使consumer退出 if client.HeartbeatInterval > 0 { client.SetReadDeadline(time.Now().Add(client.HeartbeatInterval * 2)) } else { client.SetReadDeadline(zeroTime) } // ReadSlice does not allocate new space for the data each request // ie. the returned slice is only valid until the next call to it //读取生产者或者消费者发送过来的请求 line, err = client.Reader.ReadSlice('\n') if err != nil { if err == io.EOF { err = nil } else { err = fmt.Errorf("failed to read command - %s", err) } break } // trim the '\n' line = line[:len(line)-1] // optionally trim the '\r' if len(line) > 0 && line[len(line)-1] == '\r' { line = line[:len(line)-1] } params := bytes.Split(line, separatorBytes) p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %s", client, params) var response []byte // 根据不一样的命令执行不一样的动做 response, err = p.Exec(client, params) if err != nil { ctx := "" if parentErr := err.(protocol.ChildErr).Parent(); parentErr != nil { ctx = " - " + parentErr.Error() } p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, err, ctx) sendErr := p.Send(client, frameTypeError, []byte(err.Error())) if sendErr != nil { p.ctx.nsqd.logf(LOG_ERROR, "[%s] - %s%s", client, sendErr, ctx) break } // errors of type FatalClientErr should forceably close the connection if _, ok := err.(*protocol.FatalClientErr); ok { break } continue } 
if response != nil { err = p.Send(client, frameTypeResponse, response) if err != nil { err = fmt.Errorf("failed to send response - %s", err) break } } } p.ctx.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting ioloop", client) conn.Close() close(client.ExitChan) if client.Channel != nil { client.Channel.RemoveClient(client.ID) } p.ctx.nsqd.RemoveClient(client.ID) return err }
在继续向下看之前,先看一下生产者的 PUB 请求在 nsqd 中做了什么。
func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) { var err error if len(params) < 2 { return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "PUB insufficient number of parameters") } topicName := string(params[1]) if !protocol.IsValidTopicName(topicName) { return nil, protocol.NewFatalClientErr(nil, "E_BAD_TOPIC", fmt.Sprintf("PUB topic name %q is not valid", topicName)) } bodyLen, err := readLen(client.Reader, client.lenSlice) if err != nil { return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "PUB failed to read message body size") } if bodyLen <= 0 { return nil, protocol.NewFatalClientErr(nil, "E_BAD_MESSAGE", fmt.Sprintf("PUB invalid message body size %d", bodyLen)) } if int64(bodyLen) > p.ctx.nsqd.getOpts().MaxMsgSize { return nil, protocol.NewFatalClientErr(nil, "E_BAD_MESSAGE", fmt.Sprintf("PUB message too big %d > %d", bodyLen, p.ctx.nsqd.getOpts().MaxMsgSize)) } messageBody := make([]byte, bodyLen) _, err = io.ReadFull(client.Reader, messageBody) if err != nil { return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "PUB failed to read message body") } if err := p.CheckAuth(client, "PUB", topicName, ""); err != nil { return nil, err } // topic 在nsqd中的建立的lazy create,只有当某个生产者向该topic中发送消息时才会建立topic, topic := p.ctx.nsqd.GetTopic(topicName) msg := NewMessage(topic.GenerateID(), messageBody) err = topic.PutMessage(msg) if err != nil { return nil, protocol.NewFatalClientErr(err, "E_PUB_FAILED", "PUB failed "+err.Error()) } client.PublishedMessage(topicName, 1) return okBytes, nil }
/ GetTopic performs a thread safe operation // to return a pointer to a Topic object (potentially new) func (n *NSQD) GetTopic(topicName string) *Topic { // most likely, we already have this topic, so try read lock first. n.RLock() // 当topic在nsqd中建立过期就直接返回该topic t, ok := n.topicMap[topicName] n.RUnlock() if ok { return t } n.Lock() t, ok = n.topicMap[topicName] if ok { n.Unlock() return t } deleteCallback := func(t *Topic) { n.DeleteExistingTopic(t.name) } //稍后看一下这个函数 t = NewTopic(topicName, &context{n}, deleteCallback) n.topicMap[topicName] = t n.Unlock() n.logf(LOG_INFO, "TOPIC(%s): created", t.name) // topic is created but messagePump not yet started // if loading metadata at startup, no lookupd connections yet, topic started after load if atomic.LoadInt32(&n.isLoading) == 1 { return t } // if using lookupd, make a blocking call to get the topics, and immediately create them. // this makes sure that any message received is buffered to the right channels //若是使用了nsqlookup,那么从nsqlookup中查询该topic的channel信息,若是没有在nsqd中建立就建立出来 lookupdHTTPAddrs := n.lookupdHTTPAddrs() if len(lookupdHTTPAddrs) > 0 { channelNames, err := n.ci.GetLookupdTopicChannels(t.name, lookupdHTTPAddrs) if err != nil { n.logf(LOG_WARN, "failed to query nsqlookupd for channels to pre-create for topic %s - %s", t.name, err) } for _, channelName := range channelNames { if strings.HasSuffix(channelName, "#ephemeral") { continue // do not create ephemeral channel with no consumer client } t.GetChannel(channelName) } } else if len(n.getOpts().NSQLookupdTCPAddresses) > 0 { n.logf(LOG_ERROR, "no available nsqlookupd to query for channels to pre-create for topic %s", t.name) } // now that all channels are added, start topic messagePump t.Start() return t }
// Topic constructor func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic { t := &Topic{ name: topicName, channelMap: make(map[string]*Channel), memoryMsgChan: nil, startChan: make(chan int, 1), exitChan: make(chan int), channelUpdateChan: make(chan int), ctx: ctx, paused: 0, pauseChan: make(chan int), deleteCallback: deleteCallback, idFactory: NewGUIDFactory(ctx.nsqd.getOpts().ID), } // create mem-queue only if size > 0 (do not use unbuffered chan) if ctx.nsqd.getOpts().MemQueueSize > 0 { t.memoryMsgChan = make(chan *Message, ctx.nsqd.getOpts().MemQueueSize) } if strings.HasSuffix(topicName, "#ephemeral") { t.ephemeral = true t.backend = newDummyBackendQueue() } else { dqLogf := func(level diskqueue.LogLevel, f string, args ...interface{}) { opts := ctx.nsqd.getOpts() lg.Logf(opts.Logger, opts.LogLevel, lg.LogLevel(level), f, args...) } //持久化的结构 t.backend = diskqueue.New( topicName, ctx.nsqd.getOpts().DataPath, ctx.nsqd.getOpts().MaxBytesPerFile, int32(minValidMsgLength), int32(ctx.nsqd.getOpts().MaxMsgSize)+minValidMsgLength, ctx.nsqd.getOpts().SyncEvery, ctx.nsqd.getOpts().SyncTimeout, dqLogf, ) } // topic中也启动了一个messagePump,在protocolv2中也启动了一个同名函数,前一个是为了向consumer推送消息,这个是向topic下的一个或者多个队列中发送消息 t.waitGroup.Wrap(t.messagePump) // 通知持久化 t.ctx.nsqd.Notify(t) return t } func (t *Topic) Start() { select { case t.startChan <- 1: default: } }
看一下 nsqd 是如何向 nsqlookup 注册自己的元数据信息的:nsqd 启动时起了一个 goroutine lookupLoop。
func (n *NSQD) lookupLoop() { var lookupPeers []*lookupPeer var lookupAddrs []string connect := true hostname, err := os.Hostname() if err != nil { n.logf(LOG_FATAL, "failed to get hostname - %s", err) os.Exit(1) } // for announcements, lookupd determines the host automatically ticker := time.Tick(15 * time.Second) for { if connect { for _, host := range n.getOpts().NSQLookupdTCPAddresses { if in(host, lookupAddrs) { continue } n.logf(LOG_INFO, "LOOKUP(%s): adding peer", host) lookupPeer := newLookupPeer(host, n.getOpts().MaxBodySize, n.logf, connectCallback(n, hostname)) lookupPeer.Command(nil) // start the connection lookupPeers = append(lookupPeers, lookupPeer) lookupAddrs = append(lookupAddrs, host) } n.lookupPeers.Store(lookupPeers) connect = false } select { case <-ticker: // 向nsqlookup发送心跳信息 // send a heartbeat and read a response (read detects closed conns) for _, lookupPeer := range lookupPeers { n.logf(LOG_DEBUG, "LOOKUPD(%s): sending heartbeat", lookupPeer) cmd := nsq.Ping() _, err := lookupPeer.Command(cmd) if err != nil { n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", lookupPeer, cmd, err) } } case val := <-n.notifyChan: var cmd *nsq.Command var branch string switch val.(type) { // 注册channel case *Channel: // notify all nsqlookupds that a new channel exists, or that it's removed branch = "channel" channel := val.(*Channel) if channel.Exiting() == true { cmd = nsq.UnRegister(channel.topicName, channel.name) } else { cmd = nsq.Register(channel.topicName, channel.name) } // 注册topic case *Topic: // notify all nsqlookupds that a new topic exists, or that it's removed branch = "topic" topic := val.(*Topic) if topic.Exiting() == true { cmd = nsq.UnRegister(topic.name, "") } else { cmd = nsq.Register(topic.name, "") } } for _, lookupPeer := range lookupPeers { n.logf(LOG_INFO, "LOOKUPD(%s): %s %s", lookupPeer, branch, cmd) _, err := lookupPeer.Command(cmd) if err != nil { n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", lookupPeer, cmd, err) } } case 
<-n.optsNotificationChan: var tmpPeers []*lookupPeer var tmpAddrs []string for _, lp := range lookupPeers { if in(lp.addr, n.getOpts().NSQLookupdTCPAddresses) { tmpPeers = append(tmpPeers, lp) tmpAddrs = append(tmpAddrs, lp.addr) continue } n.logf(LOG_INFO, "LOOKUP(%s): removing peer", lp) lp.Close() } lookupPeers = tmpPeers lookupAddrs = tmpAddrs connect = true case <-n.exitChan: goto exit } } exit: n.logf(LOG_INFO, "LOOKUP: closing") }
nsqd 在启动 lookupLoop 这个 goroutine 的同时还启动了另一个 goroutine queueScanLoop,主要用来监控并处理超时(以及延迟)的消息。
总结一下
注意,consumer 消费消息是有超时配置的,消费者对每一条消息的处理都要在超时时间内完成,要不然会导致一些问题:超时的消息会被 nsqd 重新入队并再次投递,从而被重复消费。