nsq
最初是由 bitly
公司开源出来的一款简单易用的分布式消息中间件,它可用于大规模系统中的实时消息服务,而且天天可以处理数亿级别的消息。html
它具备如下特性:git
分布式。它提供了分布式的、去中心化且没有单点故障的拓扑结构,稳定的消息传输发布保障,可以具备高容错和高可用特性。github
易于扩展。它支持水平扩展,没有中心化的消息代理(Broker
),内置的发现服务让集群中增长节点很是容易。sql
运维方便。它很是容易配置和部署,灵活性高。docker
高度集成。如今已经有官方的Golang
、Python
和JavaScript
客户端,社区也有了其余各个语言的客户端库方便接入,自定义客户端也很是容易。bash
下载对应的二进制可执行文件,在本地按照上述步骤就能够跑起来了,看下nsqadmin
后台展现以下: 运维
参考官方提供资料建立:docker-compose.ymltcp
version: '2' # 高版本支持3
services:
nsqlookupd:
image: nsqio/nsq
command: /nsqlookupd
ports:
- "4160:4160" # tcp
- "4161:4161" # http
nsqd:
image: nsqio/nsq
# 广播地址不填的话默认就是oshostname(或虚拟机名称),那样 lookupd 会链接不上,因此直接写IP
command: /nsqd --broadcast-address=10.236.92.208 --lookupd-tcp-address=nsqlookupd:4160
depends_on:
- nsqlookupd
ports:
- "4150:4150" # tcp
- "4151:4151" # http
nsqadmin:
image: nsqio/nsq
command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
depends_on:
- nsqlookupd
ports:
- "4171:4171" # http
复制代码
执行 docker-compose up -d
生成对应的三个容器:分布式
nsqgo_nsqd_1
nsqgo_nsqlookupd_1
nsqgo_nsqadmin_1
建立生产者:producer.go
package main
import (
"fmt"
"log"
"time"
"github.com/nsqio/go-nsq"
)
func main() {
config := nsq.NewConfig()
p, err := nsq.NewProducer("127.0.0.1:4150", config)
if err != nil {
log.Panic(err)
}
for i := 0; i < 1000; i++ {
msg := fmt.Sprintf("num-%d", i)
log.Println("Pub:" + msg)
err = p.Publish("testTopic", []byte(msg))
if err != nil {
log.Panic(err)
}
time.Sleep(time.Second * 1)
}
p.Stop()
}
复制代码
循环写 1000 个 num-1--1000,经过 p.Publish
发送到消息队列中,等待消费。
建立消费者:consumer.go
package main
import (
"log"
"sync"
"github.com/nsqio/go-nsq"
)
func main() {
wg := &sync.WaitGroup{}
wg.Add(1000)
config := nsq.NewConfig()
c, _ := nsq.NewConsumer("testTopic", "ch", config)
c.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
log.Printf("Got a message: %s", message.Body)
wg.Done()
return nil
}))
// 1.直连nsqd
// err := c.ConnectToNSQD("127.0.0.1:4150")
// 2.经过 nsqlookupd 服务发现
err := c.ConnectToNSQLookupd("127.0.0.1:4161")
if err != nil {
log.Panic(err)
}
wg.Wait()
}
复制代码
可经过两种方式与 nsqd
链接:
1). 直连 nsqd
,适用于单机(standalone
)版;
2). 经过 nsqlookupd
服务发现,适用于集群(cluster
)版;
输出结果:
go run producer.go
2019/08/18 20:29:51 Pub:num-0
2019/08/18 20:29:51 INF 1 (127.0.0.1:4150) connecting to nsqd
2019/08/18 20:29:52 Pub:num-1
2019/08/18 20:29:53 Pub:num-2
2019/08/18 20:29:54 Pub:num-3
2019/08/18 20:29:55 Pub:num-4
2019/08/18 20:29:56 Pub:num-5
2019/08/18 20:29:57 Pub:num-6
2019/08/18 20:29:58 Pub:num-7
2019/08/18 20:29:59 Pub:num-8
2019/08/18 20:30:00 Pub:num-9
2019/08/18 20:30:01 Pub:num-10
复制代码
go run consumer.go
2019/08/18 20:30:08 INF 1 [testTopic/ch] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=testTopic
2019/08/18 20:30:08 INF 1 [testTopic/ch] (10.236.92.208:4150) connecting to nsqd
2019/08/18 20:30:08 Got a message: num-0
2019/08/18 20:30:08 Got a message: num-1
2019/08/18 20:30:08 Got a message: num-2
2019/08/18 20:30:08 Got a message: num-3
2019/08/18 20:30:08 Got a message: num-4
2019/08/18 20:30:08 Got a message: num-5
2019/08/18 20:30:08 Got a message: num-6
2019/08/18 20:30:08 Got a message: num-7
2019/08/18 20:30:08 Got a message: num-8
2019/08/18 20:30:08 Got a message: num-9
2019/08/18 20:30:08 Got a message: num-10
复制代码
github 源码下载:
【小结】
a. 单个nsqd能够有多个topic,每一个topic能够有多个channel。channel接收这个topic全部消息的副本,从而实现多播分发,而channel上的每一个消息被均匀的分发给它的订阅者,从而实现负载均衡;
b. nsq 专门为分布式、集群化而生,在处理 SPOF(single point of failure, 单点故障)、高可用、最终一致性方面颇有优点。