最近在开发一个小型前端监控项目,因为技术栈中使用到了 Node + Redis 做为消息队列实现,所以这里记录下在 Node 中经过 Redis 来实现消息队列时的 使用方法
和 注意事项
前端
消息队列,是一种存放 消息
是队列结构,能够用来解决 分布式系统通讯 从而解耦系统模块
、异步任务处理
、请求消峰限流
的问题。node
既然叫作队列,那它通常是从一侧推入消息,从另外一侧消费消息;大概是以下的流程。redis
在个人需求当中,我用消息队列来作异步的入库处理。后端
我经过 Node 作了一个对外的日志接收层(即图中 Koa Server)用于接收上报日志,当Koa Server接收完成会当即给用户响应 OK,由于用户是不必去感知后端日志的入库结果的。promise
所以 Koa Server 收到日志后,将消息放入 Redis 消息队列便可。另一端,我启动了一个 消费
程序(即上图中的日志入库模块,它也是一个 Node 脚本)来对MQ消息进行读取并进行入库操做。bash
消息队列,其实有 2种类型。一种是基于 队列模型
的,一种是基于 订阅发布模式
的。网络
对于 订阅发布模式
来讲,是指的多个消费者均可以订阅某一个 channel 的消息,当channel中来了消息,全部的订阅者都会收到通知,而且全部的订阅者均可以对同一个消息进行处理(消费)。数据结构
对于 队列模型
来讲,当消息入队后,在另外一端只出队一次,若是有多个消费者在等待这个队列,那么只有一个消费者能拿到这个消息进行处理。app
在 Redis 中,以上 2 种模型,分别经过 pub/sub
功能和 list
结构能够来实现。异步
对于个人日志接收场景来讲,我指望的是不管我后端有多少个 入库消费者
,我但愿同一条上报只能入库一次。所以对我来讲,我须要使用 队列模型
来实现消息队列,即便用 Redis 的 List 结构。
咱们经过 redis-cli 来简单实验下 list 结构是如何当作消息队列的。
首先,经过 lpush 命令往 redis 中某个队列的左侧推入一条消息:
lpush my_mq abc
这样,咱们就往 my_mq
这个队列推入了一条内容为 abc
的消息。因为此时并无消费者,因此这条消息依然存在于队列当中。咱们甚至能够再次往里推入第2条 def
消息,并经过 llen
命令来查看当前队列的长度。
接下来,咱们在另一个命令行窗口输入:
rpop my_mq
意思是从 my_mq
队列的右侧拿出一条消息。结果:
Redis 的 List 结构,为了方便你们当作消息队列。提供了一种阻塞模式。 阻塞和非阻塞有什么区别呢?
咱们用一个新命令行窗口,去执行 阻塞等待消息
:
brpop my_mq 0
注意后面要加一个 超时时间
,0就表示一直阻塞等待。而后,咱们看到 redis 命令行就阻塞在这里了,处于等待消息的状态:
而若是使用 rpop
非阻塞命令的话,则会返回空并直接退出等待:
所以,能够发现,阻塞非阻塞模式,最大的区别:是在于当消息队列为空的时候,阻塞模式不会退出等待,而非阻塞模式则会直接返回空并退出等待。
当 brpop
正在等待的时候,咱们往队列中 push 一个消息:
lpush my_mq 123
能够看到,阻塞模式的消费端,收到了 123 这个消息,同时本身也退出了等待:
这说明:
所以 redis 所谓的阻塞,是 当还未等到1条消息时,则阻塞等待;当等到1条消息,即马上退出
;它并不会循环阻塞---即等到消息后它就再也不阻塞监听这个队列了。 这将给咱们编写 Node 代码提供一些启发。
到了重点了。咱们在 Node 中编码来使用 redis 消息队列,跟在 cli 界面使用的方式是同样的。可是须要咱们考虑如何编写 消费者
端的代码,才能实现所谓的 持续监听队列
。毕竟,咱们的 消费者
是须要常驻进程,持续监听队列消息的。并非说 收到一个消息就退出进程
。
所以,咱们须要编写一个
首先,咱们能够这样编写代码来在 Node 中建立 redis 客户端:
const redis = require('promise-redis-client') let client = redis.createClient(...options) client.on('error', err => { console.log('redis连接出错') }) client.on('ready', () => { console.log('redis ready') })
为了实现 当redis客户端建立完毕,再开启消息队列监听
,咱们把上面的代码,封装成一个模块,用 promise 方式导出:
// redis.js const redis = require('promise-redis-client') exports.createClient = function() { return new Promise((resolve, reject) => { let client = redis.createClient(...options) client.on('error', err => { console.log('redis 链接出错') reject(err) }) client.on('ready', () => { console.log('redis ready') resolve(client) }) }) }
OK,接下来,咱们能够去 app.js
中编写队列的消费者代码。为了更优雅的使用 async/await,咱们能够这样来编写一个 startWait 函数:
async function startWaitMsg(client) { ... }
而后,在 client ready 的时候,去启动它:
const { createClient } = require('./redis.js') const c = createClient() client.then(async c => { await startWaitMsg(c) })
最难的地方在于,startWaitMsg 函数该如何编写。因为咱们使用了 promise 版本的 redis库。所以,咱们能够像这样去读取一个消息:
async function startWaitMsg(client) { await client.rpop('my_mq') }
但这样写的话,redis返回消息后,node继续日后执行,最终 startWaitMsg 函数就执行结束了。尽管整个 Node 进程会由于 redis 链接未断开而不会退出,但 node 此时已经没法再次去执行 client.rpop 这句代码了,也所以没法再次从消息队列中获取新来的消息。
咱们想到,可使用循环来实现 持续监听队列
。因而,把代码改为:
async function startWaitMsg(client) { while(true) { await client.rpop('my_mq') } }
如此便实现了 持续执行 rpop 指令
。然而,若是你在 rpop 代码后面加一行日志打印的话,会观察到 client.rpop 在持续打印 null。
这是由于,rpop 指令是 非阻塞的
,所以当队列没有消息,他便返回一个 null,由此触发你的 while 循环在不断执行。这会致使咱们程序占用过多的 cpu时间片,且对 redis 网络IO有过多的不必的消耗。
整个while循环不停的执行,只有执行rpop这一行的时候会短暂释放一下EventLoop给其余代码,这对脚本性能影响也会较大。国家提倡节能减排,这显然不是最优雅的。
让咱们来用上 redis 队列的阻塞模式试试。
async function startWaitMsg(c) { while(true) { const res = await c.brpop('my_mq', 0) console.log('收到消息', res) } }
经过 brpop 指令,可让 brpop 代码阻塞在这里。这里所谓的 阻塞
并非对 Node 程序的阻塞,而是 redis 客户端自身的阻塞。实际上对 Node 进程来讲,不管是 rpop
仍是 brpop
都是 非阻塞
的异步 IO操做,只是在消息队列为空时 rpop
底层会马上返回null,从而node进程会 resolve一个空,而 brpop
会在底层redis阻塞等待消息,消息到达后再给 Node 进程通知 resolve。
所以,brpop 对 Node 来讲,能够避免本身实现队列的内容轮询,能够在等待IO回调期间将cpu留给其余任务。从而大大减小 Node 进程的 CPU 消耗。
在代码运行过程当中,出现了一个新的问题: redis 客户端会在某些状况下断开链接(可能因为网络等缘由)。而经过分析日志发现:一旦发生链接异常,咱们的消费者脚本就没法继续接收新的消息了(个人日志入库功能失效)。
通过分析,发现问题缘由依然在于咱们的 while 语句 和 brpop 的配合问题。
当 redis client 对象发生链接异常时,会向当前正在等待的 brpop
代码抛出一个 reject 异常。咱们回看上述代码的 startWait 函数:
async function startWaitMsg(c) { while(true) { const res = await c.brpop('my_mq', 0) console.log('收到消息', res) } }
若是 await brpop 这一行抛出 reject 异常,因为咱们未捕获该异常,则异常会抛出 startWaitMsg 函数,结果就是 while 循环被退出了。
事实上,当链接出现问题,咱们须要对 client 进行重连。不过,这个重连机制,redisclient 会自动进行,所以咱们的代码要作的仅仅只须要保证while循环能在异常时恢复
。因而,咱们在发生异常时,continue 一下:
async function startWaitMsg(c) { while(true) { let res = null try { res = await c.brpop('my_mq', 0) console.log('收到消息', res) } catch(err) { console.log('brpop 出错,从新brpop') continue } // ... 消息处理任务 } }
因为 redis 客户端内部的重连过程不会再触发 reject (只是断开链接的时候触发一次),所以 continue 以后的 brpop 又会从新 "阻塞" 等待,由此,咱们的 消费者
即可以正常活着了。
const redis = require('promise-redis-client') exports.createClient = function() { return new Promise((resolve, reject) => { let client = redis.createClient(...options) client.on('error', err => { console.log('redis 链接出错') reject(err) }) client.on('ready', () => { console.log('redis ready') resolve(client) }) }) }
app.js
const { createClient } = require('./redis.js') const c = createClient() client.then(async c => { await startWaitMsg(c) // 启动消息监听 }) async function startWaitMsg(c) { while(true) { let res = null try { res = await c.brpop('my_mq', 0) console.log('收到消息', res) } catch(err) { console.log('brpop 出错,从新brpop') continue } // ... 消息处理任务 } }
队列模式
的消息队列
brpop
阻塞指令的使用,能够避免 cpu 空转来监听队列