经过c:=nsq.NewConsumer(...)方式建立消费者markdown
(1)给消费者c增长异步处理器handler: c.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message)))异步
(2)AddHandler方法中调用AddConcurrentHandlers()tcp
(3)AddConcurrentHandlers()中开启一个goroutine,调用handlerLoop(handler), handlerLoop中开启无限循环,经过无缓冲通道incomingMessages,阻塞监听最新消息, 获取到消息,传递给回调handler.HandleMessage(message)处理oop
func (r *Consumer) AddHandler(handler Handler) {
r.AddConcurrentHandlers(handler, 1)
}
func (r *Consumer) AddConcurrentHandlers(handler Handler, concurrency int) {
for i := 0; i < concurrency; i++ {
go r.handlerLoop(handler)
}
}
func (r *Consumer) handlerLoop(handler Handler) {
for {
message, ok := <-r.incomingMessages
if !ok {
goto exit
}
if r.shouldFailMessage(message, handler) {
message.Finish()
continue
}
err := handler.HandleMessage(message)
...
}
复制代码
调用c.ConnectToNSQLookupd(v),该方法中会调用queryLookupd()->ConnectToNSQD()spa
func (r *Consumer) ConnectToNSQLookupd(addr string) error {
...
if numLookupd == 1 {
r.queryLookupd()
r.wg.Add(1)
go r.lookupdLoop()
}
return nil
}
func (r *Consumer) queryLookupd() {
..
for _, addr := range nsqdAddrs {
err = r.ConnectToNSQD(addr)
...
}
}
复制代码
在ConnectToNSQD()中咱们能够看到,会经过conn.Connect()创建跟nsqd的TCP链接code
func (r *Consumer) ConnectToNSQD(addr string) error {
...
resp, err := conn.Connect()
...
}
复制代码
在Connect会开启一个goroutine,在readLoop方法中无限循环的监听消息的到来orm
func (c *Conn) Connect() (*IdentifyResponse, error) {
dialer := &net.Dialer{
LocalAddr: c.config.LocalAddr,
Timeout: c.config.DialTimeout,
}
conn, err := dialer.Dial("tcp", c.addr)
if err != nil {
return nil, err
}
c.conn = conn.(*net.TCPConn)
...
go c.readLoop()
go c.writeLoop()
return resp, nil
}
复制代码
当收到消息后,交给c.delegate.OnMessage()方法处理,在该方法中,会把消息发送给无缓冲消息通道incomingMessages,这样就完成了整个接收消息的逻辑.string
func (c *Conn) readLoop() {
delegate := &connMessageDelegate{c}
for {
...
frameType, data, err := ReadUnpackedResponse(c)
switch frameType {
...
case FrameTypeMessage:
msg, err := DecodeMessage(data)
...
c.delegate.OnMessage(c, msg)
...
}
...
}
func (r *Consumer) onConnMessage(c *Conn, msg *Message) {
...
r.incomingMessages <- msg
...
}
复制代码