剖析nsq消息队列-目录html
分布式消息队列nsq,简单易用,去中心化的设计使nsq
更健壮,nsq
充分利用了go
语言的goroutine
和channel
来实现的消息处理,代码量也不大,读不了多久就没了。后期的文章我会把nsq
的源码分析给你们看。 主要的分析路线以下git
nsq
的总体框架结构,分析如何作到的无中心化分布式拓扑结构,如何处理的单点故障。nsq
是如何保证消息的可靠性,如何保证消息的处理,对于消息的持久化是如何处理和扩展的。nsq
是如何作的消息的负载处理,即如何把合理的、不超过客户端消费能力的状况下,把消息分发到不一样的客户端。nsq
提供的一些辅助组件。这篇帖子,介绍nsq
的主体结构,以及他是如何作到去中心化的分布式拓扑结构,如何处理的单点故障。 几个组件是须要先大概说一下 nsqd
消息队列的主体,对消息的接收,处理和把消息分发到客户端。 nsqlookupd
nsq
拓扑结构信息的管理者,有了他才能组成一个简单易用的无中心化的分布式拓扑网络结构。 go-nsq
nsq
官方的go语言客户端,基本上市面上的主流编程语言都有相应的客户端在这里 还有可视化的组件nsqadmin
和一些工具像nsq_to_file
、nsq_stat
、等等,这些在后期的帖子里会介绍github
两种方式一种是直接链接另外一种是经过nsqlookupd
进行链接算法
nsqd
是独立运行的,咱们能够直接使用部署几个nsqd
而后使用客户端直连的方式使用sql
目前资源有限,我就都在一台机器上模拟了 启动两个nsqd
编程
./nsqd -tcp-address ":8000" -http-address ":8001" -data-path=./a 复制代码
./nsqd -tcp-address ":7000" -http-address ":7001" -data-path=./b 复制代码
正常启动会有相似下面的输出bash
[nsqd] 2019/08/29 18:42:56.928345 INFO: nsqd v1.1.1-alpha (built w/go1.12.7)
[nsqd] 2019/08/29 18:42:56.928512 INFO: ID: 538
[nsqd] 2019/08/29 18:42:56.928856 INFO: NSQ: persisting topic/channel metadata to b/nsqd.dat
[nsqd] 2019/08/29 18:42:56.935797 INFO: TCP: listening on [::]:7000
[nsqd] 2019/08/29 18:42:56.935891 INFO: HTTP: listening on [::]:7001
复制代码
简单使用markdown
func main() { adds := []string{"127.0.0.1:7000", "127.0.0.1:8000"} config := nsq.NewConfig() topicName := "testTopic1" c, _ := nsq.NewConsumer(topicName, "ch1", config) testHandler := &MyTestHandler{consumer: c} c.AddHandler(testHandler) if err := c.ConnectToNSQDs(adds); err != nil { panic(err) } stats := c.Stats() if stats.Connections == 0 { panic("stats report 0 connections (should be > 0)") } stop := make(chan os.Signal) signal.Notify(stop, os.Interrupt) fmt.Println("server is running....") <-stop } type MyTestHandler struct { consumer *nsq.Consumer } func (m MyTestHandler) HandleMessage(message *nsq.Message) error { fmt.Println(string(message.Body)) return nil } 复制代码
方法 c.ConnectToNSQDs(adds)
,链接多个nsqd
服务 而后运行多个客户端实现 这时,咱们发送一个消息,网络
curl -d 'hello world 2' 'http://127.0.0.1:7001/pub?topic=testTopic1' 复制代码
nsqd会根据他的算法,把消息分配到一个客户端 客户端的输入以下框架
2019/08/30 12:05:32 INF 1 [testTopic1/ch1] (127.0.0.1:7000) connecting to nsqd 2019/08/30 12:05:32 INF 1 [testTopic1/ch1] (127.0.0.1:8000) connecting to nsqd server is running.... hello world 2 复制代码
可是这种作的话,须要客户端作一些额外的工做,须要频繁的去检查全部nsqd
的状态,若是发现出现问题须要客户端主动去处理这些问题。
我使用的客户端库是官方库 go-nsq
,使用直接连nsqd
的方式,
nsqd
出现问题,如今的处理方式,他会每隔一段时间执行一次重连操做。想去掉这个链接信息就要额外作一些处理了。nsqd
进行横向扩充,只能是本身民额外的写一些代码调用ConnectToNSQDs
或者ConnectToNSQD
方法官方推荐使用链接nsqlookupd
的方式,nsqlookupd
用于作服务的注册和发现,这样能够作到去中心化。
图中咱们运行着多个nsqd
和多个nsqlookupd
的实例,客户端去链接nsqlookupd
来操做nsqd
咱们要先启动nsqlookupd
,为了演示方便,我启动两个nsqlookupd
实例, 三个nsqd
实例
./nsqlookupd -tcp-address ":8200" -http-address ":8201" 复制代码
./nsqlookupd -tcp-address ":7200" -http-address ":7201" 复制代码
为了演示横向扩充,先启动两个,客户端链接后,再启动第三个。
./nsqd -tcp-address ":8000" -http-address ":8001" --lookupd-tcp-address=127.0.0.1:8200 --lookupd-tcp-address=127.0.0.1:7200 -data-path=./a 复制代码
./nsqd -tcp-address ":7000" -http-address ":7001" --lookupd-tcp-address=127.0.0.1:8200 --lookupd-tcp-address=127.0.0.1:7200 -data-path=./b 复制代码
--lookupd-tcp-address
用于指定lookup
的链接地址
客户端简单代码
package main import ( "fmt" "os" "os/signal" "time" "github.com/nsqio/go-nsq" ) func main() { adds := []string{"127.0.0.1:7201", "127.0.0.1:8201"} config := nsq.NewConfig() config.MaxInFlight = 1000 config.MaxBackoffDuration = 5 * time.Second config.DialTimeout = 10 * time.Second topicName := "testTopic1" c, _ := nsq.NewConsumer(topicName, "ch1", config) testHandler := &MyTestHandler{consumer: c} c.AddHandler(testHandler) if err := c.ConnectToNSQLookupds(adds); err != nil { panic(err) } stats := c.Stats() if stats.Connections == 0 { panic("stats report 0 connections (should be > 0)") } stop := make(chan os.Signal) signal.Notify(stop, os.Interrupt) fmt.Println("server is running....") <-stop } type MyTestHandler struct { consumer *nsq.Consumer } func (m MyTestHandler) HandleMessage(message *nsq.Message) error { fmt.Println(string(message.Body)) return nil } 复制代码
方法ConnectToNSQLookupds
就是用于链接nsqlookupd
的,可是须要注意的是,链接的是http
端口7201
和8201
,库go-nsq
是经过请求其中一个nsqlookupd
的 http 方法http://127.0.0.1:7201/lookup?topic=testTopic1
来获得全部提供topic=testTopic1
的nsqd
列表信息,而后对全部的nsqd进行
链接,
2019/08/30 13:47:26 INF 1 [testTopic1/ch1] querying nsqlookupd http://127.0.0.1:7201/lookup?topic=testTopic1 2019/08/30 13:47:26 INF 1 [testTopic1/ch1] (li-peng-mc-macbook.local:7000) connecting to nsqd 2019/08/30 13:47:26 INF 1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) connecting to nsqd 复制代码
目前咱们已经链接了两个。 咱们演示一下橫向扩充,启动第三个nsqd
./nsqd -tcp-address ":6000" -http-address ":6001" --lookupd-tcp-address=127.0.0.1:8200 --lookupd-tcp-address=127.0.0.1:7200 -data-path=./c 复制代码
这里会有一个问题,当我启动了一个亲的nsqd
可是他的topic是空的,咱们需指定这新的nsqd
处理哪些topic。 咱们能够用nsqadmin
查看全部的topic
./nsqadmin --lookupd-http-address=127.0.0.1:8201 --lookupd-http-address=127.0.0.1:7201
复制代码
而后去你的nsqd
上去建topic
curl -X POST 'http://127.0.0.1:6001/topic/create?topic=testTopic1' 复制代码
固然也能够本身写一些自动化的角本 查看客户端的日志输出
2019/08/30 14:56:01 INF 1 [testTopic1/ch1] querying nsqlookupd http://127.0.0.1:7201/lookup?topic=testTopic1 2019/08/30 14:56:01 INF 1 [testTopic1/ch1] (li-peng-mc-macbook.local:6000) connecting to nsqd 复制代码
已经连上咱们的新nsqd
了
我手动关闭一个nsqd
实例 客户端的日志输出已经断开了链接
2019/08/30 15:04:20 ERR 1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) IO error - EOF 2019/08/30 15:04:20 INF 1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) beginning close 2019/08/30 15:04:20 INF 1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) readLoop exiting 2019/08/30 15:04:20 INF 1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) breaking out of writeLoop 2019/08/30 15:04:20 INF 1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) writeLoop exiting 2019/08/30 15:04:20 INF 1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) finished draining, cleanup exiting 2019/08/30 15:04:20 INF 1 [testTopic1/ch1] (li-peng-mc-macbook.local:8000) clean close complete 2019/08/30 15:04:20 WRN 1 [testTopic1/ch1] there are 2 connections left alive 复制代码
而且nsqd
和nsqlookupd
也断开了链接,客户端再次从nsqlookupd
取全部的nsqd
的地址时获得的老是可用的地址。
nsqlookupd
用于管理整个网络拓扑结构,nsqd用他实现服务的注册,客户端使用他获得全部的nsqd服务节点信息,而后全部的consumer端链接 实现原理以下,
nsqd
把本身的服务信息广播给一个或者多个nsqlookupd
客户端
链接一个或者多个nsqlookupd
,经过nsqlookupd
获得全部的nsqd
的链接信息,进行链接消费,nsqd
出现问题,down机了,会和nsqlookupd
断开,这样客户端
从nsqlookupd
获得的nsqd
的列表永远是可用的。客户端
链接的是全部的nsqd
,一个出问题了就用其余的链接,因此也不会受影响。