本篇文章主要介绍如何在nodeJs中使用nsq,其余实现将在后续文章输出。前端
前段时间作了一个网页生成pdf的node服务。因为puppteer和canvas生成过程当中对内存的消耗比较大,内容量大的网页生成时间过长,对于第三方组件有时候会生成出问题等缘由。node
引入了nsq,使项目实现负载均衡,消除单点故障。git
可是网上查找以后发现介绍node中加入nsq的方案不多,通过软膜硬泡终于把nsq引入了node中,但愿能把本身的收获和你们聊一下吧。github
NSQ是一个基于Go语言的分布式实时消息平台, 它具备分布式、去中心化的拓扑结构,支持无限水平扩展。无单点故障、故障容错、高可用性以及可以保证消息的可靠传递的特征。另外,NSQ很是容易配置和部署, 且支持众多的消息协议。支持多种客户端,协议简单。sql
nsq设计很简单,须要了解如下几个核心概念。npm
一、 nsqd:一个负责接收、排队、转发消息到客户端的守护进程
二、 nsqlookupd:管理拓扑信息, 用于收集nsqd上报的topic和channel,并提供最终一致性的发现服务的守护进程。
三、 Topic:一个topic就是程序发布消息的一个逻辑键,当程序第一次发布消息时就会建立topic。
四、 Channels:channel组与消费者相关,是消费者之间的负载均衡,channel在某种意义上来讲是一个“队列”。每当一个发布者发送一条消息到一个topic,消息会被复制到全部消费者链接的channel上,消费者经过这个特殊的channel读取消息,实际上,在消费者第一次订阅时就会建立channel。
Topic只能有一个channel能够有多个,不一样的channel用于分发不一样的任务
下面附上一个金典nsq示意图:canvas
在使用过程当中发现gcc版本太低致使报错。
由于node.js 4升级了v8引擎,须要gcc版本在4.8以上。后端
本项目是在eggjs基础上创建的。首先须要在项目中安装nsqjs
服务器
$ npm install nsqjs --save
在根文件app.js对nsq进行控制,nsq的配置很是简洁,nsq分为写和读两个独立的过程。网络
const nsq = require('nsqjs') module.exports = app => { app.beforeStart(async () => { // 实例化nsq的写操做 // 在config中配置nsq的host和port,这里是你配置的nsq地址 const writerNsq = new nsq.Writer(app.config.nsq.nsqHostWriter, app.config.nsq.writePort) // 链接nsq的写功能 writerNsq.connect() // 当写操做链接成功后,把其赋值到全局的app上以便写入信息 writerNsq.on('ready', () => { app.writerNsq = writerNsq })
当nsq写功能实现后,咱们能够经过publish方法向nsq队列写入信息
ctx.app.writerNsq.publish(config.nsq.topic, { // 你所须要传递的参数 })
当服务器有空闲的时候nsq会随机分配到空闲线程上实现读操做,咱们的核心业务是在读功能启用以后实现的。
读和写的初始化过程是在项目启动时候就要执行的。
项目运行过程当中,咱们只是不停的在进行读写操做而已。
// 实例化nsq的读操做 // 参数是对应须要读的topic和channel和应的nsq读的地址 const client = new nsq.Reader(app.config.nsq.topic, app.config.nsq.channel, { lookupdHTTPAddresses: app.config.nsq.nsqHostReader, maxInFlight: 1 }) // 链接nsq的读功能 client.connect() // 每当有消息队列进来的时候就会调用client.on方法 // message是我门在写的过程当中传入的信息 client.on方法('message', async msg => { // 对写入的信息格式化 let data = JSON.parse(msg.body.toString()) try { // 为了保持链接状态,处理超时状况 const touch = () => { if (!msg.hasResponded) { msg.touch() // Touch the message again a second before the next timeout. setTimeout(touch, msg.timeUntilTimeout() - 1000) } } let timeTouch = setTimeout(touch, msg.timeUntilTimeout() - 1000) let timeFinish = setTimeout(msg.finish.bind(msg), msg.timeUntilTimeout() * 3 + 1000) // 这里是项目的核心处理部分,具体内容后面文章在说明,这里返回的url即是生成pdf的网络地址 let url = await ctx.service.pdf.index.generate(data) clearTimeout(timeTouch) clearTimeout(timeFinish) // 这里表示这个队列结束告诉nsq能够放下个兄弟进来了 msg.finish() } catch (error) { // 万一出现错误也不要阻塞,nsq在失败后会从新入队 msg.finish() // 这里能够加入网络日志 console.log(error) } }); client.on('error', function(err) { // 这里监听读操做时候发生错误状况处理 // 这里能够作一些错误处理,加错误日志 console.log(err) }); }); };
刚开始说到用nsq仍是有点慌的,毕竟做为一个前端工程师一脸懵逼,可是通过仔细学习,和后端大佬的请教,发现nsq其实就是一个高效队列,简易好用,对于上手来讲仍是比较简单的。
后续文章还会对puppteer生成pdf服务的核心业务作详细介绍,也就是上文ctx.service.pdf.index.generate(data)具体实现过程。
项目地址:https://github.com/XIEJUNXIRU...
以上只是本人的学习总结,若有问题,请大神不吝赐教。