NSQ是一个实时分布式消息平台, 旨在大规模运行, 天天处理数十亿条消息, 被许多互联网公司所使用;sql
其中 nsqd 是一个守护进程, 负责接收, 排队, 投递消息给客户端;
它能够独立运行, 不过一般它是由 nsqlookupd 实例所在集群配置的(它在这能声明 topics 和 channels, 以便你们能找到);
它在 2 个 TCP 端口监听, 一个给客户端, 另外一个是 HTTP API; 同时, 它也能在第三个端口监听 HTTPS数据库
nsq
大概分nsqd
nsqlookupd
nsqadmin
三个部分json
nsqlookupd
是守护进程负责管理拓扑信息; 客户端经过查询nsqlookupd
来发现指定话题(topic)的生产者, 而且 nsqd 节点广播话题(topic)和通道(channel)信息, 具备如下功能api
nsq
服务中只有一个nsqlookupd
服务, 固然也能够在集群中部署多个nsqlookupd
, 但它们之间是没有关联的nsqlookupd
崩溃, 也会不影响正在运行的nsqd
服务nsqd
和naqadmin
信息交互的中间件http
查询服务, 给客户端定时更新nsqd
的地址目录nsqd
是一个守护进程, 负责接收, 排队, 投递消息给客户端缓存
topic
, 同一个channel
的消费者使用负载均衡策略(不是轮询)channel
存在, 即便没有该channel
的消费者, 也会将生产者的message
缓存到队列中(注意消息的过时处理)message
至少会被消费一次, 即便nsqd
退出, 也会将队列中的消息暂存磁盘上(结束进程等意外状况除外)nsqd
中每一个channel
队列在内存中缓存的message
数量, 一旦超出, message
将被缓存到磁盘中topic
, channel
一旦创建, 将会一直存在, 要及时在管理台或者用代码清除无效的topic
和channel
, 避免资源的浪费是一套 WEB UI, 用来聚集集群的实时统计, 并执行不一样的管理任务并发
本文以及后面的分析都是基于 1.0.0 版本代码, 为了增长可读性, 我把注释放在了函数外, 基本都覆盖到, 本文中就不啰嗦讲如何使用了, 查阅文档便可app
package nsqlookupd
// 锁
// 配置选项
// tcpListener 如上文所说 tcp http 端口监听
// httpListener
// waitGroup 线程同步
// 数据库
type NSQLookupd struct {
sync.RWMutex
opts *Options
tcpListener net.Listener
httpListener net.Listener
waitGroup util.WaitGroupWrapper
DB *RegistrationDB
}
// 若是没有指定 Logger, 就new一个
// new NSQLookupd, 待会看一下 `NewRegistrationDB` 作了什么事情
// 解析 log level
func New(opts *Options) *NSQLookupd {
if opts.Logger == nil {
opts.Logger = log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Ltime|log.Lmicroseconds)
}
n := &NSQLookupd{
opts: opts,
DB: NewRegistrationDB(),
}
var err error
opts.logLevel, err = lg.ParseLogLevel(opts.LogLevel, opts.Verbose)
if err != nil {
n.logf(LOG_FATAL, "%s", err)
os.Exit(1)
}
n.logf(LOG_INFO, version.String("nsqlookupd"))
return n
}
// 建立 context, 其实 ctx 就是 NSQLookupd, 不明白为何画蛇添足, 想要引入原生的 Context struct?
// 建立 tcpListener, 这里用到了锁, 说明该场景有并发
// 根据 ctx 建立 tcpServer
// waitGroup 线程同步后, 建立 TCPServer
// 重复以上步骤,建立 HTTPServer
func (l *NSQLookupd) Main() {
ctx := &Context{l}
tcpListener, err := net.Listen("tcp", l.opts.TCPAddress)
if err != nil {
l.logf(LOG_FATAL, "listen (%s) failed - %s", l.opts.TCPAddress, err)
os.Exit(1)
}
l.Lock()
l.tcpListener = tcpListener
l.Unlock()
tcpServer := &tcpServer{ctx: ctx}
l.waitGroup.Wrap(func() {
protocol.TCPServer(tcpListener, tcpServer, l.logf)
})
httpListener, err := net.Listen("tcp", l.opts.HTTPAddress)
if err != nil {
l.logf(LOG_FATAL, "listen (%s) failed - %s", l.opts.HTTPAddress, err)
os.Exit(1)
}
l.Lock()
l.httpListener = httpListener
l.Unlock()
httpServer := newHTTPServer(ctx)
l.waitGroup.Wrap(func() {
http_api.Serve(httpListener, httpServer, "HTTP", l.logf)
})
}
// 获取 TCP 地址, 继续锁, 说明地址可能会修改
func (l *NSQLookupd) RealTCPAddr() *net.TCPAddr {
l.RLock()
defer l.RUnlock()
return l.tcpListener.Addr().(*net.TCPAddr)
}
// 获取 HTTP 地址
func (l *NSQLookupd) RealHTTPAddr() *net.TCPAddr {
l.RLock()
defer l.RUnlock()
return l.httpListener.Addr().(*net.TCPAddr)
}
// 关闭 tcpListener httpListener, 等待线程同步后结束
func (l *NSQLookupd) Exit() {
if l.tcpListener != nil {
l.tcpListener.Close()
}
if l.httpListener != nil {
l.httpListener.Close()
}
l.waitGroup.Wait()复制代码
OK, 至此 nsqlookupd.go
已经分析完毕, 若是想知道以上代码如何单独使用, 能够看测试nsqlookupd_test.go
呀 😂, 在以上代码中, 咱们看到了 db
部分, 接下来看看负载均衡
package nsqlookupd
// 锁
// 以 Registration 为 key 储存 Producers, 即生产者
type RegistrationDB struct {
sync.RWMutex
registrationMap map[Registration]Producers
}
type Registration struct {
Category string
Key string
SubKey string
}
type Registrations []Registration
// *节点信息*
// 上次更新时间
// 标识符
// 地址
// 主机名
// 广播地址
// tcp 地址
// http 地址
// 版本号
type PeerInfo struct {
lastUpdate int64
id string
RemoteAddress string `json:"remote_address"`
Hostname string `json:"hostname"`
BroadcastAddress string `json:"broadcast_address"`
TCPPort int `json:"tcp_port"`
HTTPPort int `json:"http_port"`
Version string `json:"version"`
}
// *生产者*
// 节点信息
// 是否删除
// 删除时间
type Producer struct {
peerInfo *PeerInfo
tombstoned bool
tombstonedAt time.Time
}
type Producers []*Producer
// 转换为字符串
func (p *Producer) String() string {
return fmt.Sprintf("%s [%d, %d]", p.peerInfo.BroadcastAddress, p.peerInfo.TCPPort, p.peerInfo.HTTPPort)
}
// 删除
func (p *Producer) Tombstone() {
p.tombstoned = true
p.tombstonedAt = time.Now()
}
// 是否删除
func (p *Producer) IsTombstoned(lifetime time.Duration) bool {
return p.tombstoned && time.Now().Sub(p.tombstonedAt) < lifetime
}
// 建立 RegistrationDB
func NewRegistrationDB() *RegistrationDB {
return &RegistrationDB{
registrationMap: make(map[Registration]Producers),
}
}
// 增长一个注册表 key
func (r *RegistrationDB) AddRegistration(k Registration) {
r.Lock()
defer r.Unlock()
_, ok := r.registrationMap[k]
if !ok {
r.registrationMap[k] = Producers{}
}
}
// 添加一个 producer 到 registration
// 取出 producers, 并遍历,
// 若是不存在, 就添加进去
// 若是存在, 返回 false
func (r *RegistrationDB) AddProducer(k Registration, p *Producer) bool {
r.Lock()
defer r.Unlock()
producers := r.registrationMap[k]
found := false
for _, producer := range producers {
if producer.peerInfo.id == p.peerInfo.id {
found = true
}
}
if found == false {
r.registrationMap[k] = append(producers, p)
}
return !found
}
// 根据 id 从 registration 中删除 producer
// 若是不存在, 返回 false
// 建立一个新的 Producers, 遍历原来的 Producers,
// 若是 id 不相同就添加进去, 即删除成功 简单粗暴 哈哈哈哈哈哈
func (r *RegistrationDB) RemoveProducer(k Registration, id string) (bool, int) {
r.Lock()
defer r.Unlock()
producers, ok := r.registrationMap[k]
if !ok {
return false, 0
}
removed := false
cleaned := Producers{}
for _, producer := range producers {
if producer.peerInfo.id != id {
cleaned = append(cleaned, producer)
} else {
removed = true
}
}
// Note: this leaves keys in the DB even if they have empty lists
r.registrationMap[k] = cleaned
return removed, len(cleaned)
}
// 删除一个 registration
func (r *RegistrationDB) RemoveRegistration(k Registration) {
r.Lock()
defer r.Unlock()
delete(r.registrationMap, k)
}
// 须要过滤
func (r *RegistrationDB) needFilter(key string, subkey string) bool {
return key == "*" || subkey == "*"
}
// 根据 category, key, subkey 查找 Registrations
// 若是 key == '*' 或者 subkey == '*', 则只查找一个
// 不然 遍历 registrationMap, 返回全部条件符合的 registration
func (r *RegistrationDB) FindRegistrations(category string, key string, subkey string) Registrations {
r.RLock()
defer r.RUnlock()
if !r.needFilter(key, subkey) {
k := Registration{category, key, subkey}
if _, ok := r.registrationMap[k]; ok {
return Registrations{k}
}
return Registrations{}
}
results := Registrations{}
for k := range r.registrationMap {
if !k.IsMatch(category, key, subkey) {
continue
}
results = append(results, k)
}
return results
}
// 根据 category, key, subkey 查找 Producers
// 同上 没什么好说的, 多了个根据 id 去重, 略啰嗦
func (r *RegistrationDB) FindProducers(category string, key string, subkey string) Producers {
r.RLock()
defer r.RUnlock()
if !r.needFilter(key, subkey) {
k := Registration{category, key, subkey}
return r.registrationMap[k]
}
results := Producers{}
for k, producers := range r.registrationMap {
if !k.IsMatch(category, key, subkey) {
continue
}
for _, producer := range producers {
found := false
for _, p := range results {
if producer.peerInfo.id == p.peerInfo.id {
found = true
}
}
if found == false {
results = append(results, producer)
}
}
}
return results
}
// 根据 id 查找 Registrations
// 依然遍历 没什么好说的
func (r *RegistrationDB) LookupRegistrations(id string) Registrations {
r.RLock()
defer r.RUnlock()
results := Registrations{}
for k, producers := range r.registrationMap {
for _, p := range producers {
if p.peerInfo.id == id {
results = append(results, k)
break
}
}
}
return results
}
// 是否匹配
func (k Registration) IsMatch(category string, key string, subkey string) bool {
if category != k.Category {
return false
}
if key != "*" && k.Key != key {
return false
}
if subkey != "*" && k.SubKey != subkey {
return false
}
return true
}
// 过滤
func (rr Registrations) Filter(category string, key string, subkey string) Registrations {
output := Registrations{}
for _, k := range rr {
if k.IsMatch(category, key, subkey) {
output = append(output, k)
}
}
return output
}
// keys
func (rr Registrations) Keys() []string {
keys := make([]string, len(rr))
for i, k := range rr {
keys[i] = k.Key
}
return keys
}
// subkeys
func (rr Registrations) SubKeys() []string {
subkeys := make([]string, len(rr))
for i, k := range rr {
subkeys[i] = k.SubKey
}
return subkeys
}
// 根据时间过滤
func (pp Producers) FilterByActive(inactivityTimeout time.Duration, tombstoneLifetime time.Duration) Producers {
now := time.Now()
results := Producers{}
for _, p := range pp {
cur := time.Unix(0, atomic.LoadInt64(&p.peerInfo.lastUpdate))
if now.Sub(cur) > inactivityTimeout || p.IsTombstoned(tombstoneLifetime) {
continue
}
results = append(results, p)
}
return results
}
// 节点信息
func (pp Producers) PeerInfo() []*PeerInfo {
results := []*PeerInfo{}
for _, p := range pp {
results = append(results, p.peerInfo)
}
return results
}复制代码
好了, 能够看出 RegistrationDB
以 map
结构包含了全部节点信息; 名为db
, 实则最多算个cache
罢了 2333333; 印证了上文中的 客户端经过查询 nsqlookupd 来发现指定话题(topic)的生产者
;tcp
好了, 第一篇暂时结束, 接下来的敬请期待分布式