NSQD是 nsq 的主要逻辑部分,请参考官方文档。咱们直接看代码。html
main 函数位于git
github.com/nsqio/nsq/apps/nsqd/nsqd.gogithub
func main() { prg := &program{} if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil { log.Fatal(err) } }
其中svc.Run 是一个service wrapper,来自于go-svc。它传入Service 接口,而且作了四件事情Init, Start, NotifySignal和 Stop:api
// Run runs your Service. // // Run will block until one of the signals specified in sig is received. // If sig is empty syscall.SIGINT and syscall.SIGTERM are used by default. func Run(service Service, sig ...os.Signal) error { env := environment{} if err := service.Init(env); err != nil { return err } if err := service.Start(); err != nil { return err } if len(sig) == 0 { sig = []os.Signal{syscall.SIGINT, syscall.SIGTERM} } signalChan := make(chan os.Signal, 1) signalNotify(signalChan, sig...) <-signalChan return service.Stop() }
让咱们来看下program 对于Service 这个接口的实现网络
func (p *program) Init(env svc.Environment) error { if env.IsWindowsService() { dir := filepath.Dir(os.Args[0]) return os.Chdir(dir) } return nil } func (p *program) Start() error { //ztd: 初始化选项 opts := nsqd.NewOptions() //ztd: 设置接受的参数 flagSet := nsqdFlagSet(opts) flagSet.Parse(os.Args[1:]) rand.Seed(time.Now().UTC().UnixNano()) //ztd: 若是仅仅查询版本信息 if flagSet.Lookup("version").Value.(flag.Getter).Get().(bool) { fmt.Println(version.String("nsqd")) os.Exit(0) } //ztd: 若是指定了conf 文件 var cfg config configFile := flagSet.Lookup("config").Value.String() if configFile != "" { _, err := toml.DecodeFile(configFile, &cfg) if err != nil { log.Fatalf("ERROR: failed to load config file %s - %s", configFile, err.Error()) } } cfg.Validate() //ztd: 合并命令行和配置文件中的配置 options.Resolve(opts, flagSet, cfg) nsqd := nsqd.New(opts) //ztd: metadata 中存储了topic和 channel 信息 err := nsqd.LoadMetadata() if err != nil { log.Fatalf("ERROR: %s", err.Error()) } //ztd: 不是很明白为何读完了立刻又写 err = nsqd.PersistMetadata() if err != nil { log.Fatalf("ERROR: failed to persist metadata - %s", err.Error()) } nsqd.Main() p.nsqd = nsqd return nil } func (p *program) Stop() error { if p.nsqd != nil { p.nsqd.Exit() } return nil }
func (n *NSQD) Main() { var httpListener net.Listener var httpsListener net.Listener ctx := &context{n} //ztd: 监听某一端口 tcpListener, err := net.Listen("tcp", n.getOpts().TCPAddress) if err != nil { n.logf("FATAL: listen (%s) failed - %s", n.getOpts().TCPAddress, err) os.Exit(1) } n.Lock() n.tcpListener = tcpListener n.Unlock() tcpServer := &tcpServer{ctx: ctx} n.waitGroup.Wrap(func() { protocol.TCPServer(n.tcpListener, tcpServer, n.getOpts().Logger) }) //ztd: 启动https 服务 if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" { httpsListener, err = tls.Listen("tcp", n.getOpts().HTTPSAddress, n.tlsConfig) if err != nil { n.logf("FATAL: listen (%s) failed - %s", n.getOpts().HTTPSAddress, err) os.Exit(1) } n.Lock() n.httpsListener = httpsListener n.Unlock() httpsServer := newHTTPServer(ctx, true, true) n.waitGroup.Wrap(func() { http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.getOpts().Logger) }) } //ztd: 启动http 服务 httpListener, err = net.Listen("tcp", n.getOpts().HTTPAddress) if err != nil { n.logf("FATAL: listen (%s) failed - %s", n.getOpts().HTTPAddress, err) os.Exit(1) } n.Lock() n.httpListener = httpListener n.Unlock() httpServer := newHTTPServer(ctx, false, n.getOpts().TLSRequired == TLSRequired) n.waitGroup.Wrap(func() { http_api.Serve(n.httpListener, httpServer, "HTTP", n.getOpts().Logger) }) n.waitGroup.Wrap(func() { n.queueScanLoop() }) n.waitGroup.Wrap(func() { n.lookupLoop() }) if n.getOpts().StatsdAddress != "" { n.waitGroup.Wrap(func() { n.statsdLoop() }) } }
拿出几个片断讨论一下app
n.Lock() n.tcpListener = tcpListener n.Unlock()
开始看到这块有点懵,原来mutex 还能够这么用,先看眼NSQD 的结构tcp
type NSQD struct { //64bit atomic vars need to be first for proper alignment on 32bit platforms clientIDSequence int64 sync.RWMutex ... }
原来NSQD 内嵌了一个sync.RWMtex, 使用的时候直接n.Lock().按照我原来的习惯,都是这样写:函数
var lock sync.Mutex lock.Lock()
NSQD 对sync.WaitGroup 作了巧妙的封装, 使用起来是这个样子:oop
n.waitGroup.Wrap(func() { protocol.TCPServer(n.tcpListener, tcpServer, n.getOpts().Logger) })
来看一眼Wrap的实现:ui
type WaitGroupWrapper struct { sync.WaitGroup } func (w *WaitGroupWrapper) Wrap(cb func()) { w.Add(1) go func() { cb() w.Done() }() }
这段代码看起来比较简单,先给waitgroup +1,起了一个goroutine 运行 callback func,等函数运行结束之后对waitgroup -1. 不过waitgroup 是在哪里wait 的呢?对全局搜了一下代码,发现是在Exit方法里面:
func (n *NSQD) Exit() { if n.tcpListener != nil { n.tcpListener.Close() } if n.httpListener != nil { n.httpListener.Close() } if n.httpsListener != nil { n.httpsListener.Close() } n.Lock() err := n.PersistMetadata() if err != nil { n.logf("ERROR: failed to persist metadata - %s", err) } n.logf("NSQ: closing topics") for _, topic := range n.topicMap { topic.Close() } n.Unlock() close(n.exitChan) n.waitGroup.Wait() n.dl.Unlock() }
Exit 方法关闭了各类资源以后调用了n.waitGroup.Wait(),等待全部资源释放完毕.
回到刚才这一句 protocol.TCPServer(n.tcpListener, tcpServer, n.getOpts().Logger)
func TCPServer(listener net.Listener, handler TCPHandler, l app.Logger) { l.Output(2, fmt.Sprintf("TCP: listening on %s", listener.Addr())) for { clientConn, err := listener.Accept() if err != nil { if nerr, ok := err.(net.Error); ok && nerr.Temporary() { l.Output(2, fmt.Sprintf("NOTICE: temporary Accept() failure - %s", err)) //ztd: 若是发生临时的error让出时间片。猜测这么作的缘由多是当前网络情况很差引发的一 些临时error,若是立刻去accept,会获得另一个error,不如让出时间片,让其余goroutine 作事情。有一点延迟的意思。若是有更好的解释,请纠正我。。。 runtime.Gosched() continue } // theres no direct way to detect this error because it is not exposed if !strings.Contains(err.Error(), "use of closed network connection") { l.Output(2, fmt.Sprintf("ERROR: listener.Accept() - %s", err)) } //ztd: 在上面提到的Exit方法里会关闭listener, break 跳出循环 break } go handler.Handle(clientConn) } l.Output(2, fmt.Sprintf("TCP: closing %s", listener.Addr())) }
在后面的文章中,讲继续阅读tcp handler,http handler,n.queueScanLoop() 和 n.lookupLoop()