上篇文章已经了解了消息中间件相关的知识,这篇文章学习一下Golang语言编写的知名消息中间件NSQ。html
nsq最初是由bitly公司开源出来的一款简单易用的消息中间件,它可用于大规模系统中的实时消息服务,而且天天可以处理数亿级别的消息。它有如下特性:git
如今开始体验它~github
首先安装它,我在Mac上用Homebrew安装:golang
❯ brew install nsq
复制代码
nsq一共有四种组件sql
nsqlookupd是负责管理拓扑信息并提供最终一致性的发现服务的守护进程(daemon)。在终端1启动它:浏览器
❯ nsqlookupd
[nsqlookupd] 2019/07/18 11:42:16.876296 INFO: nsqlookupd v1.1.0 (built w/go1.11)
[nsqlookupd] 2019/07/18 11:42:16.876864 INFO: HTTP: listening on [::]:4161
[nsqlookupd] 2019/07/18 11:42:16.876868 INFO: TCP: listening on [::]:4160
复制代码
默认HTTP接口监听4161,TCP接口监听4160。bash
nsqd是一个负责接收、排队、投递消息给客户端的守护进程。客户端经过查询 nsqlookupd 来发现指定话题(topic)的nsqd生产者,nsqd节点会广播话题(topic)和通道(channel)信息。数据流模型以下:负载均衡
单个nsqd能够有多个topic,每一个topic能够有多个channel。channel接收这个topic全部消息的副本,从而实现多播分发,而channel上的每一个消息被分发给它的订阅者,从而实现负载均衡。运维
在终端2启动nsqd:curl
❯ nsqd --lookupd-tcp-address=127.0.0.1:4160
...
[nsqd] 2019/07/18 11:47:46.427184 INFO: HTTP: listening on [::]:4151
[nsqd] 2019/07/18 11:47:46.427195 INFO: TCP: listening on [::]:4150
[nsqd] 2019/07/18 11:47:46.427203 INFO: LOOKUP(127.0.0.1:4160): adding peer
[nsqd] 2019/07/18 11:47:46.427355 INFO: LOOKUP connecting to 127.0.0.1:4160
...
复制代码
nsqd经过tcp端口链接到了nsqlookupd,它本身在4151接受HTTP请求,在4150接受TCP请求。
nsqadmin 是一套WEB管理UI,用来聚集集群的实时统计,并执行不一样的管理任务。在终端3启动它:
❯ nsqadmin --lookupd-http-address=127.0.0.1:4161
[nsqadmin] 2019/07/18 11:54:23.125392 INFO: nsqadmin v1.1.0 (built w/go1.11)
[nsqadmin] 2019/07/18 11:54:23.128755 INFO: HTTP: listening on [::]:4171
复制代码
浏览器打开http://localhost:4171
就能访问了,须要注意,管理UI能够按需启动。
安装nsq后会增长nsq_stat/nsq_tail/nsq_to_file等功能工具,这些实用程序以数据流的形式提供了通用功能和内部检查,稍后能体验到。
❯ curl -d 'hello world 1' 'http://127.0.0.1:4151/pub?topic=test' # 在终端4执行
OK # 发布消息到nsqd,用Rest API完成,看参数表示话题是test。因为尚未test这个话题,会先建立话题再接收消息。
❯ nsq_tail --lookupd-http-address=127.0.0.1:4161 --topic=test # 在终端5执行,一会再来看
# 回到终端4
❯ curl -d 'hello world 2' 'http://127.0.0.1:4151/pub?topic=test'
OK
~
❯ curl -d 'hello world 3' 'http://127.0.0.1:4151/pub?topic=test'
OK
~
# 回到终端5
❯ nsq_tail --lookupd-http-address=127.0.0.1:4161 --topic=test
2019/07/18 12:06:24 Adding consumer for topic: test
...
hello world 2
hello world 3
复制代码
回到终端5,能够看到接受(消费)了启动nsq_tail后发布的2个消息,这就是功能工具的做用,另外如nsq_to_file
是把消息发到文件。
首先安装go-nsq:
❯ go get github.com/nsqio/go-nsq
复制代码
先看生产者:
package main
import (
"github.com/nsqio/go-nsq"
"log"
"math/rand"
"time"
)
func main() {
config := nsq.NewConfig()
w, err := nsq.NewProducer("127.0.0.1:4150", config)
if err != nil {
log.Panic(err)
}
chars := []byte("ABCDEFGHIJKLMNOPQRSTUVWXYZ")
for {
buf := make([]byte, 4)
for i := 0; i < 4; i++ {
buf[i] = chars[rand.Intn(len(chars))]
}
log.Printf("Pub: %s", buf)
err = w.Publish("test", buf)
if err != nil {
log.Panic(err)
}
time.Sleep(time.Second * 1)
}
w.Stop()
}
复制代码
NewProducer的第一个参数就是nsqd的地址,在这里作了个无限for循环,每次随机4个byte发布到test话题里面。
接着看消费者代码:
package main
import (
"log"
"sync"
"github.com/nsqio/go-nsq"
)
func main() {
wg := &sync.WaitGroup{}
wg.Add(1000)
config := nsq.NewConfig()
q, _ := nsq.NewConsumer("test", "ch", config)
q.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
log.Printf("Got a message: %s", message.Body)
wg.Done()
return nil
}))
err := q.ConnectToNSQD("127.0.0.1:4150")
if err != nil {
log.Panic(err)
}
wg.Wait()
}
复制代码
一开始经过sync.WaitGroup安排了1000个待执行的等待组,NewConsumer的第一个参数是话题test,第二是通道名字,而后用AddHandler添加一个消费处理函数,在处理函数中会打印这个消息。
如今就能够体验了,首先启动消费者,再启动发布者。
❯ go run consumer.go
2019/07/18 15:29:29 INF 1 [test/ch] (127.0.0.1:4150) connecting to nsqd
2019/07/18 15:29:37 Got a message: ZGBA
2019/07/18 15:29:38 Got a message: ICMR
2019/07/18 15:29:39 Got a message: AJWW
2019/07/18 15:29:40 Got a message: HTHC
2019/07/18 15:29:41 Got a message: TCUA
...
❯ go run producer.go
2019/07/18 15:29:36 INF 1 (127.0.0.1:4150) connecting to nsqd
2019/07/18 15:29:37 Pub: ZGBA
2019/07/18 15:29:38 Pub: ICMR
2019/07/18 15:29:39 Pub: AJWW
2019/07/18 15:29:40 Pub: HTHC
2019/07/18 15:29:41 Pub: TCUA
...
复制代码
原文地址: strconv.com/posts/use-n…
完整代码能够在这个地址找到。