1: 启动类: main.goios
利用:flag.NewFlagSet 解析传递的参数tcp
'注册系统的信号量oop
exitChan := make(chan int)spa
signalChan := make(chan os.Signal, 1)code
go func() {server
<-signalChanit
exitChan <- 1io
}()cli
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)List
调用optios.go 作默认的参数 NewNSQDOptions
启动server的服务: nsqd := NewNSQD(opts)
加载硬盘上的数据:nsqd.LoadMetadata()
err := nsqd.PersistMetadata()
nsqd.Main()
执行lookup 服务的初始化工做
n.waitGroup.Wrap(func() { n.lookupLoop() })
n.waitGroup.Wrap(func() { util.TCPServer(n.tcpListener, tcpServer) }) 开启tcp的监听服务
n.waitGroup.Wrap(func() { util.HTTPServer(n.httpListener, httpServer) }) 开始http的监听服务
main.go 的流程走完
接下来看看有客户端链接之后的服务:
tcp.go
(p *tcpServer) Handle(clientConn net.Conn)
一系列的校验。。。
调用:ProtocolV2. IOLoop
client := NewClientV2(clientID, conn, p.context) 初始化客户端
go p.messagePump(client) 客户select服务
response, err := p.Exec(client, params) 执行具体的客户端操做 func (p *ProtocolV2) Exec(client *ClientV2, params [][]byte) ([]byte, error) { switch { case bytes.Equal(params[0], []byte("FIN")): return p.FIN(client, params) case bytes.Equal(params[0], []byte("RDY")): return p.RDY(client, params) case bytes.Equal(params[0], []byte("REQ")): return p.REQ(client, params) case bytes.Equal(params[0], []byte("PUB")): return p.PUB(client, params) case bytes.Equal(params[0], []byte("MPUB")): return p.MPUB(client, params) case bytes.Equal(params[0], []byte("NOP")): return p.NOP(client, params) case bytes.Equal(params[0], []byte("TOUCH")): return p.TOUCH(client, params) case bytes.Equal(params[0], []byte("IDENTIFY")): return p.IDENTIFY(client, params) case bytes.Equal(params[0], []byte("SUB")): return p.SUB(client, params) case bytes.Equal(params[0], []byte("CLS")): return p.CLS(client, params) } return nil, util.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0])) }