RabbitMQ 实践之在处理异步任务中的流程

1、背景:

我司的系统,用户能够建立任务,启动任务,但任务的运行须要很长的时间,因此采用消息队列的方式,后台异步处理。前端

这里所用到的是 RabbitMQ ,对应的 Node.js 库为 amqplib ( 这里采用的是回调形式:require("amqplib/callback_api") )。数据库

2、MQ 处理任务的流程


① ② ③ ④ ⑤ :从前端发来 HTTP 请求,被 Producer(express) 处理,通过 Route -> Controller -> Function ,使用 amqplib 的 sendToQueue(),发送须要处理的任务的 uuid 入 MQ 队列。这时候,还要修改数据库,把该任务的状态从 "new" -> "queue"。express

⑥ ⑦ ⑧ ⑨ ⑩ ⑪ : Consumer 消化 MQ 队列吐出来的 message,即任务的 uuid。先修改数据库该任务的状态为"runnning", 而后调用"处理"模块去执行复杂运算,执行完成后,修改数据库该任务的状态是 "success" 仍是 ”fail”,而后返回 ack 信号给 MQ 。api

注:此次的需求比较简单,因此没有用到 MQ 的交换机功能。安全

3、问与答


Q:如何作好 MQ 的错误处理?

MQ 的 connection 和 channel 对象都有 "error" 和 "close" 事件,需作好相关的日志记录。尤为是 "error",要加上 reconnect 机制,防止由于某个任务致使的错误或者 MQ 自身的缘由,影响到后续任务的处理。服务器

connection.on("error", function(err) {
        // reconnect 
});

channel.on("error", function(err) {
        // reconnect 
});

最后能够根据实际须要,在全局加上 try……catch。运维

Q:如何保证 MQ 自身消息的数据安全?

为了防止 MQ server 的崩溃致使的消息损失,须要对数据作持久化。大体分两块:异步

队列持久化 + 消息持久化ui

channel.assertQueue(queue_name, {
        durable: true
        // 队列持久化
});
channel.sendToQueue(
          queue_name,
          Buffer.from(uuid),
          {
            persistent: true
            // 消息持久化
          },
          function(err, ok) { 
          
          }
);

Q:如何保证 DB 跟 MQ 数据的一致性?

一、发送 message 时

④ 中的 sendToQueue() ,须要在 createConfirmChannel() 的基础下使用,这样 sendToQueue() 的第三个参数才有 MQ 收到 message 成功与否的回调,根据这个,去结合 ② 的 DB 操做, 绑定为事务,来保证数据的一致性。日志

二、接受 message 时

channel.consume() 需开启 ack 模式,等 Consumer 端一切确认完成后,再通知 MQ 。

channel.consume(
        queue_name,
        function(msg) {
          const uuid = msg.content.toString();
                // use uuid todo…… 
        },
        {
          noAck: false
        }
);

Q:如何避免 MQ 多发、少发的问题

从上面的 如何保证 DB 跟 MQ 数据的一致性? 其实就避免了该问题的发生。

可是额外要作的是:

一、重试机制,例如 发送 message 失败,规定重试的次数。

二、善用 MQ 的 Web 控制台,地址形如 http://localhost:15672。除了关注基本的服务器负载状态,还要关注任务队列是否正常吞吐,是否有卡壳。

三、构建运维一体的后台管控系统,比上面的 2 自定义程度更高。

四、提供用户相似"提交工单"/"问题反馈"/"错误上传"的功能,查缺补漏。

相关文章
相关标签/搜索