rabbitMQ是一个在AMQP协议标准基础上完整的,可服用的企业消息系统。他遵循Mozilla Public License开源协议。采用 Erlang 实现的工业级的消息队列(MQ)服务器。html
RabbitMQ的官方站:http://www.rabbitmq.com/
AMQP(高级消息队列协议) 是一个异步消息传递所使用的应用层协议规范,做为线路层协议,而不是API(例如JMS),AMQP 客户端可以无视消息的来源任意发送和接受信息。AMQP的原始用途只是为金融界提供一个能够彼此协做的消息协议,而如今的目标则是为通用消息队列架构提供通用构建工具。所以,面向消息的中间件 (MOM)系统,例如发布/订阅队列,没有做为基本元素实现。反而经过发送简化的AMQ实体,用户被赋予了构建例如这些实体的能力。这些实体也是规范的一 部分,造成了在线路层协议顶端的一个层级:AMQP模型。这个模型统一了消息模式,诸如以前提到的发布/订阅,队列,事务以及流数据,而且添加了额外的特性,例如更易于扩展,基于内容的路由。git
AMQP当中有四个概念很是重要github
virtual host
,虚拟主机exchange
,交换机queue
,队列binding
,绑定一个虚拟主机持有一组交换机、队列和绑定。数据库
为何须要多个虚拟主机呢?由于RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制。所以,若是须要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别建立一个虚拟主机。每个RabbitMQ服务器都有一个默认的虚拟主机/
。编程
何谓虚拟主机(virtual host),交换机(exchange),队列(queue)和绑定(binding)windows
队列(Queues)是你的消息(messages)的终点,能够理解成装消息的容器。消息就一直在里面,直到有客户端(也就是消费者,Consumer)链接到这个队列而且将其取走为止。不过,也能够将一个队列配置成这样的:一旦消息进入这个队列,此消息就被删除。数组
队列是由消费者(Consumer)经过程序创建的,不是经过配置文件或者命令行工具。这没什么问题,若是一个消费者试图建立一个已经存在的队列,RabbitMQ会直接忽略这个请求。所以咱们能够将消息队列的配置写在应用程序的代码里面。浏览器
而要把一个消息放进队列前,须要有一个交换机(Exchange)。服务器
交换机(Exchange)能够理解成具备路由表的路由程序。每一个消息都有一个称为路由键(routing key)的属性,就是一个简单的字符串。交换机当中有一系列的绑定(binding),即路由规则(routes)。(例如,指明具备路由键 “X” 的消息要到名为timbuku的队列当中去。)架构
消费者程序(Consumer)要负责建立你的交换机。交换机能够存在多个,每一个交换机在本身独立的进程当中执行,所以增长多个交换机就是增长多个进程,能够充分利用服务器上的CPU核以便达到更高的效率。例如,在一个8核的服务器上,能够建立5个交换机来用5个核,另外3个核留下来作消息处理。相似的,在RabbitMQ的集群当中,你能够用相似的思路来扩展交换机一边获取更高的吞吐量。
交换机如何判断要把消息送到哪一个队列?你须要路由规则,即绑定(binding)。一个绑定就是一个相似这样的规则:将交换机“desert(沙漠)”当中具备路由键“阿里巴巴”的消息送到队列“hideout(山洞)”里面去。换句话说,一个绑定就是一个基于路由键将交换机和队列链接起来的路由规则。例如,具备路由键“audit”的消息须要被送到两个队列,“log-forever”和“alert-the-big-dude”。要作到这个,就须要建立两个绑定,每一个都链接一个交换机和一个队列,二者都是由“audit”路由键触发。在这种状况下,交换机会复制一份消息而且把它们分别发送到两个队列当中。交换机不过就是一个由绑定构成的路由表。
交换机有多种类型。他们都是作路由的,可是它们接受不一样类型的绑定。为何不建立一种交换机来处理全部类型的路由规则呢?由于每种规则用来作匹配分子的CPU开销是不一样的。例如,一个“topic”类型的交换机试图将消息的路由键与相似“dogs.*”的模式进行匹配。匹配这种末端的通配符比直接将路由键与“dogs”比较(“direct”类型的交换机)要消耗更多的CPU。若是你不须要“topic”类型的交换机带来的灵活性,你能够经过使用“direct”类型的交换机获取更高的处理效率。那么有哪些类型,他们又是怎么处理的呢?
Exchange
你花了大量的时间来建立队列、交换机和绑定,而后,服务器程序挂了。你的队列、交换机和绑定怎么样了?还有,放在队列里面可是还没有处理的消息们呢?
若是你是用默认参数构造的这一切的话,那么,他们都灰飞烟灭了。RabbitMQ重启以后会干净的像个新生儿。你必须重作全部的一切,亡羊补牢,如何避免未来再度发生此类杯具?
队列和交换机有一个建立时候指定的标志durable。durable的惟一含义就是具备这个标志的队列和交换机会在重启以后从新创建,它不表示说在队列当中的消息会在重启后恢复。那么如何才能作到不仅是队列和交换机,还有消息都是持久的呢?
可是首先须要考虑的问题是:是否真的须要消息的持久化?若是须要重启后消息能够回复,那么它须要被写入磁盘。但即便是最简单的磁盘操做也是要消耗时间的。因此须要衡量判断。
当你将消息发布到交换机的时候,能够指定一个标志“Delivery Mode”(投递模式)。根据你使用的AMQP的库不一样,指定这个标志的方法可能不太同样。简单的说,就是将Delivery Mode设置成2,也就是持久的(persistent)便可。通常的AMQP库都是将Delivery Mode设置成1,也就是非持久的。因此要持久化消息的步骤以下:
绑定(Bindings)怎么办?绑定没法在建立的时候设置成durable。没问题,若是你绑定了一个durable的队列和一个durable的交换机,RabbitMQ会自动保留这个绑定。相似的,若是删除了某个队列或交换机(不管是否是durable),依赖它的绑定都会自动删除。
注意:
在Windows上安装Rabbit MQ 指南,最好的是这篇《Rabbit MQ Windows Installation guide》,其中还包括了使用.NET RabbitMQ.Client Nuget 包访问Rabbit MQ的示例代码。
安装Rabbit MQ
Rabbit MQ 是创建在强大的Erlang OTP平台上,所以安装Rabbit MQ的前提是安装Erlang。经过下面两个链接下载安装3.2.3 版本:
默认安装的Rabbit MQ 监听端口是5672
激活Rabbit MQ's Management Plugin
使用Rabbit MQ 管理插件,能够更好的可视化方式查看Rabbit MQ 服务器实例的状态,你能够在命令行中使用下面的命令激活:
"C:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.2.3\sbin\rabbitmq-plugins.bat" enable rabbitmq_management
要重启服务才能生效,能够执行
net stop RabbitMQ && net start RabbitMQ
下面咱们使用rabbitmqctl控制台命令(位于C:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.2.3\sbin>)来建立用户,密码,绑定权限等。
Microsoft Windows [版本 6.3.9600]
(c) 2013 Microsoft Corporation。保留全部权利。
c:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.2.3\sbin 的目录
2014/11/01 15:04 <DIR> .
2014/11/01 15:04 <DIR> ..
2014/01/23 22:57 817 rabbitmq-echopid.bat
2014/01/23 22:57 1,900 rabbitmq-plugins.bat
2014/01/23 22:57 4,356 rabbitmq-server.bat
2014/01/23 22:57 7,123 rabbitmq-service.bat
2014/01/23 22:57 1,621 rabbitmqctl.bat
5 个文件 15,817 字节
2 个目录 96,078,618,624 可用字节
c:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.2.3\sbin>rabbitmqctl.ba
t list_users
Listing users ...
guest [administrator]
...done.
c:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.2.3\sbin>rabbitmqctl.ba
t list_vhosts
Listing vhosts ...
/
...done.
c:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.2.3\sbin>rabbitmqctl.ba
t add_user geffzhang zsy@2014
Creating user "geffzhang" ...
...done.
c:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.2.3\sbin>rabbitmqctl.ba
t list_users
Listing users ...
geffzhang []
guest [administrator]
...done.
c:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.2.3\sbin>rabbitmqctl.ba
t set_user_tags geffzhang administrator
Setting tags for user "geffzhang" to [administrator] ...
...done.
c:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.2.3\sbin>rabbitmqctl.ba
t set_permissions -p / geffzhang ".*" ".*" ".*"
Setting permissions for user "geffzhang" in vhost "/" ...
...done.
c:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.2.3\sbin>rabbitmqctl.ba
t list_users
Listing users ...
geffzhang [administrator]
guest [administrator]
...done.
使用浏览器打开http://localhost:15672
访问Rabbit Mq的管理控制台,使用刚才建立的帐号登录系统:
在.NET上使用Rabbit MQ
经过Nuget 获取Rabbit MQ NET client bindings from NuGet:
PM> Install-Package RabbitMQ.Client
咱们最多见的一个场景是发送和接收Rabbit MQ 持久化消息:
第一步是声明durable Exchange 和 Queue
private readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory { HostName = "Geffzhang-NB", UserName="geffzhang", Password ="zsy@2014", VirtualHost ="/" };
const string ExchangeName = "test.exchange";
const string QueueName = "test.queue";
using (IConnection conn = rabbitMqFactory.CreateConnection())
using (IModel channel = conn.CreateModel())
{
channel.ExchangeDeclare(ExchangeName, "direct", durable:true, autoDelete:false, arguments:null);
channel.QueueDeclare(QueueName, durable:true, exclusive:false, autoDelete:false,arguments:null);
channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);
}
下面对上面代码进行说明:
1. 使用ConnectionFactory建立链接,虽然建立时指定了多个server address,但每一个connection只与一个物理的server进行链接。
2. 定义交换方式 ,建立了Direct Exchange和Durable Queue,并使用QueueName做为routing key ,能够把消息直接投递到某个队列。rabbitmq交换方式分为三种,分别是:
Direct Exchange – 处理路由键。须要将一个队列绑定到交换机上,要求该消息与一个特定的路由键彻底匹配。这是一个完整的匹配。若是一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog。
Fanout Exchange – 不处理路由键。你只须要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的全部队列上。很像子网广播,每台子网内的主机都得到了一份复制的消息。Fanout交换机转发消息是最快的。
Topic Exchange – 将路由键和某模式进行匹配。此时队列须要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配很少很多一个词。所以“audit.#”可以匹配到“audit.irs.corporate”,可是“audit.*” 只会匹配到“audit.irs”。
运行上述代码,能够在Rabbit MQ的管理控制台上看到test.exchange Exchange 绑定到 建立的队列 test.queue
第二步就是发布持久化消息到队列
Exchange和Queue创建好之后,就能够发送消息到队列了。RabbitMq 能够接受byte[]的数据,字符串采用utf-8编码的字节数组。确保消息可持久化的,须要设置PersistMode为true,参看下面的代码:
var props = channel.CreateBasicProperties();
props.SetPersistent(true);
var msgBody = Encoding.UTF8.GetBytes("Hello, World!");
channel.BasicPublish(ExchangeName, routingKey:QueueName, basicProperties:props, body:msgBody);
第三步就是消费消息了,有几种不一样的方法从队列中消费消息,最多见的是使用BasicGet
:
BasicGetResult msgResponse = channel.BasicGet(QueueName, noAck: true);
var msgBody = Encoding.UTF8.GetString(msgResponse.Body);
NoAck:true 告诉RabbitMQ当即从队列中删除消息,另外一个很是受欢迎的方式是从队列中删除已经确认接收的消息,能够经过单独调用BasicAck 进行确认:
BasicGetResult msgResponse = channel.BasicGet(QueueName, noAck:false);
//process message ...
channel.BasicAck(msgResponse.DeliveryTag, multiple:false);
使用BasicAck方式来告之是否从队列中移除该条消息,这一点很重要,由于在某些应用场景下,好比从队列中获取消息并用它来操做数据库或日志文件时,若是出现操做失败时,则该条消息应该保留在队列中,只到操做成功时才从队列中移除。
另一种方法是经过基于推送的事件订阅。您能够使用内置的 QueueingBasicConsumer 提供简化的编程模型,经过容许您在共享队列上阻塞,直到收到一条消息,例如
var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(QueueName, noAck: true, consumer: consumer);
var msgResponse = consumer.Queue.Dequeue(); //blocking
var msgBody = Encoding.UTF8.GetString(msgResponse.Body);