生产者,发送消息的一方,图中左侧的client。java
消费者,接收消息的一方,图中后侧的client。spring
消息中间件的服务节点,通常一个RabbitMQ Broker当作一台RabbitMQ服务器。服务器
消息包含两部分:消息体和标签。消息体(payload)是一个带有业务逻辑结构的数据,好比一个 JSON 字符串。消息的标签用来表述这条消息 , 好比一个交换器的名称和一个路由键。 生产者把消息交由 RabbitMQ, RabbitMQ 以后会根据标签把消息发送给感兴趣的消费者。app
链接实际上是一条TCP链接,若是是生产者仍是消费者都须要和Broker创建链接。ide
信道是创建在 Connection 之上的虚拟链接, RabbitMQ 处理的每条 AMQP 指令都是经过信道完成的。函数
为何还要引入信道呢?试想这 样一个场景, 一个应用程序中有不少个线程须要从 RabbitMQ 中消费消息,或者生产消息,那 么必然须要创建不少个 Connection,也就是许多个 TCP 链接。然而对于操做系统而言,创建和 销毁 TCP 链接是很是昂贵的开销,若是遇到使用高峰,性能瓶颈也随之显现。 RabbitMQ 采用 相似 NIO' (Non-blocking 1/0) 的作法,选择 TCP 链接复用,不只能够减小性能开销,同时也便于管理。性能
队列是 RabbitMQ 的内部对象,用于存储消息,当多个消费者能够订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin,即轮询) 给多个消费者进行处理,而不是每一个消费者都收到全部的消息井处理。fetch
生产者将消息发送到 Exchange (交换器,一般也 能够用大写的 "X" 来表示),由交换器将消息路由到一个或者多个队列中。若是路由不到,或 许会返回给生产者,或许直接丢弃。操作系统
生产者将消息发给交换器的时候, 通常会指定一个 RoutingKey,用 来指定这个消息的路由规则,而这个 RoutingKey 须要与交换器类型和绑定键 (BindingKey) 联合使用才能最终生效。线程
RabbitMQ 中经过绑定将交换器与队列关联起来,在绑定的时候通常会指定一个绑定键 (BindingKey), 这样 RabbitMQ 就知道如何正确地将消息路由到队列了。
生产者将消息发送给交换器时, 须要一个 RoutingKey, 当 BindingKey 和 RoutingKey 相匹配时, 消息会被路由到对应的队列中。在绑定多个队列到同一个交换器的时候, 这些绑定容许使用相同的 BindingKey。 BindingKey 并非在全部的状况下都生效,它依赖于交换器类型, 比 如 fanout 类型的交换器就会无视 BindingKey, 而是将消息路由到全部绑定到该交换器的队列中 。
每一个Vhost本质上是一个mini版的RabbitMQ服务器,拥有本身的队列,交换机和绑定,它拥有本身的权限机制。
RabbitMQ 经常使用的交换器类型有 fanout、 direct、 topic、 headers 这四种。
它会把全部发送到该交换器的消息路由到全部与该交换器绑定的队列中,至关于广播模式。
把消息路由到那些 BindingKey 和 RoutingKey彻底匹配的队列中。
将消息路由到 BindingKey 和 RoutingKey 相匹配的队列中。
headers 类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。
package com.spring.hello.demo.mq; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; public class RabbitProducer { private static final String EXCHANGE_NAME = "exchange_demo"; private static final String ROUTING_KEY = " routingkey_demo"; private static final String QUEUE_NAME = "queue_demo"; private static final String IP_ADDRESS = "192.168.93.131"; private static final int PORT = 5672;// RabbitMQ public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(IP_ADDRESS); factory.setPort(PORT); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = factory.newConnection();// 建立链接 Channel channel = connection.createChannel(); // 建立信道 //建立一个 type="direct"、持久化的、非自动删除的交换器 channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null); // 建立一个持久化、非排他的、非自动删除的队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 将交换器与队列经过路由键绑定 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); // 发送一条持久化的消息: Hello World! String message = "Hello World!"; channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); // 关闭资源 channel.close(); connection.close(); } } package com.spring.hello.demo.mq; import java.io.IOException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Address; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class RabbitConsumer { private static final String QUEUE_NAME = "queue_demo"; private static final String IP_ADDRESS = "192.168.93.131"; private static final int PORT = 5672;// RabbitMQ public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Address[] addresses = new Address[] { new Address(IP_ADDRESS, PORT) }; ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); // 这里的链接方式与生产者的 demo 略有不一样, 注意辨别区别 Connection connection = factory.newConnection(addresses); // 建立链接 final Channel channel = connection.createChannel(); // 建立信道 channel.basicQos(64); // 设置客户端最多接收未被 ack 的消息的个数 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { System.out.println("recv message: " + new String(body)); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } //channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(QUEUE_NAME, consumer); // 等待回调函数执行完毕以后, 关闭资源 TimeUnit.SECONDS.sleep(15); channel.close(); connection.close(); } }
exchange: 交换器的名称。
type: 交换器的类型,常见的如 fanout、 direct、 topic
durable: 设置是否持久化。 durable 设置为 true 表示持久化, 反之是非持久化。持 久化能够将交换器存盘,在服务器重启 的时候不会丢失相关信息。
autoDelete: 设置是否自动删除。 autoDelete 设置为 true 则表示自动删除。自动 删除的前提是至少有一个队列或者交换器与这个交换器绑定, 以后全部与这个交换器绑
定的队列或者交换器都与此解绑。注意不能错误地把这个参数理解为: "当与此交换器 链接的客户端都断开时, RabbitMQ 会自动删除本交换器"。
internal : 设置是不是内置的。若是设置为 true,则表示是内置的交换器,客户端程 序没法直接发送消息到这个交换器中,只能经过交换器路由到交换器这种方式。
argument: 其余一些结构化参数,好比 alternate-exchange (备份交换机)
Queue. DeclareOk queueDeclare (String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<Str工ng , Object> arguments) throws IOException;
queue : 队列的名称。
durable: 设置是否持久化。为 true 则设置队列为持久化。持久化的队列会存盘,在 服务器重启的时候能够保证不丢失相关信息。
exclusive: 设置是否排他。为 true 则设置队列为排他的。若是一个队列被声明为排 他队列,该队列仅对首次声明它的链接可见,并在链接断开时自动删除。这里须要注意三点:排他队列是基于链接(Connection) 可见的,同一个链接的不一样信道 (Channel) 是能够同时访问同一链接建立的排他队列; "首次"是指若是一个链接己经声明了一个排他队列,其余链接是不容许创建同名的排他队列的,这个与普通队列不一样:即便该队列是持久化的,一旦链接关闭或者客户端退出,该排他队列都会被自动删除,这种队列 适用于一个客户端同时发送和读取消息的应用场景。
autoDelete: 设置是否自动删除。为 true 则设置队列为自动删除。自动删除的前提是: 至少有一个消费者链接到这个队列,以后全部与这个队列链接的消费者都断开时,才会自动删除。不能把这个参数错误地理解为: "当链接到此队列的全部客户端断开时,这 个队列自动删除",由于生产者客户端建立这个队列,或者没有消费者客户端与这个队列链接时,都不会自动删除这个队列。
argurnents: 设置队列的其余一些参数,如 x-rnessage-ttl 、 x-expires 、x-rnax-length、x-rnax-length-bytes 、x-dead-letter-exchange、x-dead-letter-routing-key, x-rnax-priority 等。
将队列和交换器绑定起来
Queue . BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
queue: 队列名称:
exchange: 交换器的名称:
routingKey: 用来绑定队列和交换器的路由键;
argument: 定义绑定的一些参数。
将交换器与交换器绑定,绑定以后, 消息从 source 交 换器转发到 destination 交换器,某种程度上来讲 destination 交换器能够看做一个队列。
Exchange . BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
channel . exchangeDeclare( "source" , "direct", false, true, null) ; channel . exchangeDeclare("destination", "fanout" , false, true, null); channel.exchangeBind("destination" , "source" , "exKey"); channel . queueDeclare( "queue", false, false, true, null); channel . queueBind("queue" , " dest工nation " , "") ; channel.basicPublish( "source" , "exKey" , nul l , "exToExDemo". getBytes ()) ;
void basicPublish(String exchange, String routingKey, boo1ean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
exchange: 交换器的名称,指明消息须要发送到哪一个交换器中 。 若是设置为空字符串, 则消息会被发送到 RabbitMQ 默认的交换器中。
routingKey: 路由键,交换器根据路由键将消息存储到相应的队列之中 。
props : 消息的基本属性集,其包含 14 个属性成员,分别有 contentType 、contentEncoding、 headers (Map<String , Object>) 、 deliveryMode、 priority、correlationld、 replyTo、 expiration、 messageld、 timestamp、 type、 userld、appld、 clusterld。
byte [] body: 消息体 (payload),真正须要发送的消息。
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String ,Object> arguments, Consumer callback) throws IOException;
queue: 队列的名称:
autoAck: 设置是否自动确认。建议设成 false,即不自动确认:
consumerTag: 消费者标签,用来区分多个消费者:
noLocal : 设置为 true 则表示不能将同一个 Connection中生产者发送的消息传送给 这个 Connection 中的消费者:
exclusive : 设置是否排他:
arguments : 设置消费者的其余参数:
callback: 设置消费者的回调函数。用来处理 RabbitMQ 推送过来的消息,好比 DefaultConsumer, 使用时须要客户端重写 (override) 其中的方法。
对于消费者客户端来讲重写 handleDelivery 方法是十分方便的。更复杂的消费者客户端会重写更多的方法, 具体以下:
void handleConsumeOk(String consumerTag) ;
void handleCancelOk(String consumerTag);
void handleCancel(String consumerTag) throws IOException;
void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) ;
void handleRecoverOk(String consumerTag);
容许限制信道上的消费者所能保持的最大 未确认消息的数量。
void basicQos(int prefetchSize, int prefetchCount, boo1ean global)
prefetchSize 这个参数表示消费者所能接收未确认消息的整体大小的上限,单位为 B
prefetchSize 以确保发送的消息都没有超过所限定的 prefetchCount 的值
例子:
channel .basicQos(3, false); // Per consumer limit
channel.basicQos(5, true); // Per channel limit
channel .basicConsume("queuel", false, consumerl) ;
channel.basicConsume("queue2", false, consumer2) ;
那么这里每一个消费者最多只能收到 3 个未确认的消息,两个消费者能收到的未确认的消息个数之和的上限为 5。在未确认消息的状况下,若是 consumerl 接收到了消息 1 、 2 和 3,那么 consumer2至多只能收到 11 和 12。若是像这样同时使用两种 global 的模式,则会增长 RabbitMQ 的负载,由于 RabbitMQ 须要更多的资源来协调完成这些限制。如无特殊须要,最好只使用 global 为 false 的设置,这也是默认的设置。
采用消息确认机制后,只要设置 autoAck 参数为 false,消费者就有足够的时间处理消息 (任务),不用担忧处理消息过程当中消费者进程挂掉后消息丢失的问题, 由于 RabbitMQ 会一直 等待持有消息直到消费者显式调用 Basic.Ack 命令为止。若是 RabbitMQ 一直没有收到消费者的确认信号,而且消费此消息的消费者己经 断开链接,则 RabbitMQ 会安排该消息从新进入队列,等待投递给下一个消费者,固然也有可 能仍是原来的那个消费者。当 autoAck 等于 true 时, RabbitMQ 会自动把发送出去的 消息置为确认,而后从内存(或者磁盘)中删除,而无论消费者是否真正地消费到了这些消息。
消息拒绝:
void basicReject(long deliveryTag, boolean requeue) throws IOException;
其中 deliveryTag 能够看做消息的编号 ,它是一个 64 位的长整型值,最大值是 9223372036854775807。若是 requeue 参数设置为 true,则 RabbitMQ 会从新将这条消息存入 队列,以即可以发送给下一个订阅的消费者;若是 requeue 参数设置为 false,则 RabbitMQ 当即会把消息从队列中移除,而不会把它发送给新的消费者。
Basic.Reject 命令一次只能拒绝一条消息 ,若是想要批量拒绝消息 ,则可使用 Basic.Nack 这个命令。
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
其中 deliveryTag 和 requeue 的含义能够参考 basicReject 方法。 multiple 参数设置为 false 则表示拒绝编号为 deliveryT坷的这一条消息,这时候 basicNack 和 basicReject 方法同样; multiple 参数设置为 true 则表示拒绝 deliveryTag 编号以前所 有未被当前消费者确认的消息。
消息恢复:
Basic.RecoverOk basicRecover(boolean requeue) throws IOException;
channel.basicRecover 方法用来请求 RabbitMQ 从新发送还未被确认的消息。 若是 requeue 参数设置为 true,则未被确认的消息会被从新加入到队列中,这样对于同一条消息 来讲,可能会被分配给与以前不一样的消费者。若是 requeue 参数设置为 false,那么同一条消 息会被分配给与以前相同的消费者。默认状况下,若是不设置 requeue 这个参数,至关于channel.basicRecover(true) ,即 requeue 默认为 true