众所周知RabbitMQ使用的是AMQP协议。咱们知道AMQP是一种网络协议,可以支持符合要求的客户端应用和消息中间件代理之间进行通讯。 数组
其中消息代理扮演的角色就是从生产者那儿接受消息,并根据既定的路由规则把接受到的消息发送给消息的处理者又称消费者。由此能够看出RabbitMQ在整个消息发送,处理的过程当中有三个比较重要的角色: 服务器
生产者:producer,消息生产者,就是投递消息的程序 网络
消息代理:broker,简单来讲就是消息队列服务器实体,这里简单理解为咱们安装的RabbitMQ服务 spa
消费者:consumer,消息消费者,就是接受消息的程序 线程
接下来咱们将以一个简单的控制台程序来实现消息队列的发送及接收(使用.NET版RabbitMQ客户端): 代理
主要功能为: 一个producer发送消息,一个consumer接收消息,并在控制台打印出来。 中间件
使用Nuget添加RabbitMQ.Client程序包至项目中
Install-Package RabbitMQ.Client 队列
建立消息的生产者 Producer.cs ,发送一条消息给消费者
-
using RabbitMQ.Client;
-
using System;
-
using System.Text;
-
-
namespace RabbitMQProducer
-
{
-
public class Producer
-
{
-
public static void Send()
-
{
-
//建立链接链接到RabbitMQ服务器,就是一个位于客户端和Broker之间的TCP链接,建议共用此TCP链接,每次使用时建立一个新的channel便可,
-
var factory = new ConnectionFactory();
-
IConnection connection = null;
-
//方式1:使用AMQP协议URL amqp://username:password@hostname:port/virtual host 可经过http://127.0.0.1:15672/ RabbitMQWeb管理页面查看每一个参数的具体内容
-
factory.Uri = "amqp://guest:guest@127.0.0.1:5672//";
-
connection = factory.CreateConnection();
-
-
////方式2:使用ConnectionFactory属性赋值
-
//factory.UserName = ConnectionFactory.DefaultUser;
-
//factory.Password = ConnectionFactory.DefaultPass;
-
//factory.VirtualHost = ConnectionFactory.DefaultVHost;
-
//factory.HostName = "127.0.0.1"; //设置RabbitMQ服务器所在的IP或主机名
-
//factory.Port = AmqpTcpEndpoint.UseDefaultPort;
-
//connection = factory.CreateConnection();
-
-
////方式3:使用CreateConnection方法建立链接,默认使用第一个地址链接服务端,若是第一个不可用会依次使用后面的链接
-
//List<AmqpTcpEndpoint> endpoints = new List<AmqpTcpEndpoint>() {
-
// new AmqpTcpEndpoint() { HostName="localhost1",Port=5672},
-
// new AmqpTcpEndpoint() { HostName="localhost2",Port=5672},
-
// new AmqpTcpEndpoint() { HostName="localhost3",Port=5672},
-
// new AmqpTcpEndpoint() { HostName="localhost4",Port=5672}
-
//};
-
//connection = factory.CreateConnection(endpoints);
-
-
using (connection)
-
{
-
//建立一个消息通道,在客户端的每一个链接里,可创建多个channel,每一个channel表明一个会话任务。相似与Hibernate中的Session
-
//AMQP协议规定只有经过channel才能指定AMQP命令,因此仅仅在建立了connection后客户端仍是不能发送消息的,必需要建立一个channel才行
-
//RabbitMQ建议客户端线程之间不要共用Channel,至少要保证共用Channel的线程发送消息必须是串行的,可是建议尽可能共用Connection
-
using (IModel channel = connection.CreateModel())
-
{
-
//建立一个queue(消息队列)
-
channel.QueueDeclare(
-
queue: "hello",
-
durable: false,
-
exclusive: false,
-
autoDelete: false,
-
arguments: null);
-
-
string message = "你好消费者,我是生产者发送的消息";
-
-
//往队列中发出一条消息 使用了默认交换机而且绑定路由键(route key)与队列名称相同
-
channel.BasicPublish(
-
exchange: "",
-
routingKey: "hello",
-
basicProperties: null,
-
body: Encoding.UTF8.GetBytes(message));
-
-
Console.WriteLine($"我是生产者,我发送了一条消息{message}");
-
-
Console.WriteLine(" Press [enter] to exit.");
-
Console.ReadLine();
-
}
-
}
-
}
-
}
-
}
注意:1.队列只会在它不存在的时候建立,屡次声明并不会重复建立。 事件
2.信息的内容是字节数组,也就意味着能够传递任何数据。 路由
3.建立消息的消费者Consumer.cs ,从队列中获取消息并打印到屏幕
-
using RabbitMQ.Client;
-
using RabbitMQ.Client.Events;
-
using System;
-
using System.Text;
-
-
namespace RabbitMQConsumer
-
{
-
public class Consumer
-
{
-
public static void Receive()
-
{
-
var factory = new ConnectionFactory();
-
factory.Uri = "amqp://guest:guest@127.0.0.1:5672//";
-
using (var connection = factory.CreateConnection())
-
{
-
using (var channel = connection.CreateModel())
-
{
-
//声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时建立队列
-
channel.QueueDeclare(
-
queue: "hello",
-
durable: false,
-
exclusive: false,
-
autoDelete: false,
-
arguments: null);
-
-
//建立事件驱动的消费者类型,尽可能不要使用while(ture)循环来获取消息
-
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
-
consumer.Received += (model, ea) =>
-
{
-
var body = ea.Body;
-
var message = Encoding.UTF8.GetString(body);
-
Console.WriteLine(" 我是消费者我接收到消息: {0}", message);
-
};
-
-
//指定消费队列
-
channel.BasicConsume(queue: "hello",
-
noAck: true,
-
consumer: consumer);
-
-
Console.WriteLine(" Press [enter] to exit.");
-
Console.ReadLine();
-
}
-
}
-
}
-
}
-
}
消息队列的使用过程大体以下:
-
CreateConnection 建立一个链接链接到broker
-
CreateModel 建立一个channel 使用它来发送AMQP指令
-
ExchangeDeclare 建立一个exchange 对消息进行路由
-
QueueDeclare 建立一个queue 消息队列 这是一个装载消息的容器
-
QueueBind 把exchange和queue按照路由规则绑定起来
-
BasicPublish 往队列中发送一条消息
-
BasicConsume 从队列中获取一条消息
exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。
本文中因为使用了默认交换机因此并无用到 ExchangeDeclare和 QueueBind两个方法
默认交换机其实是一个由消息代理预先声明好的没有名字(名字为空字符串)的直连交换机。它有一个特殊的属性使得它对于简单应用特别有用处:那就是每一个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同