消息队列之 RabbitMQ

关于消息队列,从前年开始断断续续看了些资料,想写好久了,但一直没腾出空,近来分别碰到几个朋友聊这块的技术选型,是时候把这块的知识整理记录一下了。html

市面上的消息队列产品有不少,好比老牌的 ActiveMQ、RabbitMQ ,目前我看最火的 Kafka ,还有 ZeroMQ ,去年末阿里巴巴捐赠给 Apache 的 RocketMQ ,连 redis 这样的 NoSQL 数据库也支持 MQ 功能。总之这块知名的产品就有十几种,就我本身的使用经验和兴趣只打算谈谈 RabbitMQ、Kafka 和 ActiveMQ ,本文先讲 RabbitMQ ,在此以前先看下消息队列的相关概念。java

什么叫消息队列

消息(Message)是指在应用间传送的数据。消息能够很是简单,好比只包含文本字符串,也能够更复杂,可能包含嵌入对象。node

消息队列(Message Queue)是一种应用间的通讯方式,消息发送后能够当即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而无论是谁发布的。这样发布者和使用者都不用知道对方的存在。redis

为什么用消息队列

从上面的描述中能够看出消息队列是一种应用间的异步协做机制,那何时须要使用 MQ 呢?数据库

以常见的订单系统为例,用户点击【下单】按钮以后的业务逻辑可能包括:扣减库存、生成相应单据、发红包、发短信通知。在业务发展初期这些逻辑可能放在一块儿同步执行,随着业务的发展订单量增加,须要提高系统服务的性能,这时能够将一些不须要当即生效的操做拆分出来异步执行,好比发放红包、发短信通知等。这种场景下就能够用 MQ ,在下单的主流程(好比扣减库存、生成相应单据)完成以后发送一条消息到 MQ 让主流程快速完结,而由另外的单独线程拉取MQ的消息(或者由 MQ 推送消息),当发现 MQ 中有发红包或发短信之类的消息时,执行相应的业务逻辑。windows

以上是用于业务解耦的状况,其它常见场景包括最终一致性、广播、错峰流控等等。后端

RabbitMQ 特色

RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。安全

AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。bash

RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特色包括:服务器

  1. 可靠性(Reliability) RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。

  2. 灵活的路由(Flexible Routing) 在消息进入队列以前,经过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,能够将多个 Exchange 绑定在一块儿,也经过插件机制实现本身的 Exchange 。

  3. 消息集群(Clustering) 多个 RabbitMQ 服务器能够组成一个集群,造成一个逻辑 Broker 。

  4. 高可用(Highly Available Queues) 队列能够在集群中的机器上进行镜像,使得在部分节点出问题的状况下队列仍然可用。

  5. 多种协议(Multi-protocol) RabbitMQ 支持多种消息队列协议,好比 STOMP、MQTT 等等。

  6. 多语言客户端(Many Clients) RabbitMQ 几乎支持全部经常使用语言,好比 Java、.NET、Ruby 等等。

  7. 管理界面(Management UI) RabbitMQ 提供了一个易用的用户界面,使得用户能够监控和管理消息 Broker 的许多方面。

  8. 跟踪机制(Tracing) 若是消息异常,RabbitMQ 提供了消息跟踪机制,使用者能够找出发生了什么。

  9. 插件机制(Plugin System) RabbitMQ 提供了许多插件,来从多方面进行扩展,也能够编写本身的插件。

RabbitMQ 中的概念

消息模型

全部 MQ 产品从模型抽象上来讲都是同样的过程: 消费者(consumer)订阅某个队列。生产者(producer)建立消息,而后发布到队列(queue)中,最后将消息发送到监听的消费者。

消息流

RabbitMQ 基本概念

上面只是最简单抽象的描述,具体到 RabbitMQ 则有更详细的概念须要解释。上面介绍过 RabbitMQ 是 AMQP 协议的一个开源实现,因此其内部实际上也是 AMQP 中的基本概念:

RabbitMQ 内部结构

  1. Message 消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其余消息的优先权)、delivery-mode(指出该消息可能须要持久性存储)等。
  2. Publisher 消息的生产者,也是一个向交换器发布消息的客户端应用程序。
  3. Exchange 交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
  4. Binding 绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列链接起来的路由规则,因此能够将交换器理解成一个由绑定构成的路由表。
  5. Queue 消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者链接到这个队列将其取走。
  6. Connection 网络链接,好比一个TCP链接。
  7. Channel 信道,多路复用链接中的一条独立的双向数据流通道。信道是创建在真实的TCP链接内地虚拟链接,AMQP 命令都是经过信道发出去的,无论是发布消息、订阅队列仍是接收消息,这些动做都是经过信道完成。由于对于操做系统来讲创建和销毁 TCP 都是很是昂贵的开销,因此引入了信道的概念,以复用一条 TCP 链接。
  8. Consumer 消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
  9. Virtual Host 虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每一个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有本身的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在链接时指定,RabbitMQ 默认的 vhost 是 / 。
  10. Broker 表示消息队列服务器实体。

AMQP 中的消息路由

AMQP 中消息的路由过程和 Java 开发者熟悉的 JMS 存在一些差异,AMQP 中增长了 Exchange 和 Binding 的角色。生产者把消息发布到 Exchange 上,消息最终到达队列并被消费者接收,而 Binding 决定交换器的消息应该发送到那个队列。

AMQP 的消息路由过程

Exchange 类型

Exchange分发消息时根据类型的不一样分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器彻底一致,但性能差不少,目前几乎用不到了,因此直接看另外三种类型:

  1. direct
    direct 交换器
    消息中的路由键(routing key)若是和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名彻底匹配,若是一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是彻底匹配、单播的模式。
  2. fanout
    fanout 交换器
    每一个发到 fanout 类型交换器的消息都会分到全部绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每一个发送到交换器的消息都会被转发到与该交换器绑定的全部队列上。很像子网广播,每台子网内的主机都得到了一份复制的消息。fanout 类型转发消息是最快的。
  3. topic
    topic 交换器
    topic 交换器经过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列须要绑定到一个模式上。它将路由键和绑定键的字符串切分红单词,这些单词之间用点隔开。它一样也会识别两个通配符:符号“#”和符号“*”。#匹配0个或多个单词,*匹配很少很多一个单词。

RabbitMQ 安装

通常来讲安装 RabbitMQ 以前要安装 Erlang ,能够去Erlang官网下载。接着去RabbitMQ官网下载安装包,以后解压缩便可。根据操做系统不一样官网提供了相应的安装说明:WindowsDebian / UbuntuRPM-based LinuxMac

若是是Mac 用户,我的推荐使用 HomeBrew 来安装,安装前要先更新 brew:

brew update
复制代码

接着安装 rabbitmq 服务器:

brew install rabbitmq
复制代码

这样 RabbitMQ 就安装好了,安装过程当中会自动其所依赖的 Erlang 。

RabbitMQ 运行和管理

  1. 启动 启动很简单,找到安装后的 RabbitMQ 所在目录下的 sbin 目录,能够看到该目录下有6个以 rabbitmq 开头的可执行文件,直接执行 rabbitmq-server 便可,下面将 RabbitMQ 的安装位置以 . 代替,启动命令就是:
./sbin/rabbitmq-server
复制代码

启动正常的话会看到一些启动过程信息和最后的 completed with 7 plugins,这也说明启动的时候默认加载了7个插件。

正常启动

  1. 后台启动 若是想让 RabbitMQ 以守护程序的方式在后台运行,能够在启动的时候加上 -detached 参数:
./sbin/rabbitmq-server -detached
复制代码
  1. 查询服务器状态 sbin 目录下有个特别重要的文件叫 rabbitmqctl ,它提供了 RabbitMQ 管理须要的几乎一站式解决方案,绝大部分的运维命令它均可以提供。 查询 RabbitMQ 服务器的状态信息能够用参数 status :
./sbin/rabbitmqctl status
复制代码

该命令将输出服务器的不少信息,好比 RabbitMQ 和 Erlang 的版本、OS 名称、内存等等

  1. 关闭 RabbitMQ 节点 咱们知道 RabbitMQ 是用 Erlang 语言写的,在Erlang 中有两个概念:节点和应用程序。节点就是 Erlang 虚拟机的每一个实例,而多个 Erlang 应用程序能够运行在同一个节点之上。节点之间能够进行本地通讯(无论他们是否是运行在同一台服务器之上)。好比一个运行在节点A上的应用程序能够调用节点B上应用程序的方法,就好像调用本地函数同样。若是应用程序因为某些缘由奔溃,Erlang 节点会自动尝试重启应用程序。 若是要关闭整个 RabbitMQ 节点能够用参数 stop :
./sbin/rabbitmqctl stop
复制代码

它会和本地节点通讯并指示其干净的关闭,也能够指定关闭不一样的节点,包括远程节点,只须要传入参数 -n :

./sbin/rabbitmqctl -n rabbit@server.example.com stop 
复制代码

-n node 默认 node 名称是 rabbit@server ,若是你的主机名是 server.example.com ,那么 node 名称就是 rabbit@server.example.com 。

  1. 关闭 RabbitMQ 应用程序 若是只想关闭应用程序,同时保持 Erlang 节点运行则能够用 stop_app:
./sbin/rabbitmqctl stop_app
复制代码

这个命令在后面要讲的集群模式中将会颇有用。

  1. 启动 RabbitMQ 应用程序
./sbin/rabbitmqctl start_app
复制代码
  1. 重置 RabbitMQ 节点
./sbin/rabbitmqctl reset
复制代码

该命令将清除全部的队列。

  1. 查看已声明的队列
./sbin/rabbitmqctl list_queues
复制代码
  1. 查看交换器
./sbin/rabbitmqctl list_exchanges
复制代码

该命令还能够附加参数,好比列出交换器的名称、类型、是否持久化、是否自动删除:

./sbin/rabbitmqctl list_exchanges name type durable auto_delete
复制代码
  1. 查看绑定
./sbin/rabbitmqctl list_bindings
复制代码

Java 客户端访问

RabbitMQ 支持多种语言访问,以 Java 为例看下通常使用 RabbitMQ 的步骤。

  1. maven工程的pom文件中添加依赖
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.1.0</version>
</dependency>
复制代码
  1. 消息生产者
package org.study.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //建立链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        //设置 RabbitMQ 地址
        factory.setHost("localhost");
        //创建到代理服务器到链接
        Connection conn = factory.newConnection();
        //得到信道
        Channel channel = conn.createChannel();
        //声明交换器
        String exchangeName = "hello-exchange";
        channel.exchangeDeclare(exchangeName, "direct", true);

        String routingKey = "hola";
        //发布消息
        byte[] messageBodyBytes = "quit".getBytes();
        channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

        channel.close();
        conn.close();
    }
}
复制代码
  1. 消息消费者
package org.study.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setHost("localhost");
        //创建到代理服务器到链接
        Connection conn = factory.newConnection();
        //得到信道
        final Channel channel = conn.createChannel();
        //声明交换器
        String exchangeName = "hello-exchange";
        channel.exchangeDeclare(exchangeName, "direct", true);
        //声明队列
        String queueName = channel.queueDeclare().getQueue();
        String routingKey = "hola";
        //绑定队列,经过键 hola 将队列和交换器绑定起来
        channel.queueBind(queueName, exchangeName, routingKey);

        while(true) {
            //消费消息
            boolean autoAck = false;
            String consumerTag = "";
            channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    String routingKey = envelope.getRoutingKey();
                    String contentType = properties.getContentType();
                    System.out.println("消费的路由键:" + routingKey);
                    System.out.println("消费的内容类型:" + contentType);
                    long deliveryTag = envelope.getDeliveryTag();
                    //确认消息
                    channel.basicAck(deliveryTag, false);
                    System.out.println("消费的消息体内容:");
                    String bodyStr = new String(body, "UTF-8");
                    System.out.println(bodyStr);

                }
            });
        }
    }
}
复制代码
  1. 启动 RabbitMQ 服务器
./sbin/rabbitmq-server
复制代码
  1. 运行 Consumer 先运行 Consumer ,这样当生产者发送消息的时候能在消费者后端看到消息记录。
  2. 运行 Producer 接着运行 Producer ,发布一条消息,在 Consumer 的控制台能看到接收的消息:
    Consumer 控制台

RabbitMQ 集群

RabbitMQ 最优秀的功能之一就是内建集群,这个功能设计的目的是容许消费者和生产者在节点崩溃的状况下继续运行,以及经过添加更多的节点来线性扩展消息通讯吞吐量。RabbitMQ 内部利用 Erlang 提供的分布式通讯框架 OTP 来知足上述需求,使客户端在失去一个 RabbitMQ 节点链接的状况下,仍是可以从新链接到集群中的任何其余节点继续生产、消费消息。

RabbitMQ 集群中的一些概念

RabbitMQ 会始终记录如下四种类型的内部元数据:

  1. 队列元数据 包括队列名称和它们的属性,好比是否可持久化,是否自动删除
  2. 交换器元数据 交换器名称、类型、属性
  3. 绑定元数据 内部是一张表格记录如何将消息路由到队列
  4. vhost 元数据 为 vhost 内部的队列、交换器、绑定提供命名空间和安全属性

在单一节点中,RabbitMQ 会将全部这些信息存储在内存中,同时将标记为可持久化的队列、交换器、绑定存储到硬盘上。存到硬盘上能够确保队列和交换器在节点重启后可以重建。而在集群模式下一样也提供两种选择:存到硬盘上(独立节点的默认设置),存在内存中。

若是在集群中建立队列,集群只会在单个节点而不是全部节点上建立完整的队列信息(元数据、状态、内容)。结果是只有队列的全部者节点知道有关队列的全部信息,所以当集群节点崩溃时,该节点的队列和绑定就消失了,而且任何匹配该队列的绑定的新消息也丢失了。还好RabbitMQ 2.6.0以后提供了镜像队列以免集群节点故障致使的队列内容不可用。

RabbitMQ 集群中能够共享 user、vhost、exchange等,全部的数据和状态都是必须在全部节点上复制的,例外就是上面所说的消息队列。RabbitMQ 节点能够动态的加入到集群中。

当在集群中声明队列、交换器、绑定的时候,这些操做会直到全部集群节点都成功提交元数据变动后才返回。集群中有内存节点和磁盘节点两种类型,内存节点虽然不写入磁盘,可是它的执行比磁盘节点要好。内存节点能够提供出色的性能,磁盘节点能保障配置信息在节点重启后仍然可用,那集群中如何平衡这二者呢?

RabbitMQ 只要求集群中至少有一个磁盘节点,全部其余节点能够是内存节点,当节点加入或离开集群时,它们必需要将该变动通知到至少一个磁盘节点。若是只有一个磁盘节点,恰好又是该节点崩溃了,那么集群能够继续路由消息,但不能建立队列、建立交换器、建立绑定、添加用户、更改权限、添加或删除集群节点。换句话说集群中的惟一磁盘节点崩溃的话,集群仍然能够运行,但直到该节点恢复,不然没法更改任何东西。

RabbitMQ 集群配置和启动

若是是在一台机器上同时启动多个 RabbitMQ 节点来组建集群的话,只用上面介绍的方式启动第2、第三个节点将会由于节点名称和端口冲突致使启动失败。因此在每次调用 rabbitmq-server 命令前,设置环境变量 RABBITMQ_NODENAME 和 RABBITMQ_NODE_PORT 来明确指定惟一的节点名称和端口。下面的例子端口号从5672开始,每一个新启动的节点都加1,节点也分别命名为test_rabbit_一、test_rabbit_二、test_rabbit_3。

启动第1个节点:

RABBITMQ_NODENAME=test_rabbit_1 RABBITMQ_NODE_PORT=5672 ./sbin/rabbitmq-server -detached
复制代码

启动第2个节点:

RABBITMQ_NODENAME=test_rabbit_2 RABBITMQ_NODE_PORT=5673 ./sbin/rabbitmq-server -detached
复制代码

启动第2个节点前建议将 RabbitMQ 默认激活的插件关掉,不然会存在使用了某个插件的端口号冲突,致使节点启动不成功。

如今第2个节点和第1个节点都是独立节点,它们并不知道其余节点的存在。集群中除第一个节点外后加入的节点须要获取集群中的元数据,因此要先中止 Erlang 节点上运行的 RabbitMQ 应用程序,并重置该节点元数据,再加入而且获取集群的元数据,最后从新启动 RabbitMQ 应用程序。

中止第2个节点的应用程序:

./sbin/rabbitmqctl -n test_rabbit_2 stop_app
复制代码

重置第2个节点元数据:

./sbin/rabbitmqctl -n test_rabbit_2 reset
复制代码

第2节点加入第1个节点组成的集群:

./sbin/rabbitmqctl -n test_rabbit_2 join_cluster test_rabbit_1@localhost
复制代码

启动第2个节点的应用程序

./sbin/rabbitmqctl -n test_rabbit_2 start_app
复制代码

第3个节点的配置过程和第2个节点相似:

RABBITMQ_NODENAME=test_rabbit_3 RABBITMQ_NODE_PORT=5674 ./sbin/rabbitmq-server -detached

./sbin/rabbitmqctl -n test_rabbit_3 stop_app

./sbin/rabbitmqctl -n test_rabbit_3 reset

./sbin/rabbitmqctl -n test_rabbit_3 join_cluster test_rabbit_1@localhost

./sbin/rabbitmqctl -n test_rabbit_3 start_app
复制代码

RabbitMQ 集群运维

中止某个指定的节点,好比中止第2个节点:

RABBITMQ_NODENAME=test_rabbit_2 ./sbin/rabbitmqctl stop
复制代码

查看节点3的集群状态:

./sbin/rabbitmqctl -n test_rabbit_3 cluster_status
复制代码