NSQ源码剖析之nsqd

NSQ简介

NSQ 是实时的分布式消息处理平台,其设计的目的是用来大规模地处理天天数以十亿计级别的消息。NSQ 具备分布式和去中心化拓扑结构,该结构具备无单点故障、故障容错、高可用性以及可以保证消息的可靠传递的特征,是一个成熟的、已在大规模生成环境下应用的产品。golang

NSQ 由 3 个守护进程组成: 
nsqd 是接收、保存和传送消息到客户端的守护进程。 
nsqlookupd 是管理的拓扑信息,维护着全部nsqd的状态,并提供了最终一致发现服务的守护进程。 
nsqadmin 是一个 Web UI 来实时监控集群和执行各类管理任务。 
NSQ结构图sql

这篇文章介绍主要介绍nsqd的实现。缓存

Topic与Channel

Topic与Channel是NSQ中重要的两个概念。 
生产者将消息写到Topic中,一个Topic下能够有多个Channel,每一个Channel都是Topic的完整副本。 
消费者从Channel处订阅消息,若是有多个消费者订阅同一个Channel,Channel中的消息将被传递到一个随机的消费者。服务器

图片标题

 
 

要理解Topic Channel中各类chan的做用,关键是要理解golang中如何在并发环境下如何操做一个结构体(多个goroutine同时操做topic),与C/C++多线程操做同一个结构体时加锁(mutex,rwmutex)不一样,go语言中通常是为这个结构体(topic,channel)开启一个主goroutine(messagePump函数),全部对该结构体的改变的操做都应是该主goroutine完成的,也就不存在并发的问题了,其它goroutine若是想要改变这个结构体则应该向结构体提供的chan中发送消息(msgchan)或者通知(exitchan,updatechan),主goroutine会一直监听全部的chan,当有消息或者通知到来时作相应的处理。网络

数据的持久化

了解数据的持久化以前,咱们先来看两个问题? 
1. 往Topic中写入消息就是将消息发送到Topic.memoryMsgChan中,可是memoryMsgChan是一个固定内存大小的内存队列,若是队列满了怎么办呢?会阻塞吗? 
2. 若是消息都存放在memoryMsgChan这个内存队列中,程序退出了消息就所有丢失了吗?多线程

NSQ是如何解决的呢,nsq在建立Topic、Channel的时候都会建立一个DiskQueue,DiskQueue负责向磁盘文件中写入消息、从磁盘文件中读取消息,是NSQ实现数据持久化的最重要结构。 
以Topic为例,若是向Topic.memoryMsgChan写入消息可是memoryMsgChan已满时,nsq会将消息写到topic.DiskQueue中,DiskQueue会负责将消息内存同步到磁盘上。 
若是从Topic.memoryMsgChan中读取消息时,可是memoryMsgChan并无消息时,就从topic.DiskQueue中取出同步到磁盘文件中的消息。架构

 
 

咱们看到topic.backend(diskQueue)负责将消息写到磁盘并从磁盘中读取消息,diskQueue提供了两个chan供外部使用:readChan与writeChan。 
咱们来看下diskQueue实现中的几个要点。并发

  1. diskQueue在建立时会开启一个goroutine,从磁盘文件中读取消息写到readChan中,外部goroutine能够从readChan中获取消息;随时监听writeChan,当有消息时从wirtechan中取出消息,写到本地磁盘文件。
  2. diskQueue既要提供文件的读服务又要提供文件的写服务,因此要记录下文件的读位置(readIndex),写位置(writeIndex)。每次从文件中读取消息时使用file.Seek(readindex)定位到文件读位置而后读取消息信息,每次往文件中写入消息时都要file.Seek(writeIndex)定位到写位置再将消息写入。
  3. readIndex,writeIndex很重要,程序退出时要将这些信息(meta data)写到另外的磁盘文件(元信息文件)中,程序启动时首先读取元信息文件,在根据元信息文件中的readIndex writeIndex操做存储信息的文件。
  4. 因为操做系统层也有缓存,调用file.Write()写入的信息,也可能只是存在缓存中并无同步到磁盘,须要显示调用file.sync()才能够强制要求操做系统把缓存同步到磁盘。能够经过指定建立diskQueue时传入的syncEvery,syncTimeout来控制调用file.sync()的频率。syncTimeout是指每隔syncTimeout秒调用一次file.sync(),syncEvery是指每当写入syncEvery个消息后调用一次file.sync()。这两个参数均可以在启动nsqd程序时经过命令行指定。

网络架构

nsq是一个可靠的、高性能的服务端网络程序,经过阅读nsqd的源码来学习如何搭建一个可靠的网络服务端程序。分布式

 
 

客户端已成功的与服务器创建连接了,每个客户端创建链接后,nsqd都会建立一个Client接口体,该结构体内保存一些client的状态信息。 
每个Client都会有两个goroutine,一个goroutine负责读取客户端主动发送的各类命令,解析命令,处理命令并将处理结果回复给客户端。 
另外一个goutine负责定时发送心跳信息给客户端,若是客户端订阅某个channel的话则将channel中的将消息经过网络发送给客户端。函数

若是服务端不须要主动推送大量消息给客户端,一个链接只须要开一个goroutine处理请求并发送回复就能够了,这是最简单的方式。开启两个goroutine操做同一个conn的话就须要注意加锁了。

 
 

咱们来看下NSQ中几个比较重要的命令:

  • NOP 心跳回复,没有实际意义
  • PUB 发布一个消息到 话题(topic) 
    PUB <topic_name>\n [ 四字节消息的大小 ][ 消息的内容 ]
  • SUB 订阅话题(topic) /通道(channel) 
    SUB <topic_name> <channel_name>\n
  • RDY 更新 RDY 状态 (表示客户端已经准备好接收N 消息) 
    RDY <count>\n
  • FIN 完成一个消息 (表示成功处理) 
    FIN <message_id>\n

生产者产生消息的过程比较简单,就是一个PUB命令,先读取四字节的消息大小,而后根据消息大小读取消息内容,而后将内容写到topic.MessageChan中。 咱们重点来看下消费者是如何从nsq中读取消息的。 1. 消费者首先须要发送SUB命令,告诉nsqd它想订阅哪一个Channel,而后nsqd将该Client与Channel创建对应关系。 2. 消费者发送RDY命令,告诉服务端它以准备好接受count个消息,服务端则向消费者发送count个消息,若是消费者想继续接受消息就须要不断发送RDY命令告诉服务端本身准备好接受消息(相似TCP协议中滑动窗口的概念,消费者并非按照顺序一个个的消费消息,NSQD最多能够同时count个消息给消费者,每推送给消费者一个消息count数目减一,当消费者处理完消息回复FIN指令时count+1)。

相关文章
相关标签/搜索