Node.js+Redis实现消息队列的最佳实践

问题来由

最近在开发一个小型前端监控项目,因为技术栈中使用到了 Node + Redis 做为消息队列实现,所以这里记录下在 Node 中经过 Redis 来实现消息队列时的 使用方法注意事项前端

什么是消息队列

消息队列,是一种存放 消息 是队列结构,能够用来解决 分布式系统通讯 从而解耦系统模块异步任务处理请求消峰限流的问题。node

既然叫作队列,那它通常是从一侧推入消息,从另外一侧消费消息;大概是以下的流程。redis

image.png

在个人需求当中,我用消息队列来作异步的入库处理。后端

image.png

我经过 Node 作了一个对外的日志接收层(即图中 Koa Server)用于接收上报日志,当Koa Server接收完成会当即给用户响应 OK,由于用户是不必去感知后端日志的入库结果的。promise

所以 Koa Server 收到日志后,将消息放入 Redis 消息队列便可。另一端,我启动了一个 消费 程序(即上图中的日志入库模块,它也是一个 Node 脚本)来对MQ消息进行读取并进行入库操做。bash

Redis 如何作消息队列

消息队列,其实有 2种类型。一种是基于 队列模型 的,一种是基于 订阅发布模式的。网络

对于 订阅发布模式 来讲,是指的多个消费者均可以订阅某一个 channel 的消息,当channel中来了消息,全部的订阅者都会收到通知,而且全部的订阅者均可以对同一个消息进行处理(消费)。数据结构

对于 队列模型 来讲,当消息入队后,在另外一端只出队一次,若是有多个消费者在等待这个队列,那么只有一个消费者能拿到这个消息进行处理。app

在 Redis 中,以上 2 种模型,分别经过 pub/sub 功能和 list 结构能够来实现。异步

对于个人日志接收场景来讲,我指望的是不管我后端有多少个 入库消费者,我但愿同一条上报只能入库一次。所以对我来讲,我须要使用 队列模型 来实现消息队列,即便用 Redis 的 List 结构。

CLI 简单实验

咱们经过 redis-cli 来简单实验下 list 结构是如何当作消息队列的。

首先,经过 lpush 命令往 redis 中某个队列的左侧推入一条消息:

lpush my_mq abc

这样,咱们就往 my_mq 这个队列推入了一条内容为 abc 的消息。因为此时并无消费者,因此这条消息依然存在于队列当中。咱们甚至能够再次往里推入第2条 def 消息,并经过 llen 命令来查看当前队列的长度。

image.png

接下来,咱们在另一个命令行窗口输入:

rpop my_mq

意思是从 my_mq 队列的右侧拿出一条消息。结果:

image.png

阻塞模式实验

Redis 的 List 结构,为了方便你们当作消息队列。提供了一种阻塞模式。 阻塞和非阻塞有什么区别呢?

咱们用一个新命令行窗口,去执行 阻塞等待消息:

brpop my_mq 0

注意后面要加一个 超时时间,0就表示一直阻塞等待。而后,咱们看到 redis 命令行就阻塞在这里了,处于等待消息的状态:
image.png
而若是使用 rpop 非阻塞命令的话,则会返回空并直接退出等待:
image.png

所以,能够发现,阻塞非阻塞模式,最大的区别:是在于当消息队列为空的时候,阻塞模式不会退出等待,而非阻塞模式则会直接返回空并退出等待。

brpop 正在等待的时候,咱们往队列中 push 一个消息:

lpush my_mq 123

能够看到,阻塞模式的消费端,收到了 123 这个消息,同时本身也退出了等待:

image.png

这说明:

  • 阻塞模式: 当队列为空时,(即没有等到消息时),则一直阻塞着;等到一条消息就退出
  • 非阻塞模式:当队列为空(即没有等到消息),也不阻塞,而是直接返回null退出

所以 redis 所谓的阻塞,是 当还未等到1条消息时,则阻塞等待;当等到1条消息,即马上退出;它并不会循环阻塞---即等到消息后它就再也不阻塞监听这个队列了。 这将给咱们编写 Node 代码提供一些启发。

Node 如何使用

到了重点了。咱们在 Node 中编码来使用 redis 消息队列,跟在 cli 界面使用的方式是同样的。可是须要咱们考虑如何编写 消费者 端的代码,才能实现所谓的 持续监听队列。毕竟,咱们的 消费者 是须要常驻进程,持续监听队列消息的。并非说 收到一个消息就退出进程

所以,咱们须要编写一个

  • 能常驻的Node进程,可以持续的等待 redis 队列消息
  • 当收到1条消息,便由 Node 脚本处理;处理完要继续等待队列中下一条消息。如此循环往复。

首先,咱们能够这样编写代码来在 Node 中建立 redis 客户端:

const redis = require('promise-redis-client')
let client = redis.createClient(...options)
client.on('error', err => {
    console.log('redis连接出错')
})
client.on('ready', () => {
    console.log('redis ready')
})

为了实现 当redis客户端建立完毕,再开启消息队列监听,咱们把上面的代码,封装成一个模块,用 promise 方式导出:

// redis.js
const redis = require('promise-redis-client')
exports.createClient = function() {
    return new Promise((resolve, reject) => {
        let client = redis.createClient(...options)
        client.on('error', err => {
            console.log('redis 链接出错')
            reject(err)
        })
        client.on('ready', () => {
            console.log('redis ready')
            resolve(client)
        })
    })
}

OK,接下来,咱们能够去 app.js 中编写队列的消费者代码。为了更优雅的使用 async/await,咱们能够这样来编写一个 startWait 函数:

async function startWaitMsg(client) {
    ...
}

而后,在 client ready 的时候,去启动它:

const { createClient } = require('./redis.js')
const c = createClient()
client.then(async c => {
    await startWaitMsg(c)
})

最难的地方在于,startWaitMsg 函数该如何编写。因为咱们使用了 promise 版本的 redis库。所以,咱们能够像这样去读取一个消息:

async function startWaitMsg(client) {
    await client.rpop('my_mq')
}

但这样写的话,redis返回消息后,node继续日后执行,最终 startWaitMsg 函数就执行结束了。尽管整个 Node 进程会由于 redis 链接未断开而不会退出,但 node 此时已经没法再次去执行 client.rpop 这句代码了,也所以没法再次从消息队列中获取新来的消息。

循环实现持续等待

咱们想到,可使用循环来实现 持续监听队列。因而,把代码改为:

async function startWaitMsg(client) {
    while(true) {
      await client.rpop('my_mq')
    }
}

如此便实现了 持续执行 rpop 指令。然而,若是你在 rpop 代码后面加一行日志打印的话,会观察到 client.rpop 在持续打印 null。
image.png

这是由于,rpop 指令是 非阻塞的,所以当队列没有消息,他便返回一个 null,由此触发你的 while 循环在不断执行。这会致使咱们程序占用过多的 cpu时间片,且对 redis 网络IO有过多的不必的消耗。

整个while循环不停的执行,只有执行rpop这一行的时候会短暂释放一下EventLoop给其余代码,这对脚本性能影响也会较大。国家提倡节能减排,这显然不是最优雅的。

使用阻塞模式

让咱们来用上 redis 队列的阻塞模式试试。

async function startWaitMsg(c) {
    while(true) {
        const res = await c.brpop('my_mq', 0)
        console.log('收到消息', res)
    }
}

经过 brpop 指令,可让 brpop 代码阻塞在这里。这里所谓的 阻塞 并非对 Node 程序的阻塞,而是 redis 客户端自身的阻塞。实际上对 Node 进程来讲,不管是 rpop 仍是 brpop 都是 非阻塞 的异步 IO操做,只是在消息队列为空时 rpop 底层会马上返回null,从而node进程会 resolve一个空,而 brpop 会在底层redis阻塞等待消息,消息到达后再给 Node 进程通知 resolve。

所以,brpop 对 Node 来讲,能够避免本身实现队列的内容轮询,能够在等待IO回调期间将cpu留给其余任务。从而大大减小 Node 进程的 CPU 消耗。

redis断开没法继续消费的问题

在代码运行过程当中,出现了一个新的问题: redis 客户端会在某些状况下断开链接(可能因为网络等缘由)。而经过分析日志发现:一旦发生链接异常,咱们的消费者脚本就没法继续接收新的消息了(个人日志入库功能失效)。

通过分析,发现问题缘由依然在于咱们的 while 语句 和 brpop 的配合问题。

当 redis client 对象发生链接异常时,会向当前正在等待的 brpop 代码抛出一个 reject 异常。咱们回看上述代码的 startWait 函数:

async function startWaitMsg(c) {
    while(true) {
        const res = await c.brpop('my_mq', 0)
        console.log('收到消息', res)
    }
}

若是 await brpop 这一行抛出 reject 异常,因为咱们未捕获该异常,则异常会抛出 startWaitMsg 函数,结果就是 while 循环被退出了。

思考如何解决

事实上,当链接出现问题,咱们须要对 client 进行重连。不过,这个重连机制,redisclient 会自动进行,所以咱们的代码要作的仅仅只须要保证while循环能在异常时恢复。因而,咱们在发生异常时,continue 一下:

async function startWaitMsg(c) {
    while(true) {
        let res = null
        try {
            res = await c.brpop('my_mq', 0)
            console.log('收到消息', res)
        }
        catch(err) {
            console.log('brpop 出错,从新brpop')
            continue
        }
        // ... 消息处理任务
    }
}

因为 redis 客户端内部的重连过程不会再触发 reject (只是断开链接的时候触发一次),所以 continue 以后的 brpop 又会从新 "阻塞" 等待,由此,咱们的 消费者 即可以正常活着了。

最终代码

  • 客户端链接代码文件:redis.js
const redis = require('promise-redis-client')
exports.createClient = function() {
    return new Promise((resolve, reject) => {
        let client = redis.createClient(...options)
        client.on('error', err => {
            console.log('redis 链接出错')
            reject(err)
        })
        client.on('ready', () => {
            console.log('redis ready')
            resolve(client)
        })
    })
}

app.js

const { createClient } = require('./redis.js')

const c = createClient()
client.then(async c => {
    await startWaitMsg(c) // 启动消息监听
})

async function startWaitMsg(c) {
    while(true) {
        let res = null
        try {
            res = await c.brpop('my_mq', 0)
            console.log('收到消息', res)
        }
        catch(err) {
            console.log('brpop 出错,从新brpop')
            continue
        }
        // ... 消息处理任务
    }
}

总结

  • redis 的 list 数据结构,能够用做实现队列模式消息队列
  • Node 中能够经过 while(true) 实现队列的持续循环监听
  • 经过 brpop 阻塞指令的使用,能够避免 cpu 空转来监听队列
  • Node 中要注意 redis 链接断开时的错误处理,以免因出错致使没法从新监听队列
相关文章
相关标签/搜索