RabbitMQ 总结

Connection & Channel

Connection 表明一个 TCP 链接,Channel 是创建在 Connection 上的虚拟链接。RabbitMQ 每条指令都是经过 Channel 完成的。java

对于 OS 而言,建立和销毁 TCP 链接的代价很是高,在高峰期很容易遇到瓶颈。程序中通常会有多个线程须要与 RabbitMQ创建通讯,消费或生产消息,经过 TCP 链接复用来减小性能开销。node

Connection 能够建立多个 Channel ,可是 Channel 不是线程安全的因此不能在线程间共享。web

Connection 在建立时能够传入一个 ExecutorService ,这个线程池时给该 Connection 上的 Consumer 用的。 正则表达式

Channel.isOpen 以及 Connection.isOpen 方法是同步的,所以若是在发送消息时频繁调用会产生竞争。咱们能够认为在 createChannel 方法后 Channel 以及处于开启状态。若在使用过程当中 Channel 关闭了,那么只要捕获抛出的 ShutDownSignalException 就能够了,同时建议捕获 IOException 以及 SocketException 防止链接意外关闭。docker

Exchange & Queue

消费者和生产者均可以声明一个已经存在的 Exchange 或者 Queue ,前提是参数彻底匹配现有的 Exchange 或者 Queue,不然会抛出异常。安全

QueueDeclare 参数:
exclusive: 排他队列,只有同一个 ConnectionChannel 能够访问,且在 Connection 关闭或者客户端退出后自动删除,即便 durabletruebash

queuePurge(String queue):清空队列服务器

Exchange 能够绑定另外一个 ExchangeexchangeBind(String destination, String source, String routeKey), 从 sourcedestinationcookie

若业务容许,则最好预先建立好 Exchange 以及 Queue 并进行绑定(rabbitmqadmin),防止 Exchange 没有绑定 Queue 或 绑定错误的 Queue 而致使消息丢失(关键信息应当使用 mandatory 参数)。

Alternate Exchange: 在 Channel.exchangeDeclare 时添加 alternate-exchange 参数或在 Policy 中声明。mandatorytrue 时,未被路由的消息会被发送到 Alternate Exchange 。建议 Exchange Type 设置为 fanout ,不然当 RoutingKey 依然不匹配就会被返回 Producerapp

P.S. 有些书上讲备份交换器和 mandatory 参数一块儿使用 mandatory 参数失效是错的,当 RoutingKey 不匹配 Alternate Exchange 依然会被返回 Producer
(rabbitmq v3.7 测试)
Map<String, Object> arg = new HashMap<String, Object>() {{
    put("alternate-exchange", "alt");
}};
channel.exchangeDeclare("normalExchange", "direct", true, false, arg);
channel.exchangeDeclare("alt", "fanout", true, false, null);
channel.queueDeclare("normalQueue", true, false, false, null);
channel.queueDeclare("notSend", true, false, false, null);
channel.queueBind("normalQueue", "normalExchange", "key");
channel.queueBind("notSend", "alt", "");

Publish & Consume

Publish Confirm

消息发送到服务器后可能还没来的及刷到磁盘中,服务器就挂掉,从而形成消息丢失。 Publish Confirm 可以在消息确实到达服务器(开启持久化的消息会在刷入磁盘以后)以后返回一个确认给 Publisher。

经过 channel.confirmSelectedChannel 设置为 Confirm 模式,并为 Channel 添加一个 ConfirmLister 来监听返回的确认。

SortedSet<Long> unconfirmedSet = new TreeSet<>();
channel.confirmSelect();
channel.addConfirmListener((deliveryTag, multiple) -> {

    System.out.println("handleAck: " + deliveryTag + " " + multiple);
    if (multiple) {
        unconfirmedSet.headSet(deliveryTag - 1).clear();
    } else {
        unconfirmedSet.remove(deliveryTag);
    }
}, (deliveryTag, multiple) -> {

    System.out.println("handleNack: " + deliveryTag + " " + multiple);
    if (multiple) {
        unconfirmedSet.headSet(deliveryTag - 1).clear();
    } else {
        unconfirmedSet.remove(deliveryTag);
    }
});
while (true) {
    long seq = channel.getNextPublishSeqNo();
    channel.basicPublish("normalExchange", "key", true, null, message.getBytes(StandardCharsets.UTF_8));
    unconfirmedSet.add(seq);
    Thread.sleep(1000);
}

除了异步处理的方式以外还有批量确认以及事务的方法。批量确认的速度在大量连续发送的状况下和异步的方法差很少。无论怎样这两种消息确认的方法都要比事务的方式快7倍左右。

Consumer

通常应当实现 Consumer 接口或者继承 DefaultConsumer ,Consumer 经过 consumerTag 来进行区分。

消费消息有两种方式,一种是 Push ,一种是 Get。

Push 是由 RabbitMQ 以轮询的方式将消息推送到 Consumer ,方法为 basicConsume 。通常一个 Channel 对应一个 Consumer

Get 由客户端主动从 RabbitMQ 拉取一条消息,方法为 basicGet 。__不能循环执行 basicGet 来代替 basicConsume ,否则会严重影响性能。__

消息确认:autoAckfalseRabbitMQ 会等待 basicAck 的显式确认。除非 Consumer 链接断开不然一直等待确认。当 Consumer 显式调用 basicReject 或者 basicNack 并将 requeue 设为 true 后会将消息从新入队投递。通常咱们在业务处理完以后再 ack .
mandatory : 当 Exchange 没法匹配 QueueExchange 时,mandatorytrue 的消息会被返回给 Producer,不然会被丢弃。 经过 Channel.addReturnListener 来添加 ReturnListener 监视器。

TTL

  1. queueDeclare 时添加 x-message-ttl 参数,单位毫秒。

    Map<String, Object> arg = new HashMap<String, Object>() {{
        put("x-message-ttl", "1000000");
    }};
    channel.exchangeDeclare("normalExchange", "direct", true, false, arg);
  2. 使用 AMQP.BasicProperties.Builder 建立 AMQP.BasicProperties 并设置 expiration 参数。

    AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
    builder.expiration("100000");
    channel.basicPublish("normalExchange", "key", true, builder.build(), message.getBytes(StandardCharsets.UTF_8));

Dead Letter Exchange (DLX)

Dead Letter(死信):

  • Basic.Reject / Basic.Nack 而且 requeuetrue
  • 消息 TTL 过时
  • 队列达到最大长度

当消息成为 Dead Letter 以后, RabbitMQ 会自动把这个消息发到 DLX 上。

// 当发送到 normalQueue 中的消息成为 Dead Letter 以后会自动以
// dead-letter 为 routingKey 发送到 dlxQueue Exchange
Map<String, Object> arg = new HashMap<String, Object>() {{
    put("x-dead-letter-exchange", "dlx");
    put("x-dead-letter-routing-key", "dead-letter");
}};
channel.queueDeclare("normalQueue", true, false, false, arg);
channel.exchangeDeclare("dlx", "direct", true, false, false, null);
channel.queueDeclare("dlxQueue", true, false, false, null);

DLX 其余用法:延迟队列,消息 发送到一个暂存的、没有 ConsumerQueue 并设置 TTL,Consumer 消费 DLX 绑定的 Queue 的消息,建议给暂存的 Queue 设置一个最大的 TTL,防止消息没有设置 TTL 而一直堆积在 Queue 中。

Priority

消息的消费能够有优先级,Queue 的最大优先级能够经过 x-max-priority 进行设置。

Map<String, Object> arg = new HashMap<String, Object>() {{
    put("x-max-priority", 5);
}};
channel.queueDeclare("normalQueue", true, false, false, arg);
channel.exchangeDeclare("normalExchange", "direct", true, false, null);

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.priority(2);
channel.basicPublish("normalExchange", "key", true, builder.build(), messsage.getBytes(StandardCharsets.UTF_8));

Durability

Exchange , Queue , 消息均可以进行持久化。在消息发送到 Exchange 以后会马上路由到 Queue 中,所以未持久化的 Exchange 在重启后会丢失 Exchange 元数据以及绑定,对 Queue 和消息的持久化无影响。

未持久化的 Queue 在重启后会丢失,包括 Queue 中的消息,无论消息是否设置了持久化。

未持久化的消息在重启后会丢失,即便所在的 Queue 已持久化。

channel.queueDeclare("normalQueue", true, false, false, null); // Queue 持久化
channel.exchangeDeclare("normalExchange", "direct", true, false, null); // Exchange 持久化
channel.queueBind("normalQueue", "normalExchange", "key");
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.deliveryMode(2);    // 消息持久化
channel.basicPublish("normalExchange", "key", true, builder.build(), messsage.getBytes(StandardCharsets.UTF_8));

Qos

Qos 的做用时负载均衡。当一个队列有两个 Consumer ,一个性能很好 A,另外一个不那么好 B,RabbitMQ 会轮询,将消息平均地分给这两个 Consumer。可见 B 上的堆积的消息会愈来愈多,而 A 上的线程可能会空闲。 Qos 的做用就是防止一个 Consumer 堆积了过多的消息,把这些消息分给其余 Consumer。

global 参数:

channel.basicQos(3, false);  // each Consumer limit 3
channel.basicQos(5, true);   // this channel limit to 5

global 参数会让 RabbitMQ 调用更多资源,尽可能不要设置(默认值为 false)。

Relibility

RabbitMQ 支持最少一次和最多一次。
最少一次:

- 启用 Publisher Confirm 或者 事务保证消息可以到达服务器。
- 启用 mandatory 参数保证消息不回被 Exchange 丢掉。
- 消息和 Queue 开启持久化。
- Consumer autoAck off, 并确保消息在处理完以后再 ack

Policy

Policy 能够很方便的批量设置 Exchange 以及 Queue 的属性,可是 Policy 的优先级较低,请注意。

Policy 能够经过 HTTP API, web console,以及 cli 的方式。

rabbitmqctl set_policy [-p vhost] [--priority prirority] [--apply-to apply-to] {name} {pattern} {defination}
  • vhost : 指定 vhost
  • proiority : 若是一个 Queue 或者 Exchange 有多个 Policy 的状况下,只有 priority 最大的那个 Policy 才会生效。
  • apply-to : 应用到

    • Exchange and Queue
    • Exchange
    • Queue
  • name : Policy 的名字
  • pattern : Exchange 或者 Queue 名字的正则表达式
  • defination : 属性值,能够经过 management > Admin > Policies 的查看。

Cluster

RabbitMQ 会把全部的元数据存储到全部的节点上,可是队列是分散在集群中全部的节点上的。

Build A Cluster with docker

咱们尝试使用 Docker Compose 建立一个由 3 个服务组成的集群

version: "3"
services:
  node1:
    image: rabbitmq:3.7-management-alpine
    container_name: node1
    hostname: node1
    environment:
      RABBITMQ_ERLANG_COOKIE: secret_cookie_here
    ports:
      - "5673:5672"
      - "15673:15672"
  node2:
    image: rabbitmq:3.7-management-alpine
    container_name: node2
    hostname: node2
    environment:
      RABBITMQ_ERLANG_COOKIE: secret_cookie_here
    ports:
      - "5674:5672"
      - "15674:15672"
  node3:
    image: rabbitmq:3.7-management-alpine
    container_name: node3
    hostname: node3
    environment:
      RABBITMQ_ERLANG_COOKIE: secret_cookie_here
    ports:
      - "5675:5672"
      - "15675:15672"

经过设置 hostname ,容器内部的 rabbitmq 的 nodename 就变成相似 rabbitmq@node1。同时集群中的 RabbitMQ 须要相同的 RABBITMQ_ERLANG_COOKIE 来进行互相认证。

启动服务:

docker-compose up -d

而后将 node2 , node3 加入 node1 ,注意,加入集群以前 RabbitMQ 必须中止:

# 中止 rabbitmq
docker-compose exec node2 rabbitmqctl stop_app
docker-compose exec node3 rabbitmqctl stop_app
# 加入 node1
docker-compose exec node2 rabbitmqctl join_cluster rabbitmq@node1
docker-compose exec node3 rabbitmqctl join_cluster rabbitmq@node1
# 从新启动 
docker-compose exec node2 rabbitmqctl start_app
docker-compose exec node3 rabbitmqctl start_app

在任意一个节点上查询集群状态:

docker-compose exec node2 rabbitmqctl cluster_status

能够看到以下状态:

Cluster status of node rabbit@node2 ...
[{nodes,[{disc,[rabbit@node1,rabbit@node2,rabbit@node3]}]},
 {running_nodes,[rabbit@node3,rabbit@node1,rabbit@node2]},
 {cluster_name,<<"rabbit@node2">>},
 {partitions,[]},
 {alarms,[{rabbit@node3,[]},{rabbit@node1,[]},{rabbit@node2,[]}]}]

手动下线节点

将节点从在线状态下线, 首先中止节点,而后重置节点。

docker-compose exec node2 rabbitmqctl stop_app
docker-compose exec node2 rabbitmqctl reset
docker-compose exec node2 rabbitmqctl stop_app

在从新启动服务器以后能够发现该节点已经脱离了集群。

Cluster status of node rabbit@node2 ...
[{nodes,[{disc,[rabbit@node2]}]},
 {running_nodes,[rabbit@node2]},
 {cluster_name,<<"rabbit@node2">>},
 {partitions,[]},
 {alarms,[{rabbit@node2,[]}]}]

节点类型

RabbitMQ 的节点类型有两种,一种是 disc , 第二种是 ram。 RabbitMQ 要求集群中至少要有一个磁盘节点,储存了全部的元数据。当集群中的惟一一个磁盘节点崩溃后,集群能够继续收发消息,可是不能建立队列等操做。

RabbitMQ 在加入集群时默认为磁盘模式,若是要之内存模式加入:

docker-compose exec node2 rabbitmqctl join_cluster rabbit@node1 --ram

更改节点类型:

docker-compose exec node 2 rabbitmqctl change cluster_node_type desc

Mirror Queue

RabbitMQ 提供了 Master/Slave 模式的 Mirror Queue 机制。请注意,开启 Publisher Confirmed 或者事务的状况下,只有全部的 Slave 都 ACK 以后才会返回 ACK 给客户端。

开启 Mirror Queue 主要经过设置 Policy 其中最主要的是 defination

  • ha-mode: Mirror Queue 的模式

    • all : 默认的模式,表示在集群中的全部节点上进行镜像
    • exactly : 在指定数量的节点上进行镜像,数量由 ha-params 指定。
    • nodes : 在指定的节点上进行镜像,节点名称由 ha-params 指定。
  • ha-params : 如上所述
  • ha-sync-mode : 消息的同步模式

    • automatic : 当新的 Slave 加入集群以后会自动同步消息。
    • manual: 默认,当加入新的 Slave 以后不会自动把消息同步到新的 Slave 上。指导调用命令显式同步。
  • ha-promote-on-shutdown:

    • when-synced: 默认,若是主动中止 master ,那么 slave 不会自动接管。也就是说会指望 master 会重启启动,这能够保证消息不会丢失。
    • always: 无论 master 是由于什么缘由中止的,slave 会马上接管,有可能有一部分数据没有从 master 同步到 slave.
  • ha-promote-on-failure: 默认 always ,不推荐设置为 when-synced
相关文章
相关标签/搜索