我司的系统,用户能够建立任务,启动任务,但任务的运行须要很长的时间,因此采用消息队列的方式,后台异步处理。前端
这里所用到的是 RabbitMQ
,对应的 Node.js 库为 amqplib
( 这里采用的是回调形式:require("amqplib/callback_api") )。数据库
① ② ③ ④ ⑤ :从前端发来 HTTP 请求,被 Producer(express) 处理,通过 Route -> Controller -> Function ,使用 amqplib 的 sendToQueue(),发送须要处理的任务的 uuid 入 MQ 队列。这时候,还要修改数据库,把该任务的状态从 "new" -> "queue"。express
⑥ ⑦ ⑧ ⑨ ⑩ ⑪ : Consumer 消化 MQ 队列吐出来的 message,即任务的 uuid。先修改数据库该任务的状态为"runnning", 而后调用"处理"模块去执行复杂运算,执行完成后,修改数据库该任务的状态是 "success" 仍是 ”fail”,而后返回 ack 信号给 MQ 。api
注:此次的需求比较简单,因此没有用到 MQ 的交换机功能。安全
MQ 的 connection 和 channel 对象都有 "error" 和 "close" 事件,需作好相关的日志记录。尤为是 "error",要加上 reconnect 机制,防止由于某个任务致使的错误或者 MQ 自身的缘由,影响到后续任务的处理。服务器
connection.on("error", function(err) { // reconnect }); channel.on("error", function(err) { // reconnect });
最后能够根据实际须要,在全局加上 try……catch。运维
为了防止 MQ server 的崩溃致使的消息损失,须要对数据作持久化。大体分两块:异步
队列持久化 + 消息持久化
ui
channel.assertQueue(queue_name, { durable: true // 队列持久化 });
channel.sendToQueue( queue_name, Buffer.from(uuid), { persistent: true // 消息持久化 }, function(err, ok) { } );
④ 中的 sendToQueue() ,须要在 createConfirmChannel() 的基础下使用,这样 sendToQueue() 的第三个参数才有 MQ 收到 message 成功与否的回调,根据这个,去结合 ② 的 DB 操做, 绑定为事务,来保证数据的一致性。日志
channel.consume() 需开启 ack 模式,等 Consumer 端一切确认完成后,再通知 MQ 。
channel.consume( queue_name, function(msg) { const uuid = msg.content.toString(); // use uuid todo…… }, { noAck: false } );
从上面的 如何保证 DB 跟 MQ 数据的一致性? 其实就避免了该问题的发生。
可是额外要作的是:
一、重试机制,例如 发送 message 失败,规定重试的次数。
二、善用 MQ 的 Web 控制台,地址形如 http://localhost:15672。除了关注基本的服务器负载状态,还要关注任务队列是否正常吞吐,是否有卡壳。
三、构建运维一体的后台管控系统,比上面的 2 自定义程度更高。
四、提供用户相似"提交工单"/"问题反馈"/"错误上传"的功能,查缺补漏。