在Node.js中使用RabbitMQ系列二 任务队列

在上一篇文章在Node.js中使用RabbitMQ系列一 Hello world我有使用一个任务队列,不过当时的场景是将消息发送给一个消费者,本篇文章我将讨论有多个消费者的场景。javascript

其实,任务队列最核心解决的问题是避免当即处理那些耗时的任务,也就是避免请求-响应的这种同步模式。取而代之的是咱们经过调度算法,让这些耗时的任务以后再执行,也就是采用异步的模式。咱们须要将一条消息封装成一个任务,而且将它添加到任务队列里面。后台会运行多个工做进程(worker process),经过调度算法,将队列里的任务依次弹出来,并交给其中的一个工做进程进行处理执行。这个概念尤为适合那些HTTP短链接的web应用,它们没法在短期内处理这种复杂的任务。html

准备工做

咱们将字符串类型的消息看做是耗时的任务,而且每一个字符串消息最后带上一些点。每一个点表明该任务须要消耗的秒数。在worker进程处理的时候能够采用setTimeout函数来进行模拟。举个例子:一个伪造的耗时任务是Hello.,则这个任务会消耗1秒,一个伪造的耗时任务是Hello..,则这个任务会消耗2秒,一个伪造的耗时任务是Hello... ,而且这个任务的处理时间会耗费3秒。java

这里稍微对上篇文章的send.js文件修改,让它能够发送用户自定义的任意的消息,这个程序会将任务交给任务队列,咱们用new_task.js来进行命名:web

var q = 'task_queue';
var msg = process.argv.slice(2).join(' ') || "Hello World!";

ch.assertQueue(q, {durable: true});
ch.sendToQueue(q, new Buffer(msg), {persistent: true});
console.log(" [x] Sent '%s'", msg);

咱们的receive.js文件也要进行修改,它须要伪造任务的处理时间,让字符串类型的任务看起来须要几秒钟时间(具体取决于 . 的个数)。算法

ch.consume(q, function(msg) {
  var secs = msg.content.toString().split('.').length - 1;

  console.log(" [x] Received %s", msg.content.toString());
  setTimeout(function() {
    console.log(" [x] Done");
  }, secs * 1000);
}, {noAck: true});

这里以一个worker工做进程为例子,咱们先在左侧的shell窗口开启一个worker进程,以后在右侧shell窗口发送自定义的消息,执行看看效果:
shell

轮询调度算法(Round-robin)

使用任务队列的一个优点是可以将任务并行执行,若是须要处理大量的积压任务,咱们只须要像上面运行worker进程的方式,增长更多的worker,这个让可伸缩变得更加的容易。
首先,咱们使用item2开启2个shell窗口,并在里面运行两个worker进程,可是究竟是哪一个worker会对消息进行处理呢?这里咱们能够作个简单的实验来看看,以下图所示,左侧是两个worker进程,右侧是消息发送端:
api

默认状况下,RabbitMQ会采用Round-robin算法来分发任务队列中的任务,每次分发的时候都会将任务派发给下一个消费者,这样每一个消费者(worker进程)处理的任务数量实际上是同样多的。服务器

消息确认

处理一个复杂的任务须要耗费很长时间,这个时间段里面,可能咱们的worker进程因为某种缘由挂掉了,这种异常状况是须要考虑的。可是咱们现有的代码里面并无作这种异常的处理,当RabbitMQ将任务派发给worker进程以后,咱们当即将这个任务从内存中剔除掉了,设想下,假设worker收到消息以后,咱们立刻将进程杀死掉,这个时候任务并无被成功执行的。同时,咱们也会丢失全部派发到这个worker进程可是尚未被处理的任务信息。
可是,咱们并不想丢掉任何一个任务,若是一个worker进程挂掉,咱们更但愿可以将这个任务派发给其它的worker来处理。
为了不任务信息丢失的状况,RabbitMQ支持消息确认。在一个任务发送到了worker进程而且被成功处理完毕以后,一个ack (消息确认)的标识会从消费者发回来告诉RabbitMQ这个任务已经被处理完了,能够将它删除了。
若是一个消费者挂掉了(常见的缘由如消息通道关闭了,链接丢失,TCP链接丢失),没有向RabbitMQ发送消息确认这个ack的标识,这个时候RabbitMQ会将它重新加入到队列中,若是有其它消费者存在,那么RabbitMQ会立刻将这个任务从新派发下去。以前的例子里面咱们并无开启消息确认这个选项,如今咱们能够经过{noAck: false}来开启:异步

ch.consume(q, function(msg) {
  var secs = msg.content.toString().split('.').length - 1;

  console.log(" [x] Received %s", msg.content.toString());
  setTimeout(function() {
    console.log(" [x] Done");
    ch.ack(msg);
  }, secs * 1000);
}, {noAck: false});     // 开启消息确认标识

能够用CTRL + C来作个实验看看效果。函数

消息持久化

刚刚谈到,若是一个worker进程挂掉了,不让消息丢失的作法。可是,若是整个RabbitMQ的服务器挂掉了呢?当一个RabbitMQ服务退出或者中断的状况下,它会忘记任务队列里面的消息除非你告诉它不要丢掉,即咱们通知RabbitMQ任务队列和这些任务都是须要持久化的。

首先,咱们须要确保RabbitMQ永远不会丢失掉咱们的任务队列。

ch.assertQueue('hello', {durable: true});

可是,你会发现这样并无效果,那是由于hello这个队列咱们已经定义过,而且指定了它不须要持久化。RabbitMQ不容许咱们经过改变参数配置的方式对已经存在的任务队列进行从新定义,所以咱们须要定义一个新的任务队列。

ch.assertQueue('task_queue', {durable: true});

这行代码须要同时在生产者和消费者里面的相关代码的地方进行修改。

接下来,咱们须要经过配置persistent 选项让咱们发送的消息也是持久化的。

ch.sendToQueue(q, new Buffer(msg), {persistent: true});

公平调度

前面的例子,咱们讨论了RabbitMQ的调度方式,即采用Round-robin轮询调度算法,所以它会将消息均匀的分配给每一个worker进程。RabbitMQ并不会关注每一个worker进程有多少个消息没有确认,它只会不断的给你派发任务,无论你能不能处理的过来。这个时候,问题就出现了,设想下,假设有2个worker,其中1个worker恰好很不幸被分配了一个很是复杂的任务,可能须要耗费好几个小时的时间,另一个worker被分配的任务都比较简单,只须要几分钟就能处理完,因为RabbitMQ的任务分配问题,有不少新的任务依然会分配到那个正在处理很耗时任务的worker上面,这个worker后面的任务都会处于等待状态。幸亏,RabbitMQ能够经过prefetch(1)来指定某个worker同时最多只会派发到1个任务,一旦任务处理完成发送了确认通知,才会有新的任务派发过来。

ch.prefetch(1);

最终的代码

new_task.js 代码:

var amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', function(err, conn) {
  conn.createChannel(function(err, ch) {
    var q = 'task_queue';
    var msg = process.argv.slice(2).join(' ') || "Hello World!";

    ch.assertQueue(q, {durable: true});
    ch.sendToQueue(q, new Buffer(msg), {persistent: true});
    console.log(" [x] Sent '%s'", msg);
  });
  setTimeout(function() { conn.close(); process.exit(0) }, 500);
});

worker.js:

var amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', function(err, conn) {
  conn.createChannel(function(err, ch) {
    var q = 'task_queue';

    ch.assertQueue(q, {durable: true});
    ch.prefetch(1);
    console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);
    ch.consume(q, function(msg) {
      var secs = msg.content.toString().split('.').length - 1;

      console.log(" [x] Received %s", msg.content.toString());
      setTimeout(function() {
        console.log(" [x] Done");
        ch.ack(msg);
      }, secs * 1000);
    }, {noAck: false});
  });
});
相关文章
相关标签/搜索