nsqlookupd 用于Topic, Channel, Node 三类信息的一致性分发node
功能定位git
实现方式github
程序入口文件: /apps/nsqlookupd/main.go
golang
为了时NSQ 在windows 良好运行,NSQ 使用了 github.com/judwhite/go-svc/svc
包,用于构建一个可实现windows 服务。 能够用windows 的服务管理插件直接管理。web
svc 包使用时,只须要实现 github.com/judwhite/go-svc/svc.Service
的接口便可。接口以下:sql
type Service interface { // Init is called before the program/service is started and after it's // determined if the program is running as a Windows Service. Init(Environment) error // Start is called after Init. This method must be non-blocking. Start() error // Stop is called in response to os.Interrupt, os.Kill, or when a // Windows Service is stopped. Stop() error }
所以,nsqlookup 只须要实现上述三个方法便可:编程
此方法仅针对windows 的服务作了处理。若为windows 服务,则修改当前目录为可执行文件的目录。json
此方法作了nsqlookupd.Exit() 的处理。
此处用到了sync.Once. 即调用的退出程序仅执行一次。windows
Exit
的具体内容为:api
func (l *NSQLookupd) Exit() { if l.tcpListener != nil { l.tcpListener.Close() } if l.httpListener != nil { l.httpListener.Close() } l.waitGroup.Wait() }
NSQ 命令行参数的构造,采用了golang 自带的flag 包。参数保存于Options对象中,采用了先初始化,后赋值的方式,减小了没必要要的条件判断。
能够采用--config 的方式,直接添加配置文件。配置文件采用toml格式.
配置的解析,采用github.com/mreiferson/go-options
实现,优先级由高到低为:
// RegistrationDB 使用读写锁作读写控制。 type RegistrationDB struct { sync.RWMutex registrationMap map[Registration]ProducerMap } type Registration struct { Category string // Category 有三种类型,Topic, Channel, Client. Key string SubKey string } type ProducerMap map[string]*Producer type Producer struct { peerInfo *PeerInfo //客户端的相关信息 tombstoned bool tombstonedAt time.Time } type PeerInfo struct { lastUpdate int64 // 上次更新的时间 id string // 使用ip标识的id RemoteAddress string `json:"remote_address"` Hostname string `json:"hostname"` BroadcastAddress string `json:"broadcast_address"` TCPPort int `json:"tcp_port"` HTTPPort int `json:"http_port"` Version string `json:"version"` }
tcp 消息是 nsqd 与nsqlookupd 沟通的协议。 node 保存的是nsqd 的信息
Tcp Listener 是用来监听客户端发来的TCP 消息。
创建链接后,发送4个byte标识链接的版本号。目前是v1. "__V1" (下划线用空格替代)
消息之间按照换行符\n
分割。
目前客户端支持4类消息:
IDENTIFY\nBODYLEN(32bit)BODY
|8bit |1 bit | 32bit | N bit | |IDENTIFY| 换行 | body 长度 | body |
** http 客户端的定位是用于服务的发现和admin的交互 **
nsq/internal/http_api
包,此包是对golang 中http请求handler 的一次封装:type Decorator func(APIHandler) APIHandler type APIHandler func(http.ResponseWriter, *http.Request, httprouter.Params) (interface{}, error) // f 是业务处理逻辑, ds 能够自定义多个包装器,用于对f 的输入和输出数据作处理。 func Decorate(f APIHandler, ds ...Decorator) httprouter.Handle { decorated := f for _, decorate := range ds { decorated = decorate(decorated) } return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) { decorated(w, req, ps) } } // Decorator 的一个例子,作日志记录的处理 func Log(logf lg.AppLogFunc) Decorator { return func(f APIHandler) APIHandler { return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) { start := time.Now() response, err := f(w, req, ps) elapsed := time.Since(start) status := 200 if e, ok := err.(Err); ok { status = e.Code } logf(lg.INFO, "%d %s %s (%s) %s", status, req.Method, req.URL.RequestURI(), req.RemoteAddr, elapsed) return response, err } } }
这种处理方式相似于大部分web框架HTTP 中间件的处理方式,是利用递归嵌套的方式,保留了处理的上下文, 实现服务切片编程。
http 服务,使用github.com/julienschmidt/httprouter
包实现http 的路由功能。
目前HTTP 客户端支持如下的请求:
Method | Router | Param | Response |
---|---|---|---|
GET | /ping | - | "OK" |
GET | /info | - | 返回版本信息 |
GET | /debug | - | 返回 db 中全部信息 |
GET | /lookup | topic | 返回topic 关联的全部的channels 和 nsqd 服务的信息 |
GET | /topics | - | 返回全部topic 的值 |
GET | /channels | topic | 返回topic 下全部的channels 信息 |
GET | /nodes | - | 返回全部在线的nsqd 的node 信息, node 节点中包含了 topic 的信息,以及是否须要被删除 |
POST | /topic/create | topic | 建立topic <不超过64个字符长度> |
POST | /topic/delete | topic | 删除相应topic 的channel 和topic 信息 |
POST | /channel/create | topic, channel | 建立 channel , 若topic 不存在,建立topic |
POST | /channel/delete | topic, channel | 删除 channel, 支持 * |
POST | /topic/tombstone | topic, node | 将topic 下某个node 设置删除标识 tombstone, 给node 节点 一段空余时间用于删除相关topic 信息,并发送删除topic的命令 |
GET | /debug/pprof | - | pprof 提供的信息 |
GET | /debug/pprof/cmdline | - | pprof 提供的信息 |
GET | /debug/pprof/symbol | - | pprof 提供的信息 |
POST | /debug/pprof/symbol | - | pprof 提供的信息 |
GET | /debug/pprof/profile | - | pprof 提供的信息 |
GET | /debug/pprof/heap | - | pprof 提供的信息 |
GET | /debug/pprof/goroutine | - | pprof 提供的信息 |
GET | /debug/pprof/block | - | pprof 提供的信息 |
GET | /debug/pprof/threadcreate | - | pprof 提供的信息 |
github.com/judwhite/go-svc/svc
的使用github.com/julienschmidt/httprouter
的使用github.com/mreiferson/go-options
的使用