实际上,RabbitMQ的生产者并不会直接把消息发送给队列,甚至生产者都不知道消息是否会被发送给一个队列。对于生产者而言,它们只能把消息发送到Exchange,一个Exchange所完成的工做至关简单,一方面,它从生产者那里接收消息;另外一方面它将消息存入队列中。一个Exchange须要准确的知道它要如何处理它接收到的消息,例如,它须要把消息转发到特定的队列,仍是进行广播处理,或者直接将它丢弃。能够经过exchange type来定义Exchange处理消息的规则。
整个框架结构图如图所示。html
Exchange types有如下几种:direct、topic、headers和fanout。若是咱们没有定义Exchange,那么系统就会默认使用一个默认的Exchange,名为:"",就像咱们入门篇里的同样,它会本身建立一个""的默认Exchange,而后将消息转发给特定routingKey的队列。框架
使用direct exchange时,会将exchange与特定的队列进行绑定,转发时由routingkey进行队列的匹配,如图所示。spa
在direct类型的exchange中,只有这两个routingkey彻底相同,exchange才会选择对应的binding进行消息路由,代码示例以下所示:设计
channel.QueueBind(queue: "create_pdf_queue", exchange: "pdf_events", routingKey: "pdf_create", arguments: null);
绑定时须要设置:队列名、exchange名和它们的routingkey。3d
channel.BasicPublish(exchange: "pdf_events", routingKey: "pdf_create", basicProperties: properties, body: body);
生产者发布消息时,须要设置exchange名和routingKey,若是exchange名和routingKey都与上述绑定的彻底一致,那么该exchange就会将这条消息路由到队列。rest
此类exchange与direct相似,惟一不一样的地方是,direct类型要求routingKey彻底一致,而这里能够可使用通配符进行模糊匹配,符号“#”匹配一个或多个词,符号“*”匹配很少很多一个词。所以“JiangYuZhou.#”可以匹配到“JiangYuZhou.pets.cat”,可是“JiangYuZhou.*” 只会匹配到“JiangYuZhou.money”。
因此,Topic Exchange 使用很是灵活,topic exchange如图所示。code
例如,咱们首先声明一个topic exchange,它的名称为"agreements":orm
// Topic类型的exchange, 名称 agreements channel.ExchangeDeclare(exchange: "agreements", type: ExchangeType.Topic, durable: true, autoDelete: false, arguments: null);
而后,咱们声明三个队列,它们分别以下:htm
// 建立berlin_agreements队列 channel.QueueDeclare(queue: "berlin_agreements", durable: true, exclusive: false, autoDelete: false, arguments: null); //建立 all_agreements 队列 channel.QueueDeclare(queue: "all_agreements", durable: true, exclusive: false, autoDelete: false, arguments: null); //建立 headstore_agreements 队列 channel.QueueDeclare(queue: "headstore_agreements", durable: true, exclusive: false, autoDelete: false, arguments: null);
最后,咱们将agreements exchange分别与上面的三个队列以不一样通配符的routingKey进行绑定:blog
//绑定 agreements --> berlin_agreements 使用routingkey:agreements.eu.berlin.# channel.QueueBind(queue: "berlin_agreements", exchange: "agreements", routingKey: "agreements.eu.berlin.#", arguments: null); //绑定 agreements --> all_agreements 使用routingkey:agreements.# channel.QueueBind(queue: "all_agreements", exchange: "agreements", routingKey: "agreements.#", arguments: null); //绑定 agreements --> headstore_agreements 使用routingkey:agreements.eu.*.headstore channel.QueueBind(queue: "headstore_agreements", exchange: "agreements", routingKey: "agreements.eu.*.headstore", arguments: null);
这时咱们若是发送下列消息:
var message = "hello world"; var body = Encoding.UTF8.GetBytes(message); var properties = channel.CreateBasicProperties(); properties.Persistent = true; channel.BasicPublish(exchange: "agreements", routingKey: "agreements.eu.berlin", basicProperties: properties, body: body);
该消息设置的exchange为"agreements",routingKey为"agreements.eu.berlin",因此它能够匹配上面的"agreements.eu.berlin.#"和"agreements.#",消息被转发到了"berlin_agreements"和"all_agreements"队列。
该exchange无需对routingKey进行匹配操做,而是很简单的直接将消息路由到全部绑定的队列中,如图所示。
此类型的路由规是根据header来判断的,首先须要以键值对的形式设置header的参数,在绑定exchange的时候将header以arguments的形式传递进去,传递参数时,键为"x-match"的header能够设置它的值为all或any,其中,all表示只有当发布的消息匹配该header中除"x-match"之外的全部值时,消息才会被转发到该队列;any表示当发布的消息匹配该header种除"x-match"外的任意值时,该消息会被转发到匹配队列。
代码操练
最后咱们以header exchange为例,演示咱们的Exchange。首先咱们建立四个项目,其中一个做为生产者,另做三个均做为消费者,而且使用:
dotnet add package RabbitMQ.Client
给四个项目均安装上RabbitMQ的.NET包,并进行restore,项目结构如图所示:
开始编写Send端的代码,其中,RabbitMQ仍是使用咱们在上一章种使用的Docker中RabbitMQ,程序以下:
using System; using System.Collections.Generic; using System.Text; using RabbitMQ.Client; namespace Send { class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "148.70.210.208" }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { //声明Headers类型的exchange,名称为agreements channel.ExchangeDeclare(exchange: "agreements", type: ExchangeType.Headers, autoDelete: false, arguments: null); //建立队列queue.A channel.QueueDeclare(queue: "queue.A", durable: true, exclusive: false, autoDelete: false, arguments: null); //建立队列queue.B channel.QueueDeclare(queue: "queue.B", durable: true, exclusive: false, autoDelete: false, arguments: null); //建立队列queue.C channel.QueueDeclare(queue: "queue.C", durable: true, exclusive: false, autoDelete: false, arguments: null); //绑定agreements=>queue.A,使用arguments(format=pdf、type=report、x-match=all) //只有当header中同时知足format=pdf、type=report时,消息才会被转发到队列A Dictionary<string, object> aHeader = new Dictionary<string, object>(); aHeader.Add("format", "pdf"); aHeader.Add("type", "report"); aHeader.Add("x-match", "all"); channel.QueueBind(queue: "queue.A", exchange: "agreements", routingKey: string.Empty, arguments: aHeader); //绑定agreements=>queue.B,使用arguments(format=pdf、type=log、x-match=any) //当header中知足format=pdf或type=log任意一个时,消息就会被转发到队列B Dictionary<string, object> bHeader = new Dictionary<string, object>(); bHeader.Add("format", "pdf"); bHeader.Add("type", "log"); bHeader.Add("x-match", "any"); channel.QueueBind(queue: "queue.B", exchange: "agreements", routingKey: string.Empty, arguments: bHeader); //绑定agreements=>queue.C,使用arguments(format=zip、type=report、x-match=all) //当header中同时知足format=zip和type=report时,消息会被转发到队列C Dictionary<string, object> cHeader = new Dictionary<string, object>(); cHeader.Add("format", "zip"); cHeader.Add("type", "report"); cHeader.Add("x-match", "all"); channel.QueueBind(queue: "queue.C", exchange: "agreements", routingKey: string.Empty, arguments: cHeader); string message1 = "hello world From 1"; var body = Encoding.UTF8.GetBytes(message1); var properties1 = channel.CreateBasicProperties(); properties1.Persistent = true; Dictionary<string, object> mHeader1 = new Dictionary<string, object>(); mHeader1.Add("format", "pdf"); mHeader1.Add("type", "report"); properties1.Headers = mHeader1; //这条消息会被转发到queue.A和queue.B //queue.A 的binding (format=pdf, type=report, x-match=all) //queue.B 的binding (format=pdf, type=log, x-match=any) channel.BasicPublish(exchange: "agreements", routingKey: string.Empty, basicProperties: properties1, body: body); string message2 = "hello world From 2"; body = Encoding.UTF8.GetBytes(message2); var properties2 = channel.CreateBasicProperties(); properties2.Persistent = true; Dictionary<string, object> mHeader2 = new Dictionary<string, object>(); mHeader2.Add("type", "log"); properties2.Headers = mHeader2; //这条消息会被转发到queue.B //queue.B 的binding (format = pdf, type = log, x-match = any) channel.BasicPublish(exchange: "agreements", routingKey: string.Empty, basicProperties: properties2, body: body); string message3 = "hello world From 3"; body = Encoding.UTF8.GetBytes(message3); var properties3 = channel.CreateBasicProperties(); properties3.Persistent = true; Dictionary<string, object> mHeader3 = new Dictionary<string, object>(); mHeader3.Add("format", "zip"); properties3.Headers = mHeader3; //这条消息不会被路由 //队列C要求同时知足两个条件,这里只知足了一个,没有匹配的队列 channel.BasicPublish(exchange: "agreements", routingKey: string.Empty, basicProperties: properties3, body: body); } } } } }
运行程序后,能够看到,queue.A中匹配了三条消息、queue.B中匹配了两条、queue.C中没有匹配到任何消息。
能够看到,队列A中匹配了一条信息,即Message 1,队列B中匹配了两条信息,即Message 1和Message2,队列C中没有匹配信息,符合咱们程序的编写,下面用接收端进行接收。
接收端分别写了三个程序,分别接收队列A、B、C的消息,它们除了绑定队列名称不一样外,其他所有相同,下面是绑定队列A的接收程序:
using System; using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; namespace Recieve1 { class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "148.70.210.208" }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { //注意要与发送端的声明一致 channel.ExchangeDeclare(exchange: "agreements", type: ExchangeType.Headers, autoDelete: false, arguments: null); //绑定了queue.C和agreements Exchange channel.QueueBind(queue: "queue.A", exchange: "agreements", routingKey: string.Empty); Console.WriteLine("Waiting for messages"); var consumer = new EventingBasicConsumer(channel); //绑定接收完成事件 consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine($"Recieve Message:{message}"); }; channel.BasicConsume(queue: "queue.A", autoAck: true, consumer: consumer); Console.WriteLine("Press [enter] to exit"); Console.ReadLine(); } } } } }
最后,咱们分别运行这三个接收程序:
符合程序设计。
参考:JulyLuo——http://www.javashuo.com/article/p-cxwtfcin-eu.html