【译】RabbitMQ:路由(Routing)

在前一篇中,咱们构建了一个简单的日志系统,咱们已经可以广播消息到许多的接收者。在这一篇中,咱们但愿增长一个特性,让订阅消息的子集成为可能。例如,咱们能够将重要的错误日志存放到日志文件(即,磁盘上面),同时将仍然全部的日志信息打印到控制台。html

绑定

在前面的例子中咱们已经建立过绑定,你应该还能记得下面的代码:算法

1 channel.QueueBind(queue: queueName,
2                   exchange: "logs",
3                   routingKey: "");

一个绑定是一个交换器和一个队列的关系,能够形象的解读为:这个队列对来自这个交换器的消息感兴趣。绑定能够拥有一个额外的参数:routingKey,为了不和BasicPublish里面的routingKey混淆,咱们称其为绑定关键字(binding Key,下面展现了如何经过一个绑定关键字建立一个绑定:spa

1 channel.QueueBind(queue: queueName,
2                   exchange: "direct_logs",
3                   routingKey: "black");

绑定关键字的意义依赖于交换器的类型,前面咱们使用过的fanout类型的交换器,直接简单的忽略了它的绑定关键字的值。3d

Direct交换器

前一篇教程中的日志系统广播全部的消息到全部的消费者,咱们但愿扩展它能根据严重程度对消息进行过滤。例如,咱们但愿写日志到磁盘的程序可以只接收重要的错误,不要在“警告”、“信息”级别的日志身上浪费磁盘空间。日志

咱们使用fanout类型的交换器,并不能带来很大的灵活性,它只能盲目的广播。做为替代,咱们将使用direct类型的交换器,这种交换器背后的路由算法很是简单,消息发送到绑定关键字(binding Key)和消息的路由关键字(routing Key)彻底匹配的队列。code

为了帮助说明,设想下面的配置:htm

                                         

在这个设置中,咱们看到direct类型的交换器X绑定了两个队列.第一个队列绑定关键字是orange;第二个队列有两个绑定,一个是black,另外一个是green。在这样的配置中,发送到使用orange为路由关键字的消息将被路由到Q1,使用blackgreen为路由关键字的消息将被路由到队列Q2,其余消息将被销毁。blog

多绑定

                                         

使用同一个绑定关键字绑定多个队列是彻底合法的,在咱们的示例中咱们能够在XQ1中添加一个以black为绑定关键字的绑定。在这种状况下,direct类型的交换器会展示出fanout类型交换器的行为,而且会广播消息到全部知足条件的队列,发送到路由关键字为black的消息会发送到Q1Q2.教程

发送日志

咱们会为日志系统使用这种模式,将消息发送到direct类型而非fanout类型的交换器。咱们将使用日志的严重等级来做为路由关键字,这样的话消息的接收程序就能够依据本身的须要选择不一样的严重等级。首先让咱们把注意力集中到发送日志上。rabbitmq

和往常同样,咱们首先须要建立一个交换器:

1 channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");

接着咱们准备好发送消息了:

1 var body = Encoding.UTF8.GetBytes(message);
2 channel.BasicPublish(exchange: "direct_logs",
3                      routingKey: severity,
4                      basicProperties: null,
5                      body: body);

为了简化问题,咱们假设严重等级是‘info’、‘warning’、‘error’。

订阅

立刻就能够像前面的教程中那样接收消息了,可是有一个不一样点,咱们要用本身感兴趣的全部日志严重程度分别建立一个绑定。

1 var queueName = channel.QueueDeclare().QueueName;
2 
3 foreach(var severity in args)
4 {
5     channel.QueueBind(queue: queueName,
6                       exchange: "direct_logs",
7                       routingKey: severity);
8 } 

组合在一块儿

                                       

EmitLogDirect.cs类的代码:

 1 using System;
 2 using System.Linq;
 3 using RabbitMQ.Client;
 4 using System.Text;
 5 
 6 class EmitLogDirect
 7 {
 8     public static void Main(string[] args)
 9     {
10         var factory = new ConnectionFactory() { HostName = "localhost" };
11         using(var connection = factory.CreateConnection())
12         using(var channel = connection.CreateModel())
13         {
14             channel.ExchangeDeclare(exchange: "direct_logs",
15                                     type: "direct");
16 
17             var severity = (args.Length > 0) ? args[0] : "info";
18             var message = (args.Length > 1)
19                           ? string.Join(" ", args.Skip( 1 ).ToArray())
20                           : "Hello World!";
21             var body = Encoding.UTF8.GetBytes(message);
22             channel.BasicPublish(exchange: "direct_logs",
23                                  routingKey: severity,
24                                  basicProperties: null,
25                                  body: body);
26             Console.WriteLine(" [x] Sent '{0}':'{1}'", severity, message);
27         }
28 
29         Console.WriteLine(" Press [enter] to exit.");
30         Console.ReadLine();
31     }
32 }

ReceiveLogsDirect.cs的代码:

 1 using System;
 2 using RabbitMQ.Client;
 3 using RabbitMQ.Client.Events;
 4 using System.Text;
 5 
 6 class ReceiveLogsDirect
 7 {
 8     public static void Main(string[] args)
 9     {
10         var factory = new ConnectionFactory() { HostName = "localhost" };
11         using(var connection = factory.CreateConnection())
12         using(var channel = connection.CreateModel())
13         {
14             channel.ExchangeDeclare(exchange: "direct_logs",
15                                     type: "direct");
16             var queueName = channel.QueueDeclare().QueueName;
17 
18             if(args.Length < 1)
19             {
20                 Console.Error.WriteLine("Usage: {0} [info] [warning] [error]",
21                                         Environment.GetCommandLineArgs()[0]);
22                 Console.WriteLine(" Press [enter] to exit.");
23                 Console.ReadLine();
24                 Environment.ExitCode = 1;
25                 return;
26             }
27 
28             foreach(var severity in args)
29             {
30                 channel.QueueBind(queue: queueName,
31                                   exchange: "direct_logs",
32                                   routingKey: severity);
33             }
34 
35             Console.WriteLine(" [*] Waiting for messages.");
36 
37             var consumer = new EventingBasicConsumer(channel);
38             consumer.Received += (model, ea) =>
39             {
40                 var body = ea.Body;
41                 var message = Encoding.UTF8.GetString(body);
42                 var routingKey = ea.RoutingKey;
43                 Console.WriteLine(" [x] Received '{0}':'{1}'",
44                                   routingKey, message);
45             };
46             channel.BasicConsume(queue: queueName,
47                                  noAck: true,
48                                  consumer: consumer);
49 
50             Console.WriteLine(" Press [enter] to exit.");
51             Console.ReadLine();
52         }
53     }
54 }

像一般那样编译(编译建议见教程一)。

若是你只想保存严重等级为‘warning’、‘error’的日志到文件中,只须要打开控制台而后输入:

1 $ ReceiveLogsDirect.exe warning error > logs_from_rabbit.log

若是你想在屏幕上看到全部的日志,打开另外一个终端输入:

1 $ ReceiveLogsDirect.exe info warning error
2  [*] Waiting for logs. To exit press CTRL+C

若是想发送‘error’级别的日志,举例来讲,你只须要输入以下信息:

1 $ EmitLogDirect.exe error "Run. Run. Or it will explode."
2  [x] Sent 'error':'Run. Run. Or it will explode.' 

进入教程五,了解如何基于模式(pattern)监听消息。

 

原文连接:http://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html

相关文章
相关标签/搜索