在前一篇教程中,咱们建立了一个工做队列,咱们假设在工做队列后的每个任务都只被调度给一个消费者。在这一部分,咱们将作一些彻底不同的事情,调度同一条消息给多个消费者,也就是有名的“发布-订阅”模式。为了阐述这种模式,咱们将构建一个简单的日志系统。该系统将由两部分组成:一部分发送日志消息,另外一部分接收而且打印日志消息,在这个日志系统中,每一份运行着的接收程序都将会收到消息。这样咱们能够运行一个接收者把日志写入到磁盘中,同时能够运行另外一个接收者将日志打印到显示器上面。也就是说,发布的日志消息会被广播到全部的接收者。html
在前面的教程中,咱们发送消息到队列,而后从队列中接收消息。如今开始介绍RabbitMQ完整的消息模式。缓存
让咱们快速的复习一下在前面的教程中讲过的内容:安全
RabbitMQ消息模式的核心是生产者从不直接发送消息到队列。事实上,生产者每每不知道他产生的消息会被分发到哪些队列,它只能将消息发送到一个交换器。交换器很是简单,它一方面从生产者接收消息,另外一方面又将消息压入队列中。交换器必须清楚的知道要用接收到的消息作什么,是应当追加到某个指定的队列?或者追加到不少队列?或者应当丢弃?要完成这些的规则都被定义在交换器的类型中。服务器
有几种可用的交换器类型:direct、topic、headers和fanout。本文主要关注最后一种类型:fanout,让咱们建立一个这种类型的交换器,命名为logs:函数
1 channel.ExchangeDeclare("logs", "fanout");
类型为fanout的交换器很是简单,顾名思义,它会广播全部收到的消息到它知道的全部的队列,而这也正是咱们的日志系统所须要的。spa
交换器清单日志
为了展现服务器上交换器的清单,你能够运行在任什么时候候都特别有用的rabbitmqctl:code
1 $ sudo rabbitmqctl list_exchanges 2 Listing exchanges ... 3 direct 4 amq.direct direct 5 amq.fanout fanout 6 amq.headers headers 7 amq.match headers 8 amq.rabbitmq.log topic 9 amq.rabbitmq.trace topic 10 amq.topic topic 11 logs fanout 12 ...done.
在清单里,有一些amp.*样式的交换器和一个默认(未命名)的交换器,这些都是默认建立的,但并非说你立刻就须要使用它们。htm
匿名交换器blog
在前面的教程中咱们并不知晓交换器的任何信息,可是任然能够将消息发送到队列中,那是由于咱们使用了默认的交换器,使用空字符串表示("")。
回忆一下以前是如何发布消息的:
1 var message = GetMessage(args); 2 var body = Encoding.UTF8.GetBytes(message); 3 channel.BasicPublish(exchange: "", 4 routingKey: "hello", 5 basicProperties: null, 6 body: body);
第一个参数就是交换器的名称,空字符串指代的是默认交换器或者是匿名交换器,若是队列存在,消息将经过指定的routingKey路由到队列。
如今咱们能够将消息发布到上面定义的命名交换器了:
1 var message = GetMessage(args); 2 var body = Encoding.UTF8.GetBytes(message); 3 channel.BasicPublish(exchange: "logs", 4 routingKey: "", 5 basicProperties: null, 6 body: body);
你或许还记得咱们以前使用的有指定名称的队列(还记得hello和task_queue么?)。能为队列命名对咱们来讲是相当重要的,咱们须要指定给消费者相同的队列。当你想在生产者和消费者间共享队列时,给队列指定一个名字就显得特别重要了。
可是这并非咱们日志系统的问题。咱们但愿能监听到全部消息,而不只仅是其中一个子集;咱们对当前流入的消息感兴趣而不是以前的旧信息。为了解决这个问题,咱们须要作两件事:第1、不管什么时候链接到RabbitMQ,咱们须要一个新的空队列,为此咱们能够建立一个拥有随机名称的队列或者更好的是直接让RabbitMQ服务替咱们生成一个随机名称;第2、一旦消费者断开链接,队列应当被自动删除。
在.NET 客户端,咱们经过提供无参数的QueueDeclare()函数能够建立一个不持久化、独占的、自动删除的拥有随机名称的队列:
1 var queueName = channel.QueueDeclare().QueueName;
这样queueName就是一个随机的队列名称,看起来会是这样的:amq.gen-JzTY20BRgKO-HjmUJj0wLg。
咱们已经建立了一个fanout类型的交换器和一个队列,如今须要告诉交换器把消息发送到咱们的队列。交换器和队列的关系就叫作绑定。
1 channel.QueueBind(queue: queueName, 2 exchange: "logs", 3 routingKey: "");
到目前为止,交换器logs将能添加消息到咱们的队列中了。
绑定清单
你能够经过rabbitmqctl list_bingdings命令查看绑定清单。
发送日志的生产者程序和以前教程里面的没有太多不一样,最重要的改变是如今咱们但愿将消息发送到logs交换器,而不是以前的匿名交换器。当发送消息的时候,咱们须要指定一个routingKey,可是在使用fanout类型交换器的时候,它的值将被忽略。下面是EmitLog.cs文件里面的代码:
1 using System; 2 using RabbitMQ.Client; 3 using System.Text; 4 5 class EmitLog 6 { 7 public static void Main(string[] args) 8 { 9 var factory = new ConnectionFactory() { HostName = "localhost" }; 10 using(var connection = factory.CreateConnection()) 11 using(var channel = connection.CreateModel()) 12 { 13 channel.ExchangeDeclare(exchange: "logs", type: "fanout"); 14 15 var message = GetMessage(args); 16 var body = Encoding.UTF8.GetBytes(message); 17 channel.BasicPublish(exchange: "logs", 18 routingKey: "", 19 basicProperties: null, 20 body: body); 21 Console.WriteLine(" [x] Sent {0}", message); 22 } 23 24 Console.WriteLine(" Press [enter] to exit."); 25 Console.ReadLine(); 26 } 27 28 private static string GetMessage(string[] args) 29 { 30 return ((args.Length > 0) 31 ? string.Join(" ", args) 32 : "info: Hello World!"); 33 } 34 }
如你所见,在建立连接以后咱们申明了交换器,这一步用于禁止发布到不存在的交换器是颇有必要的。若是没有队列绑定到交换器发布的消息将会丢失,这是没有问题的;若是没有消费者监听消息,咱们能够安全的销毁它。
ReceiveLog.cs中的代码:
1 using System; 2 using RabbitMQ.Client; 3 using RabbitMQ.Client.Events; 4 using System.Text; 5 6 class ReceiveLogs 7 { 8 public static void Main() 9 { 10 var factory = new ConnectionFactory() { HostName = "localhost" }; 11 using(var connection = factory.CreateConnection()) 12 using(var channel = connection.CreateModel()) 13 { 14 channel.ExchangeDeclare(exchange: "logs", type: "fanout"); 15 16 var queueName = channel.QueueDeclare().QueueName; 17 channel.QueueBind(queue: queueName, 18 exchange: "logs", 19 routingKey: ""); 20 21 Console.WriteLine(" [*] Waiting for logs."); 22 23 var consumer = new EventingBasicConsumer(channel); 24 consumer.Received += (model, ea) => 25 { 26 var body = ea.Body; 27 var message = Encoding.UTF8.GetString(body); 28 Console.WriteLine(" [x] {0}", message); 29 }; 30 channel.BasicConsume(queue: queueName, 31 noAck: true, 32 consumer: consumer); 33 34 Console.WriteLine(" Press [enter] to exit."); 35 Console.ReadLine(); 36 } 37 } 38 }
像以前那样编译,工做就完成了。
1 $ csc /r:"RabbitMQ.Client.dll" EmitLogs.cs 2 $ csc /r:"RabbitMQ.Client.dll" ReceiveLogs.cs
若是你想将日志保存到文件中,打开控制台而后输入:
1 $ ReceiveLogs.exe > logs_from_rabbit.log
若是你想在屏幕上看到日志,打开一个新的终端,执行下面的代码:
1 $ ReceiveLogs.exe
固然,发送日志输入:
1 $ EmitLog.exe
使用rabbitmqctl list_bindings命令,能够看到代码确如咱们但愿的那样建立了绑定和队列。若是同时运行两个消费者(ReceiveLogs.cs)你将能看到下面这样的信息:
1 $ sudo rabbitmqctl list_bindings 2 Listing bindings ... 3 logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue [] 4 logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue [] 5 ...done.
结果很是的直观:数据从交换器logs发送到两个服务自动指定名称的队列,这正是咱们以前预期的。
要了解如何监听消息的子集,让咱们进入下一篇。
原文连接:http://www.rabbitmq.com/tutorials/tutorial-three-dotnet.html