从AMQP协议能够看出,MessageQueue、Exchange和Binding构成了AMQP协议的核心,下面咱们就围绕这三个主要组件 从应用使用的角度全面的介绍如何利用Rabbit MQ构建消息队列以及使用过程当中的注意事项。正则表达式
1. 声明MessageQueue安全
在Rabbit MQ中,不管是生产者发送消息仍是消费者接受消息,都首先须要声明一个MessageQueue。这就存在一个问题,是生产者声明仍是消费者声明呢?要解决这个问题,首先须要明确:服务器
a)消费者是没法订阅或者获取不存在的MessageQueue中信息。异步
b)消息被Exchange接受之后,若是没有匹配的Queue,则会被丢弃。函数
在明白了上述两点之后,就容易理解若是是消费者去声明Queue,就有可能会出如今声明Queue以前,生产者已发送的消息被丢弃的隐患。若是应用可以经过消息重发的机制容许消息丢失,则使用此方案没有任何问题。可是若是不能接受该方案,这就须要不管是生产者仍是消费者,在发送或者接受消息前,都须要去尝试创建消息队列。这里有一点须要明确,若是客户端尝试创建一个已经存在的消息队列,Rabbit MQ不会作任何事情,并返回客户端创建成功的。性能
若是一个消费者在一个信道中正在监听某一个队列的消息,Rabbit MQ是不容许该消费者在同一个channel去声明其余队列的。Rabbit MQ中,能够经过queue.declare命令声明一个队列,能够设置该队列如下属性:spa
a) Exclusive:排他队列,若是一个队列被声明为排他队列,该队列仅对首次声明它的链接可见,并在链接断开时自动删除。这里须要注意三点:其一,排他队列是基于链接可见的,同一链接的不一样信道是能够同时访问同一个链接建立的排他队列的。其二,“首次”,若是一个链接已经声明了一个排他队列,其余链接是不容许创建同名的排他队列的,这个与普通队列不一样。其三,即便该队列是持久化的,一旦链接关闭或者客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。.net
b) Auto-delete:自动删除,若是该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。队列
c) Durable:持久化,这个会在后面做为专门一个章节讨论。事务
d) 其余选项,例如若是用户仅仅想查询某一个队列是否已存在,若是不存在,不想创建该队列,仍然能够调用queue.declare,只不过须要将参数passive设为true,传给queue.declare,若是该队列已存在,则会返回true;若是不存在,则会返回Error,可是不会建立新的队列。
2. 生产者发送消息
在AMQP模型中,Exchange是接受生产者消息并将消息路由到消息队列的关键组件。ExchangeType和Binding决定了消息的路由规则。因此生产者想要发送消息,首先必需要声明一个Exchange和该Exchange对应的Binding。能够经过 ExchangeDeclare和BindingDeclare完成。在Rabbit MQ中,声明一个Exchange须要三个参数:ExchangeName,ExchangeType和Durable。ExchangeName是该Exchange的名字,该属性在建立Binding和生产者经过publish推送消息时须要指定。ExchangeType,指Exchange的类型,在RabbitMQ中,有三种类型的Exchange:direct ,fanout和topic,不一样的Exchange会表现出不一样路由行为。Durable是该Exchange的持久化属性,这个会在消息持久化章节讨论。声明一个Binding须要提供一个QueueName,ExchangeName和BindingKey。下面咱们就分析一下不一样的ExchangeType表现出的不一样路由规则。
生产者在发送消息时,都须要指定一个RoutingKey和Exchange,Exchange在接到该RoutingKey之后,会判断该ExchangeType:
a) 若是是Direct类型,则会将消息中的RoutingKey与该Exchange关联的全部Binding中的BindingKey进行比较,若是相等,则发送到该Binding对应的Queue中。
b) 若是是 Fanout 类型,则会将消息发送给全部与该 Exchange 定义过 Binding 的全部 Queues 中去,实际上是一种广播行为。
c)若是是Topic类型,则会按照正则表达式,对RoutingKey与BindingKey进行匹配,若是匹配成功,则发送到对应的Queue中。
3. 消费者订阅消息
在RabbitMQ中消费者有2种方式获取队列中的消息:
a) 一种是经过basic.consume命令,订阅某一个队列中的消息,channel会自动在处理完上一条消息以后,接收下一条消息。(同一个channel消息处理是串行的)。除非关闭channel或者取消订阅,不然客户端将会一直接收队列的消息。
b) 另一种方式是经过basic.get命令主动获取队列中的消息,可是绝对不能够经过循环调用basic.get来代替basic.consume,这是由于basic.get RabbitMQ在实际执行的时候,是首先consume某一个队列,而后检索第一条消息,而后再取消订阅。若是是高吞吐率的消费者,最好仍是建议使用basic.consume。
若是有多个消费者同时订阅同一个队列的话,RabbitMQ是采用循环的方式分发消息的,每一条消息只能被一个订阅者接收。例如,有队列Queue,其中ClientA和ClientB都Consume了该队列,MessageA到达队列后,被分派到ClientA,ClientA回复服务器收到响应,服务器删除MessageA;再有一条消息MessageB抵达队列,服务器根据“循环推送”原则,将消息会发给ClientB,而后收到ClientB的确认后,删除MessageB;等到再下一条消息时,服务器会再将消息发送给ClientA。
这里咱们能够看出,消费者再接到消息之后,都须要给服务器发送一条确认命令,这个便可以在handleDelivery里显示的调用basic.ack实现,也能够在Consume某个队列的时候,设置autoACK属性为true实现。这个ACK仅仅是通知服务器能够安全的删除该消息,而不是通知生产者,与RPC不一样。 若是消费者在接到消息之后还没来得及返回ACK就断开了链接,消息服务器会重传该消息给下一个订阅者,若是没有订阅者就会存储该消息。
既然RabbitMQ提供了ACK某一个消息的命令,固然也提供了Reject某一个消息的命令。当客户端发生错误,调用basic.reject命令拒绝某一个消息时,能够设置一个requeue的属性,若是为true,则消息服务器会重传该消息给下一个订阅者;若是为false,则会直接删除该消息。固然,也能够经过ack,让消息服务器直接删除该消息而且不会重传。
4. 持久化:
Rabbit MQ默认是不持久队列、Exchange、Binding以及队列中的消息的,这意味着一旦消息服务器重启,全部已声明的队列,Exchange,Binding以及队列中的消息都会丢失。经过设置Exchange和MessageQueue的durable属性为true,可使得队列和Exchange持久化,可是这还不能使得队列中的消息持久化,这须要生产者在发送消息的时候,将delivery mode设置为2,只有这3个所有设置完成后,才能保证服务器重启不会对现有的队列形成影响。这里须要注意的是,只有durable为true的Exchange和durable为ture的Queues才能绑定,不然在绑定时,RabbitMQ都会抛错的。持久化会对RabbitMQ的性能形成比较大的影响,可能会降低10倍不止。
5. 事务:
对事务的支持是AMQP协议的一个重要特性。假设当生产者将一个持久化消息发送给服务器时,由于consume命令自己没有任何Response返回,因此即便服务器崩溃,没有持久化该消息,生产者也没法获知该消息已经丢失。若是此时使用事务,即经过txSelect()开启一个事务,而后发送消息给服务器,而后经过txCommit()提交该事务,便可以保证,若是txCommit()提交了,则该消息必定会持久化,若是txCommit()还未提交即服务器崩溃,则该消息不会服务器就收。固然Rabbit MQ也提供了txRollback()命令用于回滚某一个事务。
6. Confirm机制:
使用事务当然能够保证只有提交的事务,才会被服务器执行。可是这样同时也将客户端与消息服务器同步起来,这背离了消息队列解耦的本质。Rabbit MQ提供了一个更加轻量级的机制来保证生产者能够感知服务器消息是否已被路由到正确的队列中——Confirm。若是设置channel为confirm状态,则经过该channel发送的消息都会被分配一个惟一的ID,而后一旦该消息被正确的路由到匹配的队列中后,服务器会返回给生产者一个Confirm,该Confirm包含该消息的ID,这样生产者就会知道该消息已被正确分发。对于持久化消息,只有该消息被持久化后,才会返回Confirm。Confirm机制的最大优势在于异步,生产者在发送消息之后,便可继续执行其余任务。而服务器返回Confirm后,会触发生产者的回调函数,生产者在回调函数中处理Confirm信息。若是消息服务器发生异常,致使该消息丢失,会返回给生产者一个nack,表示消息已经丢失,这样生产者就能够经过重发消息,保证消息不丢失。Confirm机制在性能上要比事务优越不少。可是Confirm机制,没法进行回滚,就是一旦服务器崩溃,生产者没法获得Confirm信息,生产者其实自己也不知道该消息吃否已经被持久化,只有继续重发来保证消息不丢失,可是若是原先已经持久化的消息,并不会被回滚,这样队列中就会存在两条相同的消息,系统须要支持去重。
其余:
Broker:简单来讲就是消息队列服务器实体。
Exchange:消息交换机,它指定消息按什么规则,路由到哪一个队列。
Queue:消息队列载体,每一个消息都会被投入到一个或多个队列。
Binding:绑定,它的做用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里能够开设多个vhost,用做不一样用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每一个链接里,可创建多个channel,每一个channel表明一个会话任务。
消息队列的使用过程大概以下:
(1)客户端链接到消息队列服务器,打开一个channel。 (2)客户端声明一个exchange,并设置相关属性。 (3)客户端声明一个queue,并设置相关属性。 (4)客户端使用routing key,在exchange和queue之间创建好绑定关系。 (5)客户端投递消息到exchange。