本系列文章均来自官网原文,属于我的翻译,若有雷同,权当我的归档,忽喷.html
.NET/C# RabbitMQ 客户端下载地址:https://github.com/rabbitmq/rabbitmq-dotnet-clientgit
关于RabbitMQ在windows 平台的安装和管理配置请参考:http://www.cnblogs.com/grayguo/p/5300776.htmlgithub
确保安装成功:windows
这部分会写两个程序,一个消息生产者发送一个消息;一个消费者接受消息而后输出到控制台,在这个过程当中我会忽略一些.Net的细节,把注意力放在这个简单的"hello word" 消息程序上。数组
在下图中"P" 是咱们的生产者,"C" 是咱们的消费者,二者之间的中间这是咱们的消息队列--一个隐藏在消费者后面的消息缓冲区.服务器
建立一个Send.cs 来写发送程序,发送方会链接RabbitMQ 服务器,发送消息,而后退出.异步
class Send { public static void Main() { var factory = new ConnectionFactory() { HostName = "192.168.15.128" }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { ... } } } }
首先须要建立一个链接工厂去链接咱们的RabbitMQ服务器,这里咱们使用RabbitMQ-dotnet-client 提供的类库来进行回话的建立.socket
这里的这里的connection 链接已经为咱们把socket 链接, 版本协议和认证会话,都已经为咱们作了.这里我链接的服务器是"192.168.15.128"(因为我把RabbitMQ的测试环境搭栽了一台虚拟机上,若是是本机可写成"localhost"),直接指定服务器IP便可.async
而后咱们在这个链接上建立了一次回话(channel),咱们所作的大部分Api操做都要基于会话进行。ide
为了发送消息,咱们须要建立一个队列用来存储消息,而后能够把咱们的消息发送到该队列上,建立队列代码:
channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false,arguments: null); //queue:队名名 //durable:是否持久化 //exclusive:是否排他 //autoDelete:自动删除
注:建立队列的API是具备幂等性的--即只有当所指定的队列不存在时才会去建立.
而后发送消息:
string message = "Hello World!"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "",routingKey: "hello", basicProperties: null, body: body);
发送的消息必须是字节数组,咱们能够本身指定所需的编码
完整代码以下:
public class Send { public static void Main() { var factory = new ConnectionFactory() { HostName = "192.168.15.128" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); string message = "Hello World!"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body); Console.WriteLine(" [x] Sent {0}", message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } }
运行代码,能够经过客户端管理工具看到结果.
能够看到名字为"hello"的队列被建立,而且有一个消息一经存储在队列当中.
注:正常来讲咱们的消息是需通过交换机(exchange)进行路由(route)才能到达队列的,这里建立完队列而后直接(没有手动绑定exchange和Queue)发送 routingKey为"hello"的消息到名为""的交换机上之因此成功,是由于当咱们建立一个队列的时候,RabbitMQ会自动把咱们把新建的队列和RoutingKey为该队列名绑定到一个默认名为""的的交换机上。
RabbitMQ会主动把消息推送给咱们的消息接收者,不像消息发送者发送单个消息,咱们会让消息接收者持续化的监听消息而且打印出来.
建立一个Receive.cs 来写接收消息的代码
class Receive { private static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "192.168.15.128" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); } } }
这里的初始代码和Send.cs 基本上是一行的,创先链接,建立会话,这里之因此一样进行名为"hello"的队列的建立,是为了防止客户端先启动,找不到求请求的目标队列.
链接服务器后,咱们告诉RabbitMQ主动把消息推送给咱们,因为RabbitMQ推送消息是异步(asynchronously)进行的,因此咱们使用EventingBasicConsumer.Received 来进行消息的接受.
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); }; channel.BasicConsume(queue: "hello",noAck: true,consumer: consumer);
//noAck(no manual acks):ack的概念:当Consumer接收到消息、处理任务完成以后,会发送带有这个消息标示符的ack,来告诉server这个消息接收到并处理完成.
//若是设置为true,这个Consumer在收到消息以后会立刻返回ack(由程序自动完成 noAck=true)
//设置为 false:须要手动发送,否者RabbitMQ会一直等处处理某个消息的Consumer的连接失去以后,才肯定这个消息没有正确处理,从而RabbitMQ重发这个消息
完整代码以下:
class Receive { private static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "192.168.15.128" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); }; channel.BasicConsume(queue: "hello", noAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }
运行代码Send:
成功发送。
运行代码Reveive:
消息成功接收.