目录html
更新、更全的《Go从入门到放弃》的更新网站,更有python、go、人工智能教学等着你:http://www.javashuo.com/article/p-mxrjjcnn-hn.htmlnode
NSQ是目前比较流行的一个分布式的消息队列,本文主要介绍了NSQ及Go语言如何操做NSQ。python
NSQ是Go语言编写的一个开源的实时分布式内存消息队列,其性能十分优异。 NSQ的优点有如下优点:git
一般来讲,消息队列都适用如下场景。github
参照下图利用消息队列把业务流程中的非关键流程异步化,从而显著下降业务请求的响应时间。 sql
经过使用消息队列将不一样的业务逻辑解耦,下降系统间的耦合,提升系统的健壮性。后续有其余业务要使用订单数据可直接订阅消息队列,提升系统的灵活性。 后端
相似秒杀(大秒)等场景下,某一时间可能会产生大量的请求,使用消息队列可以为后端处理请求提供必定的缓冲区,保证后端服务的稳定性。 浏览器
官方下载页面根据本身的平台下载并解压便可。bash
nsqd是一个守护进程,它接收、排队并向客户端发送消息。服务器
启动nsqd
,指定-broadcast-address=127.0.0.1
来配置广播地址
./nsqd -broadcast-address=127.0.0.1
若是是在搭配nsqlookupd
使用的模式下须要还指定nsqlookupd
地址:
./nsqd -broadcast-address=127.0.0.1 -lookupd-tcp-address=127.0.0.1:4160
若是是部署了多个nsqlookupd
节点的集群,那还能够指定多个-lookupd-tcp-address
。
nsqdq
相关配置项以下:
-auth-http-address value <addr>:<port> to query auth server (may be given multiple times) -broadcast-address string address that will be registered with lookupd (defaults to the OS hostname) (default "PROSNAKES.local") -config string path to config file -data-path string path to store disk-backed messages -deflate enable deflate feature negotiation (client compression) (default true) -e2e-processing-latency-percentile value message processing time percentiles (as float (0, 1.0]) to track (can be specified multiple times or comma separated '1.0,0.99,0.95', default none) -e2e-processing-latency-window-time duration calculate end to end latency quantiles for this duration of time (ie: 60s would only show quantile calculations from the past 60 seconds) (default 10m0s) -http-address string <addr>:<port> to listen on for HTTP clients (default "0.0.0.0:4151") -http-client-connect-timeout duration timeout for HTTP connect (default 2s) -http-client-request-timeout duration timeout for HTTP request (default 5s) -https-address string <addr>:<port> to listen on for HTTPS clients (default "0.0.0.0:4152") -log-prefix string log message prefix (default "[nsqd] ") -lookupd-tcp-address value lookupd TCP address (may be given multiple times) -max-body-size int maximum size of a single command body (default 5242880) -max-bytes-per-file int number of bytes per diskqueue file before rolling (default 104857600) -max-deflate-level int max deflate compression level a client can negotiate (> values == > nsqd CPU usage) (default 6) -max-heartbeat-interval duration maximum client configurable duration of time between client heartbeats (default 1m0s) -max-msg-size int maximum size of a single message in bytes (default 1048576) -max-msg-timeout duration maximum duration before a message will timeout (default 15m0s) -max-output-buffer-size int maximum client configurable size (in bytes) for a client output buffer (default 65536) -max-output-buffer-timeout duration maximum client configurable duration of time between flushing to a client (default 1s) -max-rdy-count int maximum RDY count for a client (default 2500) -max-req-timeout duration maximum requeuing timeout for a message (default 1h0m0s) -mem-queue-size int number of messages to keep in memory (per topic/channel) (default 10000) -msg-timeout string duration to wait before auto-requeing a message (default "1m0s") -node-id int unique part for message IDs, (int) in range [0,1024) (default is hash of hostname) (default 616) -snappy enable snappy feature negotiation (client compression) (default true) -statsd-address string UDP <addr>:<port> of a statsd daemon for pushing stats -statsd-interval string duration between pushing to statsd (default "1m0s") -statsd-mem-stats toggle sending memory and GC stats to statsd (default true) -statsd-prefix string prefix used for keys sent to statsd (%s for host replacement) (default "nsq.%s") -sync-every int number of messages per diskqueue fsync (default 2500) -sync-timeout duration duration of time per diskqueue fsync (default 2s) -tcp-address string <addr>:<port> to listen on for TCP clients (default "0.0.0.0:4150") -tls-cert string path to certificate file -tls-client-auth-policy string client certificate auth policy ('require' or 'require-verify') -tls-key string path to key file -tls-min-version value minimum SSL/TLS version acceptable ('ssl3.0', 'tls1.0', 'tls1.1', or 'tls1.2') (default 769) -tls-required require TLS for client connections (true, false, tcp-https) -tls-root-ca-file string path to certificate authority file -verbose enable verbose logging -version print version string -worker-id do NOT use this, use --node-id
nsqlookupd是维护全部nsqd状态、提供服务发现的守护进程。它能为消费者查找特定topic
下的nsqd提供了运行时的自动发现服务。 它不维持持久状态,也不须要与任何其余nsqlookupd实例协调以知足查询。所以根据你系统的冗余要求尽量多地部署nsqlookupd
节点。它们小豪的资源不多,能够与其余服务共存。咱们的建议是为每一个数据中心运行至少3个集群。
nsqlookupd
相关配置项以下:
-broadcast-address string address of this lookupd node, (default to the OS hostname) (default "PROSNAKES.local") -config string path to config file -http-address string <addr>:<port> to listen on for HTTP clients (default "0.0.0.0:4161") -inactive-producer-timeout duration duration of time a producer will remain in the active list since its last ping (default 5m0s) -log-prefix string log message prefix (default "[nsqlookupd] ") -tcp-address string <addr>:<port> to listen on for TCP clients (default "0.0.0.0:4160") -tombstone-lifetime duration duration of time a producer will remain tombstoned if registration remains (default 45s) -verbose enable verbose logging -version print version string
一个实时监控集群状态、执行各类管理任务的Web管理平台。 启动nsqadmin
,指定nsqlookupd
地址:
./nsqadmin -lookupd-http-address=127.0.0.1:4161
咱们可使用浏览器打开http://127.0.0.1:4171/
访问以下管理界面。
nsqadmin
相关的配置项以下:
-allow-config-from-cidr string A CIDR from which to allow HTTP requests to the /config endpoint (default "127.0.0.1/8") -config string path to config file -graphite-url string graphite HTTP address -http-address string <addr>:<port> to listen on for HTTP clients (default "0.0.0.0:4171") -http-client-connect-timeout duration timeout for HTTP connect (default 2s) -http-client-request-timeout duration timeout for HTTP request (default 5s) -http-client-tls-cert string path to certificate file for the HTTP client -http-client-tls-insecure-skip-verify configure the HTTP client to skip verification of TLS certificates -http-client-tls-key string path to key file for the HTTP client -http-client-tls-root-ca-file string path to CA file for the HTTP client -log-prefix string log message prefix (default "[nsqadmin] ") -lookupd-http-address value lookupd HTTP address (may be given multiple times) -notification-http-endpoint string HTTP endpoint (fully qualified) to which POST notifications of admin actions will be sent -nsqd-http-address value nsqd HTTP address (may be given multiple times) -proxy-graphite proxy HTTP requests to graphite -statsd-counter-format string The counter stats key formatting applied by the implementation of statsd. If no formatting is desired, set this to an empty string. (default "stats.counters.%s.count") -statsd-gauge-format string The gauge stats key formatting applied by the implementation of statsd. If no formatting is desired, set this to an empty string. (default "stats.gauges.%s") -statsd-interval duration time interval nsqd is configured to push to statsd (must match nsqd) (default 1m0s) -statsd-prefix string prefix used for keys sent to statsd (%s for host replacement, must match nsqd) (default "nsq.%s") -version print version string
每一个nsqd实例旨在一次处理多个数据流。这些数据流称为“topics”
,一个topic
具备1个或多个“channels”
。每一个channel
都会收到topic
全部消息的副本,实际上下游的服务是经过对应的channel
来消费topic
消息。
topic
和channel
不是预先配置的。topic
在首次使用时建立,方法是将其发布到指定topic
,或者订阅指定topic
上的channel
。channel
是经过订阅指定的channel
在第一次使用时建立的。
topic
和channel
都相互独立地缓冲数据,防止缓慢的消费者致使其余chennel
的积压(一样适用于topic
级别)。
channel
能够而且一般会链接多个客户端。假设全部链接的客户端都处于准备接收消息的状态,则每条消息将被传递到随机客户端。例如:
总而言之,消息是从
topic -> channel
(每一个channel接收该topic的全部消息的副本)多播的,可是从channel -> consumers
均匀分布(每一个消费者接收该channel的一部分消息)。
--mem-queue-size
设置为0,全部的消息将会存储到磁盘。官方提供了Go语言版的客户端:go-nsq,更多客户端支持请查看CLIENT LIBRARIES。
go get -u github.com/nsqio/go-nsq
一个简单的生产者示例代码以下:
// nsq_producer/main.go package main import ( "bufio" "fmt" "os" "strings" "github.com/nsqio/go-nsq" ) // NSQ Producer Demo var producer *nsq.Producer // 初始化生产者 func initProducer(str string) (err error) { config := nsq.NewConfig() producer, err = nsq.NewProducer(str, config) if err != nil { fmt.Printf("create producer failed, err:%v\n", err) return err } return nil } func main() { nsqAddress := "127.0.0.1:4150" err := initProducer(nsqAddress) if err != nil { fmt.Printf("init producer failed, err:%v\n", err) return } reader := bufio.NewReader(os.Stdin) // 从标准输入读取 for { data, err := reader.ReadString('\n') if err != nil { fmt.Printf("read string from stdin failed, err:%v\n", err) continue } data = strings.TrimSpace(data) if strings.ToUpper(data) == "Q" { // 输入Q退出 break } // 向 'topic_demo' publish 数据 err = producer.Publish("topic_demo", []byte(data)) if err != nil { fmt.Printf("publish msg to nsq failed, err:%v\n", err) continue } } }
将上面的代码编译执行,而后在终端输入两条数据123
和456
:
$ ./nsq_producer 123 2018/10/22 18:41:20 INF 1 (127.0.0.1:4150) connecting to nsqd 456
使用浏览器打开http://127.0.0.1:4171/
能够查看到相似下面的页面: 在下面这个页面能看到当前的topic
信息:
点击页面上的topic_demo
就能进入一个展现更多详细信息的页面,在这个页面上咱们能够查看和管理topic
,同时可以看到目前在LWZMBP:4151 (127.0.01:4151)
这个nsqd
上有2条message。又由于没有消费者接入因此暂时没有建立channel
。
在/nodes
这个页面咱们可以很方便的查看当前接入lookupd
的nsqd
节点。
这个/counter
页面显示了处理的消息数量,由于咱们没有接入消费者,因此处理的消息数量为0。
在/lookup
界面支持建立topic
和channel
。
一个简单的消费者示例代码以下:
// nsq_consumer/main.go package main import ( "fmt" "os" "os/signal" "syscall" "time" "github.com/nsqio/go-nsq" ) // NSQ Consumer Demo // MyHandler 是一个消费者类型 type MyHandler struct { Title string } // HandleMessage 是须要实现的处理消息的方法 func (m *MyHandler) HandleMessage(msg *nsq.Message) (err error) { fmt.Printf("%s recv from %v, msg:%v\n", m.Title, msg.NSQDAddress, string(msg.Body)) return } // 初始化消费者 func initConsumer(topic string, channel string, address string) (err error) { config := nsq.NewConfig() config.LookupdPollInterval = 15 * time.Second c, err := nsq.NewConsumer(topic, channel, config) if err != nil { fmt.Printf("create consumer failed, err:%v\n", err) return } consumer := &MyHandler{ Title: "沙河1号", } c.AddHandler(consumer) // if err := c.ConnectToNSQD(address); err != nil { // 直接连NSQD if err := c.ConnectToNSQLookupd(address); err != nil { // 经过lookupd查询 return err } return nil } func main() { err := initConsumer("topic_demo", "first", "127.0.0.1:4161") if err != nil { fmt.Printf("init consumer failed, err:%v\n", err) return } c := make(chan os.Signal) // 定义一个信号的通道 signal.Notify(c, syscall.SIGINT) // 转发键盘中断信号到c <-c // 阻塞 }
将上面的代码保存以后编译执行,就可以获取以前咱们publish的两条消息了:
$ ./nsq_consumer 2018/10/22 18:49:06 INF 1 [topic_demo/first] querying nsqlookupd http://127.0.0.1:4161/lookup?topic=topic_demo 2018/10/22 18:49:06 INF 1 [topic_demo/first] (127.0.0.1:4150) connecting to nsqd 沙河1号 recv from 127.0.0.1:4150, msg:123 沙河1号 recv from 127.0.0.1:4150, msg:456
同时在nsqadmin的/counter
页面查看处处理的数据数量为2。
关于go-nsq
的更多内容请阅读go-nsq的官方文档。