//原文:http://www.javashuo.com/article/p-rrzehegv-cm.html 做者:啊汉
type Topic struct { // 64bit atomic vars need to be first for proper alignment on 32bit platforms messageCount uint64 //消息总数量 messageBytes uint64 //消息总长度 sync.RWMutex name string //topic name channelMap map[string]*Channel //保存topic下面的全部channel backend BackendQueue //磁盘队列 memoryMsgChan chan *Message //内存队列 startChan chan int exitChan chan int channelUpdateChan chan int waitGroup util.WaitGroupWrapper exitFlag int32 //退出标记 idFactory *guidFactory //生成msg id的工厂 ephemeral bool //是否临时topic deleteCallback func(*Topic) //删除topic方法指针 deleter sync.Once paused int32 //暂停标记,1暂停, 0正常 pauseChan chan int ctx *context }
// Command represents a command from a client to an NSQ daemon
//原文:http://www.javashuo.com/article/p-rrzehegv-cm.html 做者:啊汉
type Command struct { Name []byte //命令名称,可选:IDENTIFY、FIN、RDY、REQ、PUB、MPUB、DPUB、NOP、TOUCH、SUB、CLS、AUTH Params [][]byte //不一样的命令作不一样解析,涉及到topic的,Params[0]为topic name Body []byte //消息内容 } // WriteTo implements the WriterTo interface and // serializes the Command to the supplied Writer. // // It is suggested that the target Writer is buffered // to avoid performing many system calls. func (c *Command) WriteTo(w io.Writer) (int64, error) { var total int64 var buf [4]byte n, err := w.Write(c.Name) //命名名称,nsqd根据这个名称执行相关功能 total += int64(n) if err != nil { return total, err } for _, param := range c.Params { n, err := w.Write(byteSpace) //空格 total += int64(n) if err != nil { return total, err } n, err = w.Write(param) //参数 total += int64(n) if err != nil { return total, err } } n, err = w.Write(byteNewLine) //空行\n total += int64(n) if err != nil { return total, err } //消息内容 if c.Body != nil { bufs := buf[:] binary.BigEndian.PutUint32(bufs, uint32(len(c.Body))) n, err := w.Write(bufs) //消息长度4字节 total += int64(n) if err != nil { return total, err } n, err = w.Write(c.Body) //消息内容 total += int64(n) if err != nil { return total, err } } return total, nil }
func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) { if bytes.Equal(params[0], []byte("IDENTIFY")) { return p.IDENTIFY(client, params) } err := enforceTLSPolicy(client, p, params[0]) if err != nil { return nil, err } switch { case bytes.Equal(params[0], []byte("FIN")): return p.FIN(client, params) case bytes.Equal(params[0], []byte("RDY")): return p.RDY(client, params) case bytes.Equal(params[0], []byte("REQ")): return p.REQ(client, params) case bytes.Equal(params[0], []byte("PUB")): return p.PUB(client, params) case bytes.Equal(params[0], []byte("MPUB")): return p.MPUB(client, params) case bytes.Equal(params[0], []byte("DPUB")): return p.DPUB(client, params) case bytes.Equal(params[0], []byte("NOP")): return p.NOP(client, params) case bytes.Equal(params[0], []byte("TOUCH")): return p.TOUCH(client, params) case bytes.Equal(params[0], []byte("SUB")): return p.SUB(client, params) case bytes.Equal(params[0], []byte("CLS")): return p.CLS(client, params) case bytes.Equal(params[0], []byte("AUTH")): return p.AUTH(client, params) } return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0])) }
type Channel struct { // 64bit atomic vars need to be first for proper alignment on 32bit platforms requeueCount uint64 //从新入队数量 messageCount uint64 //消息数量 timeoutCount uint64 //超时数量,已经消费,但没有反馈结果,会从新加入队列,messageCount不会自增 sync.RWMutex topicName string //topic name name string //channel name ctx *context backend BackendQueue //将消息写入磁盘的队列,维护磁盘消息的读写 memoryMsgChan chan *Message //内存消息队列,通道buffer默认10000 exitFlag int32 //退出标记,1表示退出,0没有退出 exitMutex sync.RWMutex // state tracking clients map[int64]Consumer //链接到这个topic-channel的全部client paused int32 //暂停标记,0不暂停,1暂停,暂停就不会往这个channel中copy消息 ephemeral bool //临时channel标记,临时channel不会存到文件中 deleteCallback func(*Channel) //用于从topic中删除channel deleter sync.Once // Stats tracking e2eProcessingLatencyStream *quantile.Quantile // TODO: these can be DRYd up deferredMessages map[MessageID]*pqueue.Item //延迟消息map,方便查找 deferredPQ pqueue.PriorityQueue //延迟消息队列 deferredMutex sync.Mutex inFlightMessages map[MessageID]*Message //消费中的消息map,方便查找 inFlightPQ inFlightPqueue //消费中的消息队列 inFlightMutex sync.Mutex }
// frame types const ( FrameTypeResponse int32 = 0 //响应 FrameTypeError int32 = 1 //错误 FrameTypeMessage int32 = 2 //消息 )
type inFlightPqueue []*Message inFlightPQ inFlightPqueue //按照超时时间排序的最小堆 inFlightMessages map[MessageID]*Message //保存消息
type Item struct { Value interface{} //*Message Priority int64 //执行的时间戳,单位毫秒 Index int //队列索引 } type PriorityQueue []*Item deferredPQ pqueue.PriorityQueue deferredMessages map[MessageID]*pqueue.Item deferredPQ和inFlightPQ同样,是按照时间排序的最小堆
type meta struct { Topics []struct { Name string `json:"name"` Paused bool `json:"paused"` Channels []struct { Name string `json:"name"` Paused bool `json:"paused"` } `json:"channels"` } `json:"topics"` }
服务命令 | 服务描述 |
INENTIFY | 认证 |
FIN | 消费完成 |
RDY | 指定可同时处理的消息数量 |
REQ | 消息从新加入队列 |
PUB | 发布单条消息 |
MPUB | 发布多条消息 |
DPUB | 发布单条延迟消息 |
NOP | 不作任何处理 |
TOUCH | 从新设置消息处理超时时间 |
SUB | 订阅,订阅后才能消费消息 |
CLS | 关闭中止消费 |
AUTH | 受权 |
服务名称 |
发布单条/多条消息 |
topic新增/删除/状况topic中消息/暂停/启动 |
channel新增/删除/状况topic中消息/暂停/启动 |
nsq状态信息 |
ping |
启动参数查询和修改 |
协议名称 | 默认端口 |
tcp | 4150 |
http | 4151 |
https | 4152 |
// diskQueue implements a filesystem backed FIFO queue
//原文:http://www.javashuo.com/article/p-rrzehegv-cm.html 做者:啊汉
type diskQueue struct { // 64bit atomic vars need to be first for proper alignment on 32bit platforms // run-time state (also persisted to disk) readPos int64 //已经读的位置 writePos int64 //已经写的位置 readFileNum int64 //正在读的文件编号 writeFileNum int64 //正在写的文件编号 depth int64 //没有消费的消息数量 sync.RWMutex // instantiation time metadata name string // topicName 或者 topicName + ":" + channelName dataPath string //存消息文件的目录 maxBytesPerFile int64 // currently this cannot change once created minMsgSize int32 //消息最小值 maxMsgSize int32 //消息最大值 syncEvery int64 // number of writes per fsync syncTimeout time.Duration // duration of time per fsync exitFlag int32 //退出标记 needSync bool //强制将文件缓冲区的数据写入磁盘 // keeps track of the position where we have read // (but not yet sent over readChan) nextReadPos int64 //下次读的位置 nextReadFileNum int64 //下次读的文件编号 readFile *os.File //正在读的文件 writeFile *os.File //正在写的文件 reader *bufio.Reader //读缓冲区,默认4K writeBuf bytes.Buffer //写缓冲区 // exposed via ReadChan() readChan chan []byte //读channel // internal channels writeChan chan []byte //写channel writeResponseChan chan error //写结果通知 emptyChan chan int //删除全部文件channel emptyResponseChan chan error //删除通知channel exitChan chan int //退出channel exitSyncChan chan int //退出命令同步等待channel logf AppLogFunc //写日志 }
func (d *diskQueue) fileName(fileNum int64) string { return fmt.Sprintf(path.Join(d.dataPath, "%s.diskqueue.%06d.dat"), d.name, fileNum) }