Connection
表明一个 TCP 链接,Channel
是创建在 Connection
上的虚拟链接。RabbitMQ 每条指令都是经过 Channel
完成的。java
对于 OS 而言,建立和销毁 TCP 链接的代价很是高,在高峰期很容易遇到瓶颈。程序中通常会有多个线程须要与 RabbitMQ创建通讯,消费或生产消息,经过 TCP 链接复用来减小性能开销。node
Connection
能够建立多个 Channel
,可是 Channel
不是线程安全的因此不能在线程间共享。web
Connection
在建立时能够传入一个 ExecutorService
,这个线程池时给该 Connection
上的 Consumer
用的。 正则表达式
Channel.isOpen
以及 Connection.isOpen
方法是同步的,所以若是在发送消息时频繁调用会产生竞争。咱们能够认为在 createChannel
方法后 Channel
以及处于开启状态。若在使用过程当中 Channel
关闭了,那么只要捕获抛出的 ShutDownSignalException
就能够了,同时建议捕获 IOException
以及 SocketException
防止链接意外关闭。docker
消费者和生产者均可以声明一个已经存在的 Exchange
或者 Queue
,前提是参数彻底匹配现有的 Exchange
或者 Queue,不然会抛出异常。
安全
QueueDeclare
参数:exclusive
: 排他队列,只有同一个 Connection
的 Channel
能够访问,且在 Connection
关闭或者客户端退出后自动删除,即便 durable
为 true
。bash
queuePurge(String queue)
:清空队列服务器
Exchange
能够绑定另外一个 Exchange
:exchangeBind(String destination, String source, String routeKey)
, 从 source
到 destination
cookie
若业务容许,则最好预先建立好Exchange
以及Queue
并进行绑定(rabbitmqadmin),防止 Exchange 没有绑定Queue
或 绑定错误的Queue
而致使消息丢失(关键信息应当使用mandatory
参数)。
Alternate Exchange
: 在 Channel.exchangeDeclare
时添加 alternate-exchange
参数或在 Policy
中声明。mandatory
为 true
时,未被路由的消息会被发送到 Alternate Exchange
。建议 Exchange Type
设置为 fanout
,不然当 RoutingKey
依然不匹配就会被返回 Producer
。app
P.S. 有些书上讲备份交换器和mandatory
参数一块儿使用mandatory
参数失效是错的,当RoutingKey
不匹配Alternate Exchange
依然会被返回Producer
。
(rabbitmq v3.7 测试)
Map<String, Object> arg = new HashMap<String, Object>() {{ put("alternate-exchange", "alt"); }}; channel.exchangeDeclare("normalExchange", "direct", true, false, arg); channel.exchangeDeclare("alt", "fanout", true, false, null); channel.queueDeclare("normalQueue", true, false, false, null); channel.queueDeclare("notSend", true, false, false, null); channel.queueBind("normalQueue", "normalExchange", "key"); channel.queueBind("notSend", "alt", "");
消息发送到服务器后可能还没来的及刷到磁盘中,服务器就挂掉,从而形成消息丢失。 Publish Confirm 可以在消息确实到达服务器(开启持久化的消息会在刷入磁盘以后)以后返回一个确认给 Publisher。
经过 channel.confirmSelected
把 Channel
设置为 Confirm
模式,并为 Channel
添加一个 ConfirmLister
来监听返回的确认。
SortedSet<Long> unconfirmedSet = new TreeSet<>(); channel.confirmSelect(); channel.addConfirmListener((deliveryTag, multiple) -> { System.out.println("handleAck: " + deliveryTag + " " + multiple); if (multiple) { unconfirmedSet.headSet(deliveryTag - 1).clear(); } else { unconfirmedSet.remove(deliveryTag); } }, (deliveryTag, multiple) -> { System.out.println("handleNack: " + deliveryTag + " " + multiple); if (multiple) { unconfirmedSet.headSet(deliveryTag - 1).clear(); } else { unconfirmedSet.remove(deliveryTag); } }); while (true) { long seq = channel.getNextPublishSeqNo(); channel.basicPublish("normalExchange", "key", true, null, message.getBytes(StandardCharsets.UTF_8)); unconfirmedSet.add(seq); Thread.sleep(1000); }
除了异步处理的方式以外还有批量确认以及事务的方法。批量确认的速度在大量连续发送的状况下和异步的方法差很少。无论怎样这两种消息确认的方法都要比事务的方式快7倍左右。
通常应当实现 Consumer
接口或者继承 DefaultConsumer
,Consumer
经过 consumerTag
来进行区分。
消费消息有两种方式,一种是 Push
,一种是 Get。
Push
是由 RabbitMQ
以轮询的方式将消息推送到 Consumer
,方法为 basicConsume
。通常一个 Channel
对应一个 Consumer
。
Get
由客户端主动从 RabbitMQ
拉取一条消息,方法为 basicGet
。__不能循环执行 basicGet
来代替 basicConsume
,否则会严重影响性能。__
消息确认:autoAck
为 false
,RabbitMQ
会等待 basicAck
的显式确认。除非 Consumer
链接断开不然一直等待确认。当 Consumer
显式调用 basicReject
或者 basicNack
并将 requeue
设为 true
后会将消息从新入队投递。通常咱们在业务处理完以后再 ack
.mandatory
: 当 Exchange
没法匹配 Queue
或 Exchange
时,mandatory
为 true
的消息会被返回给 Producer
,不然会被丢弃。 经过 Channel.addReturnListener
来添加 ReturnListener
监视器。
queueDeclare
时添加 x-message-ttl
参数,单位毫秒。
Map<String, Object> arg = new HashMap<String, Object>() {{ put("x-message-ttl", "1000000"); }}; channel.exchangeDeclare("normalExchange", "direct", true, false, arg);
使用 AMQP.BasicProperties.Builder
建立 AMQP.BasicProperties
并设置 expiration
参数。
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.expiration("100000"); channel.basicPublish("normalExchange", "key", true, builder.build(), message.getBytes(StandardCharsets.UTF_8));
Dead Letter(死信):
Basic.Reject
/ Basic.Nack
而且 requeue
为 true
当消息成为 Dead Letter 以后, RabbitMQ 会自动把这个消息发到 DLX 上。
// 当发送到 normalQueue 中的消息成为 Dead Letter 以后会自动以 // dead-letter 为 routingKey 发送到 dlxQueue Exchange Map<String, Object> arg = new HashMap<String, Object>() {{ put("x-dead-letter-exchange", "dlx"); put("x-dead-letter-routing-key", "dead-letter"); }}; channel.queueDeclare("normalQueue", true, false, false, arg); channel.exchangeDeclare("dlx", "direct", true, false, false, null); channel.queueDeclare("dlxQueue", true, false, false, null);
DLX 其余用法:延迟队列,消息 发送到一个暂存的、没有 Consumer
的 Queue
并设置 TTL,Consumer
消费 DLX 绑定的 Queue
的消息,建议给暂存的 Queue
设置一个最大的 TTL,防止消息没有设置 TTL 而一直堆积在 Queue
中。
消息的消费能够有优先级,Queue
的最大优先级能够经过 x-max-priority
进行设置。
Map<String, Object> arg = new HashMap<String, Object>() {{ put("x-max-priority", 5); }}; channel.queueDeclare("normalQueue", true, false, false, arg); channel.exchangeDeclare("normalExchange", "direct", true, false, null); AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.priority(2); channel.basicPublish("normalExchange", "key", true, builder.build(), messsage.getBytes(StandardCharsets.UTF_8));
Exchange
, Queue
, 消息均可以进行持久化。在消息发送到 Exchange
以后会马上路由到 Queue
中,所以未持久化的 Exchange
在重启后会丢失 Exchange
元数据以及绑定,对 Queue
和消息的持久化无影响。
未持久化的 Queue
在重启后会丢失,包括 Queue
中的消息,无论消息是否设置了持久化。
未持久化的消息在重启后会丢失,即便所在的 Queue
已持久化。
channel.queueDeclare("normalQueue", true, false, false, null); // Queue 持久化 channel.exchangeDeclare("normalExchange", "direct", true, false, null); // Exchange 持久化 channel.queueBind("normalQueue", "normalExchange", "key"); AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.deliveryMode(2); // 消息持久化 channel.basicPublish("normalExchange", "key", true, builder.build(), messsage.getBytes(StandardCharsets.UTF_8));
Qos 的做用时负载均衡。当一个队列有两个 Consumer ,一个性能很好 A,另外一个不那么好 B,RabbitMQ 会轮询,将消息平均地分给这两个 Consumer。可见 B 上的堆积的消息会愈来愈多,而 A 上的线程可能会空闲。 Qos 的做用就是防止一个 Consumer 堆积了过多的消息,把这些消息分给其余 Consumer。
global 参数:
channel.basicQos(3, false); // each Consumer limit 3 channel.basicQos(5, true); // this channel limit to 5
global 参数会让 RabbitMQ 调用更多资源,尽可能不要设置(默认值为 false)。
RabbitMQ 支持最少一次和最多一次。
最少一次:
- 启用 Publisher Confirm 或者 事务保证消息可以到达服务器。 - 启用 mandatory 参数保证消息不回被 Exchange 丢掉。 - 消息和 Queue 开启持久化。 - Consumer autoAck off, 并确保消息在处理完以后再 ack
Policy 能够很方便的批量设置 Exchange 以及 Queue 的属性,可是 Policy 的优先级较低,请注意。
Policy 能够经过 HTTP API, web console,以及 cli 的方式。
rabbitmqctl set_policy [-p vhost] [--priority prirority] [--apply-to apply-to] {name} {pattern} {defination}
vhost
: 指定 vhostproiority
: 若是一个 Queue 或者 Exchange 有多个 Policy 的状况下,只有 priority 最大的那个 Policy 才会生效。apply-to
: 应用到
name
: Policy 的名字pattern
: Exchange 或者 Queue 名字的正则表达式defination
: 属性值,能够经过 management > Admin > Policies 的查看。RabbitMQ 会把全部的元数据存储到全部的节点上,可是队列是分散在集群中全部的节点上的。
咱们尝试使用 Docker Compose 建立一个由 3 个服务组成的集群
version: "3" services: node1: image: rabbitmq:3.7-management-alpine container_name: node1 hostname: node1 environment: RABBITMQ_ERLANG_COOKIE: secret_cookie_here ports: - "5673:5672" - "15673:15672" node2: image: rabbitmq:3.7-management-alpine container_name: node2 hostname: node2 environment: RABBITMQ_ERLANG_COOKIE: secret_cookie_here ports: - "5674:5672" - "15674:15672" node3: image: rabbitmq:3.7-management-alpine container_name: node3 hostname: node3 environment: RABBITMQ_ERLANG_COOKIE: secret_cookie_here ports: - "5675:5672" - "15675:15672"
经过设置 hostname ,容器内部的 rabbitmq 的 nodename 就变成相似 rabbitmq@node1。同时集群中的 RabbitMQ 须要相同的 RABBITMQ_ERLANG_COOKIE 来进行互相认证。
启动服务:
docker-compose up -d
而后将 node2 , node3 加入 node1 ,注意,加入集群以前 RabbitMQ 必须中止:
# 中止 rabbitmq docker-compose exec node2 rabbitmqctl stop_app docker-compose exec node3 rabbitmqctl stop_app # 加入 node1 docker-compose exec node2 rabbitmqctl join_cluster rabbitmq@node1 docker-compose exec node3 rabbitmqctl join_cluster rabbitmq@node1 # 从新启动 docker-compose exec node2 rabbitmqctl start_app docker-compose exec node3 rabbitmqctl start_app
在任意一个节点上查询集群状态:
docker-compose exec node2 rabbitmqctl cluster_status
能够看到以下状态:
Cluster status of node rabbit@node2 ... [{nodes,[{disc,[rabbit@node1,rabbit@node2,rabbit@node3]}]}, {running_nodes,[rabbit@node3,rabbit@node1,rabbit@node2]}, {cluster_name,<<"rabbit@node2">>}, {partitions,[]}, {alarms,[{rabbit@node3,[]},{rabbit@node1,[]},{rabbit@node2,[]}]}]
将节点从在线状态下线, 首先中止节点,而后重置节点。
docker-compose exec node2 rabbitmqctl stop_app docker-compose exec node2 rabbitmqctl reset docker-compose exec node2 rabbitmqctl stop_app
在从新启动服务器以后能够发现该节点已经脱离了集群。
Cluster status of node rabbit@node2 ... [{nodes,[{disc,[rabbit@node2]}]}, {running_nodes,[rabbit@node2]}, {cluster_name,<<"rabbit@node2">>}, {partitions,[]}, {alarms,[{rabbit@node2,[]}]}]
RabbitMQ 的节点类型有两种,一种是 disc , 第二种是 ram。 RabbitMQ 要求集群中至少要有一个磁盘节点,储存了全部的元数据。当集群中的惟一一个磁盘节点崩溃后,集群能够继续收发消息,可是不能建立队列等操做。
RabbitMQ 在加入集群时默认为磁盘模式,若是要之内存模式加入:
docker-compose exec node2 rabbitmqctl join_cluster rabbit@node1 --ram
更改节点类型:
docker-compose exec node 2 rabbitmqctl change cluster_node_type desc
RabbitMQ 提供了 Master/Slave 模式的 Mirror Queue 机制。请注意,开启 Publisher Confirmed 或者事务的状况下,只有全部的 Slave 都 ACK 以后才会返回 ACK 给客户端。
开启 Mirror Queue 主要经过设置 Policy 其中最主要的是 defination
:
ha-mode
: Mirror Queue 的模式
all
: 默认的模式,表示在集群中的全部节点上进行镜像exactly
: 在指定数量的节点上进行镜像,数量由 ha-params
指定。nodes
: 在指定的节点上进行镜像,节点名称由 ha-params
指定。ha-params
: 如上所述ha-sync-mode
: 消息的同步模式
automatic
: 当新的 Slave 加入集群以后会自动同步消息。manual
: 默认,当加入新的 Slave 以后不会自动把消息同步到新的 Slave 上。指导调用命令显式同步。ha-promote-on-shutdown
:
when-synced
: 默认,若是主动中止 master ,那么 slave 不会自动接管。也就是说会指望 master 会重启启动,这能够保证消息不会丢失。always
: 无论 master 是由于什么缘由中止的,slave 会马上接管,有可能有一部分数据没有从 master 同步到 slave.ha-promote-on-failure
: 默认 always
,不推荐设置为 when-synced