[译] RabbitMQ tutorials (3) ---- 'Pub/Sub' (Javascript)

发布与订阅 (Publish/Subscribe)

在以前的章节中,咱们建立了工做队列,以前的工做队列的假设是每一个任务只被分发到一个worker。在这一节中,咱们会作一些彻底不同的事--把一条消息发送给多个消费者,这个模式叫作“发布/订阅”(publish/subscribe)。javascript

举个例子,咱们要构建一个简易的日志系统。由两个程序组成---一个来发出日志消息,另外一个接收并把消息显示出来。java

在咱们的日志系统当中,每个正在运行的接收程序都会收到消息。这样,咱们能够运行一个receiver并把log定向到磁盘,而后再跑一个receiver,看看它是否会在屏幕上显示日志。node

事实上,被发布的消息会被广播到全部的receiver那里。git

交换器(Exchanges)

在以前的引导中,咱们从一个队列中作了收发的操做。是时候介绍在Rabbit中的所有的消息模型了。github

让咱们先快速地回顾一下以前学习的,api

  • producer 是一个发送消息的应用安全

  • queue 是一个存储消息的buffer服务器

  • consumer 是一个接收消息的应用less

RabbitMQ中,消息模型的核心思想是生产者毫不会把消息直接发到队列。实际上,生产者一般不知道一条消息是否已经被发送到任意一个队列中。学习

生产者只能把消息发到交换器。交换器是个简单的东西。一方面接收从生产者那边来的消息,另外一方面把他们push到队列中。交换器必定要知道当它们接收到消息以后要如何处理。是否要追加到一个特殊的队列?是否要追加到许多的队列?或者丢掉这条消息?这些规则被定义为交换类型。

exchanges

如下是可使用的交换类型:direct, topic, header, fanout。咱们介绍一下最后一个--fanout。让咱们先建立一个fanout类型的交换器“logs”:

ch.assertExchange('logs', 'fanout', {durable: false})

fanout类型的交换器很是简单,咱们能够从单单从名字上猜想,它就是把它接收到的消息广播给全部已知的队列。这也就是咱们的logger所须要的。


列出全部的交换器(Listing exchanges)
你可使用rabbitmqctl

$sudo rabbitmqctl list_exchanges
Listing exchanges ...
direct
amq.direct      direct
amq.fanout      fanout
amq.headers     headers
amq.match       headers
amq.rabbitmq.log        topic
amq.rabbitmq.trace      topic
amq.topic       topic
logs    fanout
...done.

在列表中,一些amq.*的交换器和一些默认的(未命名的),都是被默认建立的,可是多是你用不到的

未命名的交换器(Nameless exchange)
在以前的章节中咱们未提过交换器,可是咱们仍然可以把消息传到队列中,这就是咱们使用了默认的交换器,由于咱们使用了空的字符串("")。
以前咱们是这样发布一条消息的
ch.sendToQueue('hello', new Buffer('Hello World!'));
这里咱们使用默认的或者未命名的交换器,若是第一个参数存在的话,消息会被路由到这个参数名的队列。


如今,咱们可使用咱们定义好的交换器

ch.publish('logs', '', new Buffer('Hello World!'));

第二个参数为空的话表明咱们不想把消息推到指定的队列,只是想发布到logs的交换器中。

临时队列 (Temporary queues)

你还记得咱们以前用的声明过的队列(hello 和 task_queue)吗?。可以指明一个队列的名字对咱们来讲是重要的--咱们须要把workers指到相同的队列。
当你想要分享给消费者和生产者队列的时候,给队列起一个名字很重要。

但着不是咱们logger这个程序须要的,咱们想监听全部的log消息,不是一部分log消息。一样的,咱们对正在流动的消息也感兴趣(not in the old ones).咱们须要完成两件事情:
第一,无论咱们何时链接Rabbit,都须要一个新的,空的队列。咱们能够建立一个随机的队列名字,或者让服务器为咱们随机选择一个队列名字。
第二,无论咱们何时断开与消费者的链接,队列须要自动销毁。

amqp.node的客户端中,当咱们传入字符串的时候,能够建立一个带有名字的未持久化的队列

ch.assertQueue('', {exclusive: true});

这个方法返回一个带有随机名字的队列实例,好比amq.gen-JzTY20BRgKO-HjmUJj0wLg
当链接被断开的时候,这个队列会被销毁,由于咱们在声明的时候{exclusive:true}

绑定 (Bindings)

binding

咱们已经建立了一个fanout类型的交换器和一个队列,如今咱们须要告诉交换器把消息发送给队列,队列与交换器之间的关系咱们称之为绑定。
ch.bindQueue(queue_name, 'logs', '');
如今开始,logs的交换器为追加消息到咱们的队列

Listing bindings:

你能够列出已经存在的绑定关系,你应该猜到。rabbitmqctl list_bindings

整合(Putting it all together)

all

生产者的程序,用来发出log消息,和以前章节没有太多的不一样,最重要的改变就是如今咱们是把消息发布到咱们的logs的交换器中,而不是以前的在未声明的状况下使用。发送的时候咱们须要提供一个路由键,可是在fanout类型当中,这个能够忽略。下面是emit_log.js的代码

#!/usr/bin/env node

var amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', function(err, conn) {
  conn.createChannel(function(err, ch) {
    var ex = 'logs';
    var msg = process.argv.slice(2).join(' ') || 'Hello World!';

    ch.assertExchange(ex, 'fanout', {durable: false});
    ch.publish(ex, '', new Buffer(msg));
    console.log(" [x] Sent %s", msg);
  });

  setTimeout(function() { conn.close(); process.exit(0) }, 500);
});

(emit_log.js 源码)

正如你所见,在与交换器创建链接以后。有一点很关键,向不存在的交换器发布消息是被禁止的。
若是仍然没有队列绑定交换器,消息会丢失。可是对咱们来讲还好,若是仍然没有消费者监听,咱们能够安全地丢弃这些消息。

receive_logs.js的代码

#!/usr/bin/env node

var amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', function(err, conn) {
  conn.createChannel(function(err, ch) {
    var ex = 'logs';

    ch.assertExchange(ex, 'fanout', {durable: false});

    ch.assertQueue('', {exclusive: true}, function(err, q) {
      console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue);
      ch.bindQueue(q.queue, ex, '');

      ch.consume(q.queue, function(msg) {
        console.log(" [x] %s", msg.content.toString());
      }, {noAck: true});
    });
  });
});

(receive_logs,js源码)

若是你想要保存log,你能够打开控制台输入

$ ./receive_logs.js > logs_from_rabbit.log

若是你想在屏幕上看到log,再打开一个控制台

$ ./receive_logs.js

固然,须要发出logs

$ ./emit_log.js

使用rabbitmqctl list_bindings,你能够肯定刚才的代码确实建立了交换器和队列,有两个receive_logs.js的程序在运行。

$ sudo rabbitmqctl list_bindings
Listing bindings ...
logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
...done.

这个结果的简要解释:数据从logs交换器到两个服务器分配的队列。这也是咱们想要的结果。

要如何监听一部分的消息?让咱们移到下一章。

相关文章
相关标签/搜索