RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然:html
单向解耦java
双向解耦(如:RPC)服务器
例如一个日志系统,很容易使用RabbitMQ简化工做量,一个Consumer能够进行消息的正常处理,另外一个Consumer负责对消息进行日志记录,只要在程序中指定两个Consumer所监听的queue以相同的方式绑定到同一个exchange便可,剩下的消息分发工做由RabbitMQ完成。 并发
使用RabbitMQ server须要:负载均衡
1. ErLang语言包;分布式
2. RabbitMQ安装包;性能
RabbitMQ同时提供了java的客户端(一个jar包)。测试
1. 接收消息,转发消息到绑定的队列。四种类型:direct, topic, headers and fanoutfetch
direct:转发消息到routigKey指定的队列优化
topic:按规则转发消息(最灵活)
headers:(这个尚未接触到)
fanout:转发消息到全部绑定队列
2. 若是没有队列绑定在交换机上,则发送到该交换机上的消息会丢失。
3. 一个交换机能够绑定多个队列,一个队列能够被多个交换机绑定。
4. topic类型交换器经过模式匹配分析消息的routing-key属性。它将routing-key和binding-key的字符串切分红单词。这些单词之间用点隔开。它一样也会识别两个通配符:#匹配0个或者多个单词,*匹配一个单词。例如,binding key:*.stock.#匹配routing key:usd.stcok和eur.stock.db,可是不匹配stock.nana。
还有一些其余的交换器类型,如header、failover、system等,如今在当前的RabbitMQ版本中均未实现。
5. 由于交换器是命名实体,声明一个已经存在的交换器,可是试图赋予不一样类型是会致使错误。客户端须要删除这个已经存在的交换器,而后从新声明而且赋予新的类型。
6. 交换器的属性:
- 持久性:若是启用,交换器将会在server重启前都有效。
- 自动删除:若是启用,那么交换器将会在其绑定的队列都被删除掉以后自动删除掉自身。
- 惰性:若是没有声明交换器,那么在执行到使用的时候会致使异常,并不会主动声明。
1. 队列是RabbitMQ内部对象,存储消息。相同属性的queue能够重复定义。
2. 临时队列。channel.queueDeclare(),有时不须要指定队列的名字,并但愿断开链接时删除队列。
3. 队列的属性:
- 持久性:若是启用,队列将会在server重启前都有效。
- 自动删除:若是启用,那么队列将会在全部的消费者中止使用以后自动删除掉自身。
- 惰性:若是没有声明队列,那么在执行到使用的时候会致使异常,并不会主动声明。
- 排他性:若是启用,队列只能被声明它的消费者使用。
这些性质能够用来建立例如排他和自删除的transient或者私有队列。这种队列将会在全部连接到它的客户端断开链接以后被自动删除掉。它们只是短暂地链接到server,可是能够用于实现例如RPC或者在AMQ上的对等通讯。4. RPC的使用是这样的:RPC客户端声明一个回复队列,惟一命名(例如用UUID),而且是自删除和排他的。而后它发送请求给一些交换器,在消息的reply-to字段中包含了以前声明的回复队列的名字。RPC服务器将会回答这些请求,使用消息的reply-to做为routing key(默认绑定器会绑定全部的队列到默认交换器,名称为“amp.交换器类型名”)发送到默认交换器。注意这仅仅是惯例而已,能够根据和RPC服务器的约定,它能够解释消息的任何属性(甚至数据体)来决定回复给谁。
1. 消息在队列中保存,以轮询的方式将消息发送给监听消息队列的消费者,能够动态的增长消费者以提升消息的处理能力。
2. 为了实现负载均衡,能够在消费者端通知RabbitMQ,一个消息处理完以后才会接受下一个消息。
channel.basic_qos(prefetch_count=1)
注意:要防止若是全部的消费者都在处理中,则队列中的消息会累积的状况。
3. 消息有14个属性,最经常使用的几种:
deliveryMode:持久化属性
contentType:编码
replyTo:指定一个回调队列
correlationId:消息id
实例代码:
4. 消息生产者能够选择是否在消息被发送到交换器而且还未投递到队列(没有绑定器存在)和/或没有消费者可以当即处理的时候获得通知。经过设置消息的mandatory和/或immediate属性为真,这些投递保障机制的能力获得了强化。
5. 此外,一个生产者能够设置消息的persistent属性为真。这样一来,server将会尝试将这些消息存储在一个稳定的位置,直到server崩溃。固然,这些消息确定不会被投递到非持久的队列中。
1. 消息ACK,通知RabbitMQ消息已被处理,能够从内存删除。若是消费者因宕机或连接失败等缘由没有发送ACK(不一样于ActiveMQ,在RabbitMQ里,消息没有过时的概念),则RabbitMQ会将消息从新发送给其余监听在队列的下一个消费者。
channel.basicConsume(queuename, noAck=false, consumer);
2. 消息和队列的持久化。定义队列时能够指定队列的持久化属性(问:持久化队列如何删除?)
channel.queueDeclare(queuename, durable=true, false, false, null);
发送消息时能够指定消息持久化属性:
channel.basicPublish(exchangeName, routingKey,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
这样,即便RabbitMQ服务器重启,也不会丢失队列和消息。
3. publisher confirms
4. master/slave机制,配合Mirrored Queue,这种状况下,publisher会正常发送消息和接收消息的confirm,但对于subscriber来讲,须要接收Consumer Cancellation Notifications来获得主节点失败的通知,而后re-consume from the queue,此时要求client有处理重复消息的能力。注意:若是queue在一个新加入的节点上增长了一个slave,此时slave上没有此前queue的信息(目前尚未同步机制)。
(经过命令行或管理插件能够查看哪一个slave是同步的:
rabbitmqctl list_queues name slave_pids synchronised_slave_pids)
当一个slave从新加入mirrored-queue时,若是queue是durable的,则会被清空。
1. 不支持跨网段(如需支持,须要shovel或federation插件)
2. 能够随意的动态增长或减小、启动或中止节点,容许节点故障
3. 集群分为RAM节点和DISK节点,一个集群最好至少有一个DISK节点保存集群的状态。
4. 集群的配置能够经过命令行,也能够经过配置文件,命令行优先。
在Openstack中,组件之间对RabbitMQ使用基本都是“Remote Procedure Calls”的方式。每个Nova服务(好比计算服务、存储服务等)初始化时会建立两个队列,一个名为“NODE-TYPE.NODE-ID”,另外一个名为“NODE-TYPE”,NODE-TYPE是指服务的类型,NODE-ID指节点名称。
从抽象层面上讲,RabbitMQ的组件的使用相似于下图所示:
每一个服务会绑定两个队列到同一个topic类型的exchange,从不一样的队列中接收不一样类型的消息。消息的发送者若是关心消息的返回值,则会监听另外一个队列,该队列绑定在一个direct类型的exchange。接受者收到消息并处理后,会将消息的返回发送到此exchange。
在Openstack中,若是不关心消息返回,消息的流程图以下:
若是关心消息返回值,流程图以下:
曾经有过一我的作过一个测试(http://www.cnblogs.com/amityat/archive/2011/08/31/2160293.html),发送1百万个并发消息,对性能有很高的需求,因而做者对比了RabbitMQ、MSMQ、ActiveMQ、ZeroMQueue,整个过程共产生1百万条1K的消息。测试的执行是在一个Windows Vista上进行的,测试结果以下:
虽然ZeroMQ性能较高,但这个产品不提供消息持久化,须要本身实现审计和数据恢复,所以在易用性和HA上不是使人满意,经过测试结果能够看到,RabbitMQ的性能确实不错。
我在本机也作了一些测试,但个人测试是基于组件的原生配置,没有作任何的配置优化,所以总觉的不靠谱。我只测试了RabbitMQ和ActiveMQ两款产品,虽然网上都说ActiveMQ性能不如前者,但平心而论,ActiveMQ提供了不少配置,存在很大的调优空间,也许修改一个配置参数就会使组件的性能有一个质的飞跃。