最近在研究一些消息中间件,经常使用的MQ如RabbitMQ,ActiveMQ,Kafka等。NSQ是一个基于Go语言的分布式实时消息平台,它基于MIT开源协议发布,由bitly公司开源出来的一款简单易用的消息中间件。 官方和第三方还为NSQ开发了众多客户端功能库,如官方提供的基于HTTP的nsqd、Go客户端go-nsq、Python客户端pynsq、基于Node.js的JavaScript客户端nsqjs、异步C客户端libnsq、Java客户端nsq-java以及基于各类语言的众多第三方客户端功能库。java
1). DistributedNSQ提供了分布式的,去中心化,且没有单点故障的拓扑结构,稳定的消息传输发布保障,可以具备高容错和HA(高可用)特性。2). Scalable易于扩展NSQ支持水平扩展,没有中心化的brokers。内置的发现服务简化了在集群中增长节点。同时支持pub-sub和load-balanced 的消息分发。3). Ops FriendlyNSQ很是容易配置和部署,生来就绑定了一个管理界面。二进制包没有运行时依赖。官方有Docker image。4.Integrated高度集成官方的 Go 和 Python库都有提供。并且为大多数语言提供了库。node
Topic :一个topic就是程序发布消息的一个逻辑键,当程序第一次发布消息时就会建立topic。 sql
Channels :channel与消费者相关,是消费者之间的负载均衡,channel在某种意义上来讲是一个“队列”。每当一个发布者发送一条消息到一个topic,消息会被复制到全部消费者链接的channel上,消费者经过这个特殊的channel读取消息,实际上,在消费者第一次订阅时就会建立channel。Channel会将消息进行排列,若是没有消费者读取消息,消息首先会在内存中排队,当量太大时就会被保存到磁盘中。 服务器
Messages:消息构成了咱们数据流的中坚力量,消费者能够选择结束消息,代表它们正在被正常处理,或者从新将他们排队待到后面再进行处理。每一个消息包含传递尝试的次数,当消息传递超过必定的阀值次数时,咱们应该放弃这些消息,或者做为额外消息进行处理。 markdown
nsqd:nsqd 是一个守护进程,负责接收,排队,投递消息给客户端。它能够独立运行,不过一般它是由 nsqlookupd 实例所在集群配置的(它在这能声明 topics 和 channels,以便你们能找到)。 网络
nsqlookupd:nsqlookupd 是守护进程负责管理拓扑信息。客户端经过查询 nsqlookupd 来发现指定话题(topic)的生产者,而且 nsqd 节点广播话题(topic)和通道(channel)信息。有两个接口:TCP 接口,nsqd 用它来广播。HTTP 接口,客户端用它来发现和管理。 架构
nsqadmin:nsqadmin 是一套 WEB UI,用来聚集集群的实时统计,并执行不一样的管理任务。 经常使用工具类: 负载均衡
nsq_to _file:消费指定的话题(topic)/通道(channel),并写到文件中,有选择的滚动和/或压缩文件。 curl
nsq_to _http:消费指定的话题(topic)/通道(channel)和执行 HTTP requests (GET/POST) 到指定的端点。 异步
nsq_to _nsq:消费者指定的话题/通道和重发布消息到目的地 nsqd 经过 TCP。
NSQ推荐经过他们相应的nsqd实例使用协同定位发布者,这意味着即便面对网络分区,消息也会被保存在本地,直到它们被一个消费者读取。更重要的是,发布者没必要去发现其余的nsqd节点,他们老是能够向本地实例发布消息。
首先,一个发布者向它的本地nsqd发送消息,要作到这点,首先要先打开一个链接,而后发送一个包含topic和消息主体的发布命令,在这种状况下,咱们将消息发布到事件topic上以分散到咱们不一样的worker中。 事件topic会复制这些消息而且在每个链接topic的channel上进行排队,在咱们的案例中,有三个channel,它们其中之一做为档案channel。消费者会获取这些消息而且上传到S3。
每一个channel的消息都会进行排队,直到一个worker把他们消费,若是此队列超出了内存限制,消息将会被写入到磁盘中。Nsqd节点首先会向nsqlookup广播他们的位置信息,一旦它们注册成功,worker将会从nsqlookup服务器节点上发现全部包含事件topic的nsqd节点。
而后每一个worker向每一个nsqd主机进行订阅操做,用于代表worker已经准备好接受消息了。这里咱们不须要一个完整的连通图,但咱们必需要保证每一个单独的nsqd实例拥有足够的消费者去消费它们的消息,不然channel会被队列堆着。
NSQ 保证消息将交付至少一次,虽然消息多是重复的。消费者应该关注到这一点,删除重复数据或执行idempotent等操做。 这个担保是做为协议和工做流的一部分,工做原理以下(假设客户端成功链接并订阅一个话题): 1)客户表示已经准备好接收消息 2)NSQ 发送一条消息,并暂时将数据存储在本地(在 re-queue 或 timeout) 3)客户端回复 FIN(结束)或 REQ(从新排队)分别指示成功或失败。若是客户端没有回复, NSQ 会在设定的时间超时,自动从新排队消息 这确保了消息丢失惟一可能的状况是不正常结束 nsqd 进程。在这种状况下,这是在内存中的任何信息(或任何缓冲未刷新到磁盘)都将丢失。 如何防止消息丢失是最重要的,即便是这个意外状况能够获得缓解。一种解决方案是构成冗余 nsqd对(在不一样的主机上)接收消息的相同部分的副本。由于你实现的消费者是幂等的,以两倍时间处理这些消息不会对下游形成影响,并使得系统可以承受任何单一节点故障而不会丢失信息。
单个 nsqd 实例被设计成能够同时处理多个数据流。流被称为“话题”和话题有 1 个或多个“通道”。每一个通道都接收到一个话题中全部消息的拷贝。在实践中,一个通道映射到下行服务消费一个话题。 话题和通道都没有预先配置。话题由第一次发布消息到命名的话题或第一次经过订阅一个命名话题来建立。通道被第一次订阅到指定的通道建立。话题和通道的全部缓冲的数据相互独立,防止缓慢消费者形成对其余通道的积压(一样适用于话题级别)。 一个通道通常会有多个客户端链接。假设全部已链接的客户端处于准备接收消息的状态,每一个消息将被传递到一个随机的客户端。nsqlookupd,它提供了一个目录服务,消费者能够查找到提供他们感兴趣订阅话题的 nsqd 地址 。在配置方面,把消费者与生产者解耦开(它们都分别只须要知道哪里去链接 nsqlookupd 的共同实例,而不是对方),下降复杂性和维护。 在更底的层面,每一个 nsqd 有一个与 nsqlookupd 的长期 TCP 链接,按期推进其状态。这个数据被 nsqlookupd 用于给消费者通知 nsqd 地址。对于消费者来讲,一个暴露的 HTTP /lookup 接口用于轮询。为话题引入一个新的消费者,只需启动一个配置了 nsqlookup 实例地址的 NSQ 客户端。无需为添加任何新的消费者或生产者更改配置,大大下降了开销和复杂性。
NSQ被设计以分布的方式被使用。nsqd 客户端(经过 TCP )链接到指定话题的全部生产者实例。没有中间人,没有消息代理,也没有单点故障。 这种拓扑结构消除单链,聚合,反馈。相反,你的消费者直接访问全部生产者。从技术上讲,哪一个客户端链接到哪一个 NSQ 不重要,只要有足够的消费者链接到全部生产者,以知足大量的消息,保证全部东西最终将被处理。对于 nsqlookupd,高可用性是经过运行多个实例来实现。他们不直接相互通讯和数据被认为是最终一致。消费者轮询全部的配置的 nsqlookupd 实例和合并 response。失败的,没法访问的,或以其余方式故障的节点不会让系统陷于停顿。
对于数据的协议,经过推送数据到客户端最大限度地提升性能和吞吐量的,而不是等待客户端拉数据。这个概念,称之为 RDY 状态,基本上是客户端流量控制的一种形式。 当客户端链接到 nsqd 和并订阅到一个通道时,它被放置在一个 RDY 为 0 状态。这意味着,尚未信息被发送到客户端。当客户端已准备好接收消息发送,更新它的命令 RDY 状态到它准备处理的数量,好比 100。无需任何额外的指令,当 100 条消息可用时,将被传递到客户端(服务器端为那个客户端每次递减 RDY 计数)。客户端库的被设计成在 RDY 数达到配置 max-in-flight 的 25% 发送一个命令来更新 RDY 计数(并适当考虑链接到多个 nsqd 状况下,适当地分配)。
NSQ 的 TCP 协议是面向 push 的。在创建链接,握手,和订阅后,消费者被放置在一个为 0 的 RDY 状态。当消费者准备好接收消息,它更新的 RDY 状态到准备接收消息的数量。NSQ 客户端库不断在幕后管理,消息控制流的结果。每隔一段时间,nsqd 将发送一个心跳线链接。客户端能够配置心跳之间的间隔,但 nsqd 会期待一个回应在它发送下一个心掉以前。 组合应用级别的心跳和 RDY 状态,避免头阻塞现象,也可能使心跳无用(即,若是消费者是在后面的处理消息流的接收缓冲区中,操做系统将被填满,堵心跳)为了保证进度,全部的网络 IO 时间上限势必与配置的心跳间隔相关联。这意味着,你能够从字面上拔掉之间的网络链接 nsqd 和消费者,它会检测并正确处理错误。当检测到一个致命错误,客户端链接被强制关闭。在传输中的消息会超时而从新排队等待传递到另外一个消费者。最后,错误会被记录并累计到各类内部指标。
由于NSQ没有在守护程序之间共享信息,因此它从一开始就是为了分布式操做而生。个别的机器能够随便宕机随便启动而不会影响到系统的其他部分,消息发布者能够在本地发布,即便面对网络分区。 这种“分布式优先”的设计理念意味着NSQ基本上能够永远不断地扩展,须要更高的吞吐量?那就添加更多的nsqd吧。惟一的共享状态就是保存在lookup节点上,甚至它们不须要全局视图,配置某些nsqd注册到某些lookup节点上这是很简单的配置,惟一关键的地方就是消费者能够经过lookup节点获取全部完整的节点集。清晰的故障事件——NSQ在组件内创建了一套明确关于可能致使故障的的故障权衡机制,这对消息传递和恢复都有意义。虽然它们可能不像Kafka系统那样提供严格的保证级别,但NSQ简单的操做使故障状况很是明显。
不像其余的队列组件,NSQ并无提供任何形式的复制和集群,也正是这点让它可以如此简单地运行,但它确实对于一些高保证性高可靠性的消息发布没有足够的保证。咱们能够经过下降文件同步的时间来部分避免,只需经过一个标志配置,经过EBS支持咱们的队列。可是这样仍然存在一个消息被发布后立刻死亡,丢失了有效的写入的状况。
虽然Kafka由一个有序的日志构成,但NSQ不是。消息能够在任什么时候间以任何顺序进入队列。在咱们使用的案例中,这一般没有关系,由于全部的数据都被加上了时间戳,但它并不适合须要严格顺序的状况。
NSQ对于超时系统,它使用了心跳检测机制去测试消费者是否存活仍是死亡。不少缘由会致使咱们的consumer没法完成心跳检测,因此在consumer中必须有一个单独的步骤确保幂等性。
本文将nsq集群具体的安装过程略去,你们能够自行参考官网,比较简单。这部分介绍下笔者实验的拓扑,以及nsqadmin的相关信息。
实验采用3台NSQD服务,2台LOOKUPD服务。采用官方推荐的拓扑,消息发布的服务和NSQD在一台主机。一共5台机器。NSQ基本没有配置文件,配置经过命令行指定参数。主要命令以下:LOOKUPD命令
复制代码bin /nsqlookupd
bin /nsqlookupd
NSQD命令
复制代码bin/nsqd --lookupd-tcp-address=172.16.30.254:4160 -broadcast-address=172.16.30.254
bin/nsqd --lookupd-tcp-address=172.16.30.254:4160 -broadcast-address=172.16.30.254
复制代码bin/nsqadmin --lookupd-http-address=172.16.30.254:4161
bin/nsqadmin --lookupd-http-address=172.16.30.254:4161
工具类,消费后存储到本地文件。
复制代码bin/nsq_to_file --topic=newtest --channel=test --output-dir=/tmp --lookupd-http-address=172.16.30.254:4161
bin/nsq_to_file --topic=newtest --channel=test --output-dir=/tmp --lookupd-http-address=172.16.30.254:4161
发布一条消息
复制代码curl -d 'hello world 5' 'http://172.16.30.254:4151/put?topic=test'
curl -d 'hello world 5' 'http://172.16.30.254:4151/put?topic=test'
对Streams的详细信息进行查看,包括NSQD节点,具体的channel,队列中的消息数,链接数等信息。
列出全部的NSQD节点:
消息的统计:
lookup主机的列表:
NSQ基本核心就是简单性,是一个简单的队列,这意味着它很容易进行故障推理和很容易发现bug。消费者能够自行处理故障事件而不会影响系统剩下的其他部分。
事实上,简单性是咱们决定使用NSQ的首要因素,这方便与咱们的许多其余软件一块儿维护,经过引入队列使咱们获得了堪称完美的表现,经过队列甚至让咱们增长了几个数量级的吞吐量。愈来愈多的consumer须要一套严格可靠性和顺序性保障,这已经超过了NSQ提供的简单功能。
结合咱们的业务系统来看,对于咱们所须要传输的发票消息,相对比较敏感,没法容忍某个nsqd宕机,或者磁盘没法使用的状况,该节点堆积的消息没法找回。这是咱们没有选择该消息中间件的主要缘由。简单性和可靠性彷佛并不能彻底知足。相比Kafka,ops肩负起更多负责的运营。另外一方面,它拥有一个可复制的、有序的日志能够提供给咱们更好的服务。但对于其余适合NSQ的consumer,它为咱们服务的至关好,咱们期待着继续巩固它的坚实的基础。
ps: 本文首发于笔者的csdn博客,此处将其加入我的博客。
NSQ:分布式的实时消息平台
NSQ - NYC Golang Meetup
NSQ Docs