先决条件
本教程假定 RabbitMQ 已经安装,并运行在localhost
标准端口(5672)。若是你使用不一样的主机、端口或证书,则须要调整链接设置。javascript从哪里得到帮助
若是您在阅读本教程时遇到困难,能够经过邮件列表 联系咱们。html
发布/订阅#
(使用 .NET Client)java
在 教程[2] 中,咱们建立了一个工做队列,假设在工做队列中的每个任务都只被分发给一个 Worker。那么在这一章节,咱们要作与之彻底不一样的事,那就是咱们将要把一条消息分发给多个消费者。这种模式被称为“发布/订阅”。nginx
为了说明、体现这种模式,咱们将会建一个简单的日志系统。它将会包含两个程序 - 第一个用来发送日志消息,第二个用来接收并打印它们。git
在咱们创建的日志系统中,每一个接收程序的运行副本都会收到消息。这样咱们就能够运行一个接收程序接收消息并将日志写入磁盘;同时运行另一个接收程序接收消息并将日志打印到屏幕上。github
实质上,发布的日志消息将会被广播给全部的接收者。docker
交换器#
在教程的前几部分,咱们是发送消息到队列并从队列中接收消息。如今是时候介绍 Rabbit 中完整的消息传递模型了。shell
让咱们快速回顾一下前面教程中的内容:bash
- 生产者是发送消息的用户应用程序。
- 队列是存储消息的缓冲区。
- 消费者是接收消息的用户应用程序。
在 RabbitMQ 中,消息传递模型的核心理念是生产者历来不会把任何消息直接发送到队列,其实,一般生产者甚至不知道消息是否会被分发到任何队列中。服务器
然而,生产者只能把消息发送给交换器。交换器很是简单,一方面它接收来自生产者的消息,另外一方面又会把接收的消息推送到队列中。交换器必须明确知道该如何处理收到的消息,应该追加到一个特定队列中?仍是应该追加到多个队列中?或者应该把它丢弃?这些规则都被定义在交换器类型中。
目前交换器类型有这几种:direct
,topic
,headers
和fanout
。咱们先重点关注最后一个fanout
,咱们建立一个这种类型的交换器,将其命名为logs
:
channel.ExchangeDeclare("logs", "fanout");
fanout
类型交换器很是简单,正如您可能从名字中猜出的那样,它会把收到的全部消息广播到它已知的全部队列中。这恰巧是咱们的日志系统目前所须要的。
列举交换器
要列举出服务器上的交换器,您可使用很是有用的rabbitmqctl
命令行工具:sudo rabbitmqctl list_exchanges
执行上述命令后,出现的列表中将会有一些
amq.*
交换器和默认(未命名)交换器。这些是默认建立的,不过目前您可能用不到它们。默认交换器
在教程的前些部分,咱们对交换器这一律念还一无所知,但仍然能够把消息发送到队列。之因此这样,是由于咱们使用了一个用空字符串(""
)标识的默认交换器。回顾一下咱们以前如何发布消息:
var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);
第一个参数就是交换器的名称,空字符串表示默认或匿名交换器:将消息路由到
routingKey
指定的队列(若是存在)中。
如今,咱们能够把消息发布到咱们指定的交换器:
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body);
临时队列#
您是否还记得以前咱们使用过的队列,它们都有一个特定的名称(记得应该是hello
和task_queue
吧)。给队列命名对咱们来讲是相当重要的 -- 由于咱们可能须要多个 Worker 指向同一个队列;当您想要在生产者和消费者之间共享队列时,给队列一个名称也是很是重要的。
可是,咱们建立的日志系统并不但愿如此。咱们但愿监听全部的日志消息,而不只仅是其中一部分。咱们也只对目前流动的消息感兴趣,而不是旧消息。为解决这个问题,咱们须要作好两件事。
首先,咱们不管什么时候链接 Rabbit,都须要一个新的、空的队列。要作到这一点,咱们可使用随机名称来建立队列,或许,甚至更好的方案是让服务器为咱们选择一个随机队列名称。
其次,一旦咱们与消费者断开链接,与之相关的队列应该被自动删除。
在 .NET 客户端中,若是不向QueueDeclare()
方法提供任何参数,实际上就是建立了一个非持久化、独占、且自动删除的随机命名队列:
var queueName = channel.QueueDeclare().QueueName;
您能够在 队列指南 中了解更多关于exclusive
参数和其余队列属性的信息。
此时,queueName
包含一个随机队列名称。例如,它看起来可能像amq.gen-JzTY20BRgKO-HjmUJj0wLg
。
绑定#
咱们已经建立好了一个fanout
交换器和一个队列。如今咱们须要告诉交换器把消息发送到咱们的队列。而交换器和队列之间的关系就称之为绑定。
// 把一个队列绑定到指定交换器。 channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");
从如今起,logs
交换器会把消息追加到咱们的队列中。
列举绑定
您可使用(您或许已经猜到了),列举出现有的绑定。sudo rabbitmqctl list_bindings
组合在一块儿#
生产者程序负责分发消息,这与以前的教程看起来没有太大区别。
最重要的变化是咱们如今想把消息发布到咱们的logs
交换器,而不是匿名交换器。在发送时咱们须要提供一个路由键routingKey
,可是对于fanout
交换器,它的值能够被忽略。这里是EmitLog.cs
文件的代码:
using System; using RabbitMQ.Client; using System.Text; class EmitLog { public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "logs", type: "fanout"); var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body); Console.WriteLine(" [x] Sent {0}", message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } private static string GetMessage(string[] args) { return ((args.Length > 0) ? string.Join(" ", args) : "info: Hello World!"); } }
(EmitLog.cs 源码)
如你所见,在创建链接后,咱们声明了交换器。这一步很是有必要,由于发布消息到一个不存在的交换器,这种状况是被禁止的。
若是没有队列绑定到交换器上,消息将会丢失,但这对咱们来讲并无什么没问题;若是没有消费者正在监听,咱们是能够放心地把消息丢弃的。
ReceiveLogs.cs
的代码:
using System; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text; class ReceiveLogs { public static void Main() { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "logs", type: "fanout"); var queueName = channel.QueueDeclare().QueueName; channel.QueueBind(queue: queueName, exchange: "logs", routingKey: ""); Console.WriteLine(" [*] Waiting for logs."); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] {0}", message); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }
(ReceiveLogs.cs 源码)
按照 教程[1]中的设置说明生成EmitLogs
和ReceiveLogs
项目。
若是您想把日志保存到文件中,只需打开一个控制台并输入:
cd ReceiveLogs
dotnet run > logs_from_rabbit.log
若是你想在屏幕上看到日志,我能够新开一个终端并运行:
cd ReceiveLogs
dotnet run
固然,分发日志须要输入:
cd EmitLog
dotnet run
使用rabbitmqctl list_bindings
命令,您能够验证代码是否真正建立了咱们想要的绑定和队列。当有两个ReceiveLogs.cs
程序运行时,您应该看到以下所示的内容:
sudo rabbitmqctl list_bindings
# => Listing bindings ... # => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue [] # => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue [] # => ...done.
对执行结果的解释简洁明了:来自logs
交换器的数据转发到了两个由服务器随机分配名称的队列。这正是咱们期待的结果。
想要了解如何监听消息的这一块内容,让咱们继续阅读 教程[4]。
写在最后#
本文翻译自 RabbitMQ 官方教程 C# 版本。如本文介绍内容与官方有所出入,请以官方最新内容为准。水平有限,翻译的很差请见谅,若有翻译错误还请指正。
- 原文连接:RabbitMQ tutorial - Publish/Subscribe
- 实验环境:RabbitMQ 3.7.4 、.NET Core 2.1.三、Visual Studio Code
- 最后更新:2018-06-11