上一章咱们大体了解了nsqlookupd
的tcpServer
中的IOLoop
协议的处理逻辑,里面有提到一个存储nsqd
的PeerInfo
以及topic
channel
数据信息的RegitrationDB
的一些操做方法。今天咱们就来说解一下关于RegitrationDB
的操做方法git
废话很少说,直接上代码吧(代码位于nsq/nsqlookupd/regitration_db.go这个文件中)sql
type RegistrationDB struct { sync.RWMutex //读写锁用于并发操做 registrationMap map[Registration]Producers //定义一个一Regitration为key producer指针的slice为value的map } type Registration struct { Category string Key string SubKey string } type Registrations []Registration //用于记录client相关信息 type PeerInfo struct { lastUpdate int64 //client 心跳包最后接收时间 id string //client remote address RemoteAddress string `json:"remote_address"` Hostname string `json:"hostname"` BroadcastAddress string `json:"broadcast_address"` TCPPort int `json:"tcp_port"` HTTPPort int `json:"http_port"` Version string `json:"version"` } type Producer struct { //PeerInfo指针 peerInfo *PeerInfo tombstoned bool tombstonedAt time.Time } type Producers []*Producer //实现String接口,打印出producer信息 func (p *Producer) String() string { return fmt.Sprintf("%s [%d, %d]", p.peerInfo.BroadcastAddress, p.peerInfo.TCPPort, p.peerInfo.HTTPPort) } //producer标记为tombstoned 并记录当前时间 func (p *Producer) Tombstone() { p.tombstoned = true p.tombstonedAt = time.Now() } //判断producer是不是tombstoned func (p *Producer) IsTombstoned(lifetime time.Duration) bool { return p.tombstoned && time.Now().Sub(p.tombstonedAt) < lifetime } //初始化一个RegistrationDB func NewRegistrationDB() *RegistrationDB { return &RegistrationDB{ registrationMap: make(map[Registration]Producers), } } // add a registration key //添加一个Registration key 若是不存在Map中则将其设置为你一个空的Producer func (r *RegistrationDB) AddRegistration(k Registration) { r.Lock() defer r.Unlock() _, ok := r.registrationMap[k] if !ok { r.registrationMap[k] = Producers{} } } // add a producer to a registration //添加一个producer到registration中 func (r *RegistrationDB) AddProducer(k Registration, p *Producer) bool { r.Lock() defer r.Unlock() producers := r.registrationMap[k] found := false for _, producer := range producers { if producer.peerInfo.id == p.peerInfo.id { found = true } } if found == false { r.registrationMap[k] = append(producers, p) } return !found } // remove a producer from a registration //移除registration中一个producer func (r *RegistrationDB) RemoveProducer(k Registration, id string) (bool, int) { r.Lock() defer r.Unlock() producers, ok := r.registrationMap[k] if !ok { return false, 0 } removed := false //这里用到里从一个slice中删除一个元素的方法 cleaned := Producers{} for _, producer := range producers { if producer.peerInfo.id != id { cleaned = append(cleaned, producer) } else { removed = true } } // Note: this leaves keys in the DB even if they have empty lists r.registrationMap[k] = cleaned //返货是否移除以及新的producers长度 return removed, len(cleaned) } // remove a Registration and all it's producers //删除registration下全部的producers func (r *RegistrationDB) RemoveRegistration(k Registration) { r.Lock() defer r.Unlock() delete(r.registrationMap, k) } func (r *RegistrationDB) needFilter(key string, subkey string) bool { return key == "*" || subkey == "*" } //根据category key subkey查找Registrations //若是传入的key 或 subkey为*的话则获取全部的registrationMap中全部的registration //若是key 或 subkey 不为* 的话则 获取具体的registration //这里实现了相似 * 这个通配符的概念 func (r *RegistrationDB) FindRegistrations(category string, key string, subkey string) Registrations { r.RLock() defer r.RUnlock() if !r.needFilter(key, subkey) { k := Registration{category, key, subkey} if _, ok := r.registrationMap[k]; ok { return Registrations{k} } return Registrations{} } results := Registrations{} for k := range r.registrationMap { if !k.IsMatch(category, key, subkey) { continue } results = append(results, k) } return results } //根据category key subkey查找全部的Producer //同上面的FindRegistrations函数同样,实现了*通配符的概念 func (r *RegistrationDB) FindProducers(category string, key string, subkey string) Producers { r.RLock() defer r.RUnlock() if !r.needFilter(key, subkey) { k := Registration{category, key, subkey} return r.registrationMap[k] } results := Producers{} for k, producers := range r.registrationMap { if !k.IsMatch(category, key, subkey) { continue } for _, producer := range producers { found := false for _, p := range results { if producer.peerInfo.id == p.peerInfo.id { found = true } } if found == false { results = append(results, producer) } } } return results } //根据producer.peerInfo.id查找所属的registration key func (r *RegistrationDB) LookupRegistrations(id string) Registrations { r.RLock() defer r.RUnlock() results := Registrations{} for k, producers := range r.registrationMap { for _, p := range producers { if p.peerInfo.id == id { results = append(results, k) break } } } return results } //依据Registration中的category key subkey,判断是否与Registration匹配 func (k Registration) IsMatch(category string, key string, subkey string) bool { if category != k.Category { return false } if key != "*" && k.Key != key { return false } if subkey != "*" && k.SubKey != subkey { return false } return true } //根据category key subkey过滤Registrations func (rr Registrations) Filter(category string, key string, subkey string) Registrations { output := Registrations{} for _, k := range rr { if k.IsMatch(category, key, subkey) { output = append(output, k) } } return output } //获取registrationMap中全部Registration的key func (rr Registrations) Keys() []string { keys := make([]string, len(rr)) for i, k := range rr { keys[i] = k.Key } return keys } //获取registrationMap中全部Registration的subkey func (rr Registrations) SubKeys() []string { subkeys := make([]string, len(rr)) for i, k := range rr { subkeys[i] = k.SubKey } return subkeys } //过滤出全部可用的Producer func (pp Producers) FilterByActive(inactivityTimeout time.Duration, tombstoneLifetime time.Duration) Producers { now := time.Now() results := Producers{} for _, p := range pp { cur := time.Unix(0, atomic.LoadInt64(&p.peerInfo.lastUpdate)) if now.Sub(cur) > inactivityTimeout || p.IsTombstoned(tombstoneLifetime) { continue } results = append(results, p) } return results } //获取Producers中全部的PeerInfo func (pp Producers) PeerInfo() []*PeerInfo { results := []*PeerInfo{} for _, p := range pp { results = append(results, p.peerInfo) } return results }
经过上面代码的分析咱们不难看出registration_db.go
文件用map
的形式保存Producer
,并提供一系列增、删、改、查的操做。同时使用RWMutex
作并发控制。json
到这里咱们讲解了nsqlookupd
中tcpServer
的所有代码了,咱们了解了nsqlookupd
是用来发现并记录nsqd
服务相关的remot address tcp
端口 http
端口等信息 以及 相应的topic
和channel
信息的功能,这样好方便消费查询相应的topic
和channel
的nsqd
服务连接信息,已实现对nsqd
进行拓扑管理的功能。并发
下一章咱们开始分析nsqlookupd
中的httpServer
相关的代码app
PS:顺便附送前面三章的传送门tcp
NSQ系列之nsqlookupd代码分析一(初探nsqlookup)函数
NSQ系列之nsqlookupd代码分析二(初识nsqlookupd tcpServer)oop
NSQ系列之nsqlookupd代码分析三(详解tcpServer 中的IOLoop方法)this
PS:若是文中有错误,欢迎你们指正哦。atom