NSQ系列之nsqlookupd代码分析四(详解nsqlookupd中的RegitrationDB操作方法)
上一章我们大致了解了nsqlookupd
的tcpServer
中的IOLoop
协议的处理逻辑,里面有提到一个存储nsqd
的PeerInfo
以及topic
channel
数据信息的RegitrationDB
的一些操作方法。今天我们就来讲解一下关于RegitrationDB
的操作方法
废话不多说,直接上代码吧(代码位于nsq/nsqlookupd/regitration_db.go这个文件中)
type RegistrationDB struct { sync.RWMutex //读写锁用于并发操作 registrationMap map[Registration]Producers //定义一个一Regitration为key producer指针的slice为value的map } type Registration struct { Category string Key string SubKey string } type Registrations []Registration //用于记录client相关信息 type PeerInfo struct { lastUpdate int64 //client 心跳包最后接收时间 id string //client remote address RemoteAddress string `json:"remote_address"` Hostname string `json:"hostname"` BroadcastAddress string `json:"broadcast_address"` TCPPort int `json:"tcp_port"` HTTPPort int `json:"http_port"` Version string `json:"version"` } type Producer struct { //PeerInfo指针 peerInfo *PeerInfo tombstoned bool tombstonedAt time.Time } type Producers []*Producer //实现String接口,打印出producer信息 func (p *Producer) String() string { return fmt.Sprintf("%s [%d,%d]",p.peerInfo.BroadcastAddress,p.peerInfo.TCPPort,p.peerInfo.HTTPPort) } //producer标记为tombstoned 并记录当前时间 func (p *Producer) Tombstone() { p.tombstoned = true p.tombstonedAt = time.Now() } //判断producer是否是tombstoned func (p *Producer) IsTombstoned(lifetime time.Duration) bool { return p.tombstoned && time.Now().Sub(p.tombstonedAt) < lifetime } //初始化一个RegistrationDB func NewRegistrationDB() *RegistrationDB { return &RegistrationDB{ registrationMap: make(map[Registration]Producers),} } // add a registration key //添加一个Registration key 如果不存在Map中则将其设置为你一个空的Producer func (r *RegistrationDB) AddRegistration(k Registration) { r.Lock() defer r.Unlock() _,ok := r.registrationMap[k] if !ok { r.registrationMap[k] = Producers{} } } // add a producer to a registration //添加一个producer到registration中 func (r *RegistrationDB) AddProducer(k Registration,p *Producer) bool { r.Lock() defer r.Unlock() producers := r.registrationMap[k] found := false for _,producer := range producers { if producer.peerInfo.id == p.peerInfo.id { found = true } } if found == false { r.registrationMap[k] = append(producers,p) } return !found } // remove a producer from a registration //移除registration中一个producer func (r *RegistrationDB) RemoveProducer(k Registration,id string) (bool,int) { r.Lock() defer r.Unlock() producers,ok := r.registrationMap[k] if !ok { return false,0 } removed := false //这里用到里从一个slice中删除一个元素的方法 cleaned := Producers{} for _,producer := range producers { if producer.peerInfo.id != id { cleaned = append(cleaned,producer) } else { removed = true } } // Note: this leaves keys in the DB even if they have empty lists r.registrationMap[k] = cleaned //返货是否移除以及新的producers长度 return removed,len(cleaned) } // remove a Registration and all it's producers //删除registration下所有的producers func (r *RegistrationDB) RemoveRegistration(k Registration) { r.Lock() defer r.Unlock() delete(r.registrationMap,k) } func (r *RegistrationDB) needFilter(key string,subkey string) bool { return key == "*" || subkey == "*" } //根据category key subkey查找Registrations //如果传入的key 或 subkey为*的话则获取所有的registrationMap中所有的registration //如果key 或 subkey 不为* 的话则 获取具体的registration //这里实现了类似 * 这个通配符的概念 func (r *RegistrationDB) FindRegistrations(category string,key string,subkey string) Registrations { r.RLock() defer r.RUnlock() if !r.needFilter(key,subkey) { k := Registration{category,key,subkey} if _,ok := r.registrationMap[k]; ok { return Registrations{k} } return Registrations{} } results := Registrations{} for k := range r.registrationMap { if !k.IsMatch(category,subkey) { continue } results = append(results,k) } return results } //根据category key subkey查找所有的Producer //同上面的FindRegistrations函数一样,实现了*通配符的概念 func (r *RegistrationDB) FindProducers(category string,subkey string) Producers { r.RLock() defer r.RUnlock() if !r.needFilter(key,subkey} return r.registrationMap[k] } results := Producers{} for k,producers := range r.registrationMap { if !k.IsMatch(category,subkey) { continue } for _,producer := range producers { found := false for _,p := range results { if producer.peerInfo.id == p.peerInfo.id { found = true } } if found == false { results = append(results,producer) } } } return results } //根据producer.peerInfo.id查找所属的registration key func (r *RegistrationDB) LookupRegistrations(id string) Registrations { r.RLock() defer r.RUnlock() results := Registrations{} for k,producers := range r.registrationMap { for _,p := range producers { if p.peerInfo.id == id { results = append(results,k) break } } } return results } //依据Registration中的category key subkey,判断是否与Registration匹配 func (k Registration) IsMatch(category string,subkey string) bool { if category != k.Category { return false } if key != "*" && k.Key != key { return false } if subkey != "*" && k.SubKey != subkey { return false } return true } //根据category key subkey过滤Registrations func (rr Registrations) Filter(category string,subkey string) Registrations { output := Registrations{} for _,k := range rr { if k.IsMatch(category,subkey) { output = append(output,k) } } return output } //获取registrationMap中所有Registration的key func (rr Registrations) Keys() []string { keys := make([]string,len(rr)) for i,k := range rr { keys[i] = k.Key } return keys } //获取registrationMap中所有Registration的subkey func (rr Registrations) SubKeys() []string { subkeys := make([]string,k := range rr { subkeys[i] = k.SubKey } return subkeys } //过滤出所有可用的Producer func (pp Producers) FilterByActive(inactivityTimeout time.Duration,tombstoneLifetime time.Duration) Producers { now := time.Now() results := Producers{} for _,p := range pp { cur := time.Unix(0,atomic.LoadInt64(&p.peerInfo.lastUpdate)) if now.Sub(cur) > inactivityTimeout || p.IsTombstoned(tombstoneLifetime) { continue } results = append(results,p) } return results } //获取Producers中所有的PeerInfo func (pp Producers) PeerInfo() []*PeerInfo { results := []*PeerInfo{} for _,p := range pp { results = append(results,p.peerInfo) } return results }
通过上面代码的分析我们不难看出registration_db.go
文件用map
的形式保存Producer
,并提供一系列增、删、改、查的操作。同时使用RWMutex
做并发控制。
到这里我们讲解了nsqlookupd
中tcpServer
的全部代码了,我们了解了nsqlookupd
是用来发现并记录nsqd
服务相关的remot address tcp
端口 http
端口等信息 以及 相应的topic
和channel
信息的功能,这样好方便消费查询相应的topic
和channel
的nsqd
服务链接信息,已实现对nsqd
进行拓扑管理的功能。
下一章我们开始分析nsqlookupd
中的httpServer
相关的代码
PS:顺便附送前面三章的传送门
NSQ系列之nsqlookupd代码分析一(初探nsqlookup)
NSQ系列之nsqlookupd代码分析二(初识nsqlookupd tcpServer)
NSQ系列之nsqlookupd代码分析三(详解tcpServer 中的IOLoop方法)
PS:如果文中有错误,欢迎大家指正哦。
原文链接:https://www.f2er.com/go/190227.html