《RabbitMQ Tutorial》译文 第 3 章 发布和订阅

原文来自 RabbitMQ 英文官网教程(3.Publish and Subscribe),其示例代码采用了 .NET C# 语言。html

Markdown

In the previous tutorial we created a work queue. The assumption behind a work queue is that each task is delivered to exactly one worker. In this part we'll do something completely different -- we'll deliver a message to multiple consumers. This pattern is known as "publish/subscribe".git

在以前的教程中咱们建立了一个工做队列,其背后的设想即是每个任务刚好地递送给一个工做单元。在本教程中,咱们的作法将彻底不一样 -- 即递送消息给多个消费者,这个模式被称做“发布/订阅”。github

To illustrate the pattern, we're going to build a simple logging system. It will consist of two programs -- the first will emit log messages and the second will receive and print them.安全

为了说明这个模式,咱们将要构建一个简单日志系统。它将由两个程序组成 -- 第一个将会发送日志消息,而第二个将会接收并打印它们。app

In our logging system every running copy of the receiver program will get the messages. That way we'll be able to run one receiver and direct the logs to disk; and at the same time we'll be able to run another receiver and see the logs on the screen.less

在咱们的日志系统中,每个正在运行的接收程序副本都将得到消息。如此,咱们将运行一个接收者来将日志写入磁盘,与此同时再运行另外一个接收者,这样能够在屏幕上查看日志。dom

Essentially, published log messages are going to be broadcast to all the receivers.ide

本质上来说,已发布的日志消息将会广播给全部的接收者。ui

Exchanges

交换机

In previous parts of the tutorial we sent and received messages to and from a queue. Now it's time to introduce the full messaging model in Rabbit.this

在本教程的以前部分,咱们从队列中发送和接收消息,如今是时候来介绍 Rabbit 中完整的消息模型了。

Let's quickly go over what we covered in the previous tutorials:

  • A producer is a user application that sends messages.
  • A queue is a buffer that stores messages.
  • A consumer is a user application that receives messages.

让咱们快速温习一下以前教程中所包括的内容:

  • 生产者,用以发送消息的用户程序。
  • 队列,即存储消息的缓冲区。
  • 消费者,用以接收消息的用户程序。

The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn't even know if a message will be delivered to any queue at all.

在 RabbitMQ 中,消息模型的核心理念是生产者永远不会把任何消息直接发送到队列。事实上,一般生产者甚至不知道消息是否会被递送到任何队列。

Instead, the producer can only send messages to an exchange. An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.

取而代之的是,生产者只能发送消息给一个交换机。交换机很简单,一方面它从生产者接收消息,另外一方面它又把消息推送到队列。可是交换机必须明确知道对它所接收到的消息该作什么。(好比)须要将消息追加到指定队列?仍是追加到多个队列?仍是要丢弃?这些规则都是在交换机类型中定义。

Markdown

There are a few exchange types available: direct, topic, headers and fanout. We'll focus on the last one -- the fanout. Let's create an exchange of this type, and call it logs:

目前有若干交换机类型可用:direct、topic、headers 以及 fanout。咱们将以最后一个为重点 -- fanout,让咱们建立一个该类型的交换机,并将其叫做“logs”:

channel.ExchangeDeclare("logs", "fanout");

The fanout exchange is very simple. As you can probably guess from the name, it just broadcasts all the messages it receives to all the queues it knows. And that's exactly what we need for our logger.

fanout 型交换机很是简单,从它的名称你可能猜出,它会广播全部已接收到的消息给全部已知的队列,这正好是咱们的日志系统所须要的。

Listing exchanges
列举出交换机

To list the exchanges on the server you can run the ever useful rabbitmqctl:

为了列举出服务端的交换机,你能够运行此前很是有用的 rabbitmqctl 命令:

sudo rabbitmqctl list_exchanges

In this list there will be some amq.* exchanges and the default (unnamed) exchange. These are created by default, but it is unlikely you'll need to use them at the moment.

在这个列表中有一些 amq.* 和默认的(未命名)交换机,这些都是默认建立的,但此时你不太可能须要用到它们。

The default exchange
默认的交换机

In previous parts of the tutorial we knew nothing about exchanges, but still were able to send messages to queues. That was possible because we were using a default exchange, which we identify by the empty string ("").

在教程的以前部分咱们对交换机还一无所知,但这并不妨碍咱们可以发送消息到队列中,这之因此成为可能,是由于咱们使用了基于空字符串来标识的默认交换机

Recall how we published a message before:

回忆一下之前咱们是如何发布消息的:

var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
                    routingKey: "hello",
                    basicProperties: null,
                    body: body);

The first parameter is the the name of the exchange. The empty string denotes the default or nameless exchange: messages are routed to the queue with the name specified by routingKey, if it exists.

第一个参数就是交换机的名字。空字符串表示默认的或者匿名的交换机:根据明确的路由键(routingKey)将消息路由到已存在的队列。

Now, we can publish to our named exchange instead:

如今,咱们用具名(自定义命名)的交换机来进行发布:

var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "logs",
                     routingKey: "",
                     basicProperties: null,
                     body: body);

Temporary queues

临时队列

As you may remember previously we were using queues which had a specified name (remember hello and task_queue?). Being able to name a queue was crucial for us -- we needed to point the workers to the same queue. Giving a queue a name is important when you want to share the queue between producers and consumers.

你可能还记得以前咱们使用过指定名称的队列(记得好像是 hello 和 task_queue?)。可以为队列命名对咱们来讲是相当重要的 -- 咱们须要将工做单元指向相同的队列。一样,当你想在生产者和消费者之间共享队列时,为队列赋予一个名字也是很是重要的。

But that's not the case for our logger. We want to hear about all log messages, not just a subset of them. We're also interested only in currently flowing messages not in the old ones. To solve that we need two things.

可是,以上并非咱们日志系统的案例。咱们想要监听全部的日志消息,而不只仅是它们的一部分。咱们只对当前正在流动的消息感兴趣,而不是旧的消息,为解决这个问题我须要作好两件事。

Firstly, whenever we connect to Rabbit we need a fresh, empty queue. To do this we could create a queue with a random name, or, even better - let the server choose a random queue name for us.

首先,不管咱们什么时候链接到 Rabbit,都须要一个崭新的、空的队列,为作到这一点咱们可使用随机名称来建立一个队列,固然更好的作法是,让服务端为咱们选择一个随机名称。

Secondly, once we disconnect the consumer the queue should be automatically deleted.

其次,一旦咱们与消费者断开链接,相关的队列也应当能自动删除。

In the .NET client, when we supply no parameters to queueDeclare() we create a non-durable, exclusive, autodelete queue with a generated name:

在 .NET 客户端中,当咱们调用 queueDeclare 方法而并未提供任何参数时,实际上就是建立了一个非持久化、独享,且自动删除的具名队列。

var queueName = channel.QueueDeclare().QueueName;

At that point queueName contains a random queue name. For example it may look like amq.gen-JzTY20BRgKO-HjmUJj0wLg.

在此处 queueName 包含的是一个随机队列名称,好比它看起来可能像 amq.gen-JzTY20BRgKO-HjmUJj0wLg。

Bindings

绑定

Markdown

We've already created a fanout exchange and a queue. Now we need to tell the exchange to send messages to our queue. That relationship between exchange and a queue is called a binding.

咱们已经建立了一个 fanout 型交换机和队列,如今咱们须要告诉交换机把消息发送到队列,那么,交换机和队列之间的关系就被称做绑定

channel.QueueBind(queue: queueName,
                  exchange: "logs",
                  routingKey: "");

From now on the logs exchange will append messages to our queue.

从如今开始,日志交换机将会把消息追加到队列中。

Listing bindings
列举绑定

You can list existing bindings using, you guessed it:

你能够列举出现有的绑定,(所使用的命令)你该能够猜到:

rabbitmqctl list_bindings

Putting it all together

融合一块儿

Markdown

The producer program, which emits log messages, doesn't look much different from the previous tutorial. The most important change is that we now want to publish messages to our logs exchange instead of the nameless one. We need to supply a routingKey when sending, but its value is ignored for fanout exchanges. Here goes the code for EmitLog.cs file:

发出日志消息的生产者程序,与以前教程看起来并没有太大不一样。如今,最重要的变化莫过于咱们想把消息发布到名为 logs 的交换器中,而非匿名。在发送消息时咱们须要提供一个路由键(routingKey),只不过它的值在 fanout 型交换机中被忽略了,针对 EmitLog.cs 文件的代码以下:

using System;
using RabbitMQ.Client;
using System.Text;

class EmitLog
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "logs", type: "fanout");

            var message = GetMessage(args);
            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: "logs",
                                 routingKey: "",
                                 basicProperties: null,
                                 body: body);
            Console.WriteLine(" [x] Sent {0}", message);
        }

        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }

    private static string GetMessage(string[] args)
    {
        return ((args.Length > 0)
               ? string.Join(" ", args)
               : "info: Hello World!");
    }
}

(EmitLog.cs source)

As you see, after establishing the connection we declared the exchange. This step is necessary as publishing to a non-existing exchange is forbidden.

如同你所见,在创建好链接以后咱们声明了交换机。这一步很是有必要,由于禁止向一个不存在的交换机发布消息。

The messages will be lost if no queue is bound to the exchange yet, but that's okay for us; if no consumer is listening yet we can safely discard the message.

若是尚没有任何队列绑定到交换机,消息将会丢失,但这对咱们来讲并无什么问题;若是没有任何消费者正在监听,咱们能够将消息安全地删除。

The code for ReceiveLogs.cs:

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

class ReceiveLogs
{
    public static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "logs", type: "fanout");

            var queueName = channel.QueueDeclare().QueueName;
            channel.QueueBind(queue: queueName,
                              exchange: "logs",
                              routingKey: "");

            Console.WriteLine(" [*] Waiting for logs.");

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(" [x] {0}", message);
            };
            channel.BasicConsume(queue: queueName,
                                 autoAck: true,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

(ReceiveLogs.cs source)

Follow the setup instructions from tutorial one to generate the EmitLogs and ReceiveLogs projects.

从教程的第一章开始,跟随安装说明来生成 EmitLogs 和 ReceiveLogs 工程。

If you want to save logs to a file, just open a console and type:

若是你想保存日志到一个文件,只需打开控制台并输入:

cd ReceiveLogs
dotnet run > logs_from_rabbit.log

If you wish to see the logs on your screen, spawn a new terminal and run:

若是你想在屏幕上查看日志,重开一个新的终端并运行:

cd ReceiveLogs
dotnet run

And of course, to emit logs type:

固然,产生日志只需输入:

cd EmitLog
dotnet run

Using rabbitmqctl list_bindings you can verify that the code actually creates bindings and queues as we want. With two ReceiveLogs.cs programs running you should see something like:

使用 rabbitmqctl list_bindings 命令你能够核实代码的确已经建立了咱们指望的绑定和队列,伴随着 ReceiveLogs.cs 程序的运行你应该能够看到相似以下内容:

sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
# => logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
# => ...done.

The interpretation of the result is straightforward: data from exchange logs goes to two queues with server-assigned names. And that's exactly what we intended.

对结果的解释就很是简洁明了:来自 logs 交换机的数据将去往两个由服务端分配名称的队列,而这刚好是咱们所指望的。

相关文章
相关标签/搜索