转 https://www.jianshu.com/p/79ca08116d57html
关于消息队列,从前年开始断断续续看了些资料,想写好久了,但一直没腾出空,近来分别碰到几个朋友聊这块的技术选型,是时候把这块的知识整理记录一下了。java
市面上的消息队列产品有不少,好比老牌的 ActiveMQ、RabbitMQ ,目前我看最火的 Kafka ,还有 ZeroMQ ,去年末阿里巴巴捐赠给 Apache 的 RocketMQ ,连 redis 这样的 NoSQL 数据库也支持 MQ 功能。总之这块知名的产品就有十几种,就我本身的使用经验和兴趣只打算谈谈 RabbitMQ、Kafka 和 ActiveMQ ,本文先讲 RabbitMQ ,在此以前先看下消息队列的相关概念。node
消息(Message)是指在应用间传送的数据。消息能够很是简单,好比只包含文本字符串,也能够更复杂,可能包含嵌入对象。redis
消息队列(Message Queue)是一种应用间的通讯方式,消息发送后能够当即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而无论是谁发布的。这样发布者和使用者都不用知道对方的存在。数据库
从上面的描述中能够看出消息队列是一种应用间的异步协做机制,那何时须要使用 MQ 呢?windows
以常见的订单系统为例,用户点击【下单】按钮以后的业务逻辑可能包括:扣减库存、生成相应单据、发红包、发短信通知。在业务发展初期这些逻辑可能放在一块儿同步执行,随着业务的发展订单量增加,须要提高系统服务的性能,这时能够将一些不须要当即生效的操做拆分出来异步执行,好比发放红包、发短信通知等。这种场景下就能够用 MQ ,在下单的主流程(好比扣减库存、生成相应单据)完成以后发送一条消息到 MQ 让主流程快速完结,而由另外的单独线程拉取MQ的消息(或者由 MQ 推送消息),当发现 MQ 中有发红包或发短信之类的消息时,执行相应的业务逻辑。后端
以上是用于业务解耦的状况,其它常见场景包括最终一致性、广播、错峰流控等等。安全
RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。bash
AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。服务器
RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特色包括:
可靠性(Reliability)
RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。
灵活的路由(Flexible Routing)
在消息进入队列以前,经过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,能够将多个 Exchange 绑定在一块儿,也经过插件机制实现本身的 Exchange 。
消息集群(Clustering)
多个 RabbitMQ 服务器能够组成一个集群,造成一个逻辑 Broker 。
高可用(Highly Available Queues)
队列能够在集群中的机器上进行镜像,使得在部分节点出问题的状况下队列仍然可用。
多种协议(Multi-protocol)
RabbitMQ 支持多种消息队列协议,好比 STOMP、MQTT 等等。
多语言客户端(Many Clients)
RabbitMQ 几乎支持全部经常使用语言,好比 Java、.NET、Ruby 等等。
管理界面(Management UI)
RabbitMQ 提供了一个易用的用户界面,使得用户能够监控和管理消息 Broker 的许多方面。
跟踪机制(Tracing)
若是消息异常,RabbitMQ 提供了消息跟踪机制,使用者能够找出发生了什么。
插件机制(Plugin System)
RabbitMQ 提供了许多插件,来从多方面进行扩展,也能够编写本身的插件。
全部 MQ 产品从模型抽象上来讲都是同样的过程:
消费者(consumer)订阅某个队列。生产者(producer)建立消息,而后发布到队列(queue)中,最后将消息发送到监听的消费者。
上面只是最简单抽象的描述,具体到 RabbitMQ 则有更详细的概念须要解释。上面介绍过 RabbitMQ 是 AMQP 协议的一个开源实现,因此其内部实际上也是 AMQP 中的基本概念:
AMQP 中消息的路由过程和 Java 开发者熟悉的 JMS 存在一些差异,AMQP 中增长了 Exchange 和 Binding 的角色。生产者把消息发布到 Exchange 上,消息最终到达队列并被消费者接收,而 Binding 决定交换器的消息应该发送到那个队列。
Exchange分发消息时根据类型的不一样分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器彻底一致,但性能差不少,目前几乎用不到了,因此直接看另外三种类型:
direct
消息中的路由键(routing key)若是和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名彻底匹配,若是一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是彻底匹配、单播的模式。
fanout
每一个发到 fanout 类型交换器的消息都会分到全部绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每一个发送到交换器的消息都会被转发到与该交换器绑定的全部队列上。很像子网广播,每台子网内的主机都得到了一份复制的消息。fanout 类型转发消息是最快的。
通常来讲安装 RabbitMQ 以前要安装 Erlang ,能够去Erlang官网下载。接着去RabbitMQ官网下载安装包,以后解压缩便可。根据操做系统不一样官网提供了相应的安装说明:Windows、Debian / Ubuntu、RPM-based Linux、Mac
若是是Mac 用户,我的推荐使用 HomeBrew 来安装,安装前要先更新 brew:
brew update
接着安装 rabbitmq 服务器:
brew install rabbitmq
这样 RabbitMQ 就安装好了,安装过程当中会自动其所依赖的 Erlang 。
./sbin/rabbitmq-server
启动正常的话会看到一些启动过程信息和最后的 completed with 7 plugins,这也说明启动的时候默认加载了7个插件。
./sbin/rabbitmq-server -detached
./sbin/rabbitmqctl status
该命令将输出服务器的不少信息,好比 RabbitMQ 和 Erlang 的版本、OS 名称、内存等等
./sbin/rabbitmqctl stop
它会和本地节点通讯并指示其干净的关闭,也能够指定关闭不一样的节点,包括远程节点,只须要传入参数 -n :
./sbin/rabbitmqctl -n rabbit@server.example.com stop
-n node 默认 node 名称是 rabbit@server ,若是你的主机名是 server.example.com ,那么 node 名称就是 rabbit@server.example.com 。
./sbin/rabbitmqctl stop_app
这个命令在后面要讲的集群模式中将会颇有用。
./sbin/rabbitmqctl start_app
./sbin/rabbitmqctl reset
该命令将清除全部的队列。
./sbin/rabbitmqctl list_queues
./sbin/rabbitmqctl list_exchanges
该命令还能够附加参数,好比列出交换器的名称、类型、是否持久化、是否自动删除:
./sbin/rabbitmqctl list_exchanges name type durable auto_delete
./sbin/rabbitmqctl list_bindings
RabbitMQ 支持多种语言访问,以 Java 为例看下通常使用 RabbitMQ 的步骤。
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.1.0</version> </dependency>
package org.study.rabbitmq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //建立链接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); //设置 RabbitMQ 地址 factory.setHost("localhost"); //创建到代理服务器到链接 Connection conn = factory.newConnection(); //得到信道 Channel channel = conn.createChannel(); //声明交换器 String exchangeName = "hello-exchange"; channel.exchangeDeclare(exchangeName, "direct", true); String routingKey = "hola"; //发布消息 byte[] messageBodyBytes = "quit".getBytes(); channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes); channel.close(); conn.close(); } }
package org.study.rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); //创建到代理服务器到链接 Connection conn = factory.newConnection(); //得到信道 final Channel channel = conn.createChannel(); //声明交换器 String exchangeName = "hello-exchange"; channel.exchangeDeclare(exchangeName, "direct", true); //声明队列 String queueName = channel.queueDeclare().getQueue(); String routingKey = "hola"; //绑定队列,经过键 hola 将队列和交换器绑定起来 channel.queueBind(queueName, exchangeName, routingKey); while(true) { //消费消息 boolean autoAck = false; String consumerTag = ""; channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); System.out.println("消费的路由键:" + routingKey); System.out.println("消费的内容类型:" + contentType); long deliveryTag = envelope.getDeliveryTag(); //确认消息 channel.basicAck(deliveryTag, false); System.out.println("消费的消息体内容:"); String bodyStr = new String(body, "UTF-8"); System.out.println(bodyStr); } }); } } }
./sbin/rabbitmq-server
运行 Producer
接着运行 Producer ,发布一条消息,在 Consumer 的控制台能看到接收的消息:
RabbitMQ 最优秀的功能之一就是内建集群,这个功能设计的目的是容许消费者和生产者在节点崩溃的状况下继续运行,以及经过添加更多的节点来线性扩展消息通讯吞吐量。RabbitMQ 内部利用 Erlang 提供的分布式通讯框架 OTP 来知足上述需求,使客户端在失去一个 RabbitMQ 节点链接的状况下,仍是可以从新链接到集群中的任何其余节点继续生产、消费消息。
RabbitMQ 会始终记录如下四种类型的内部元数据:
在单一节点中,RabbitMQ 会将全部这些信息存储在内存中,同时将标记为可持久化的队列、交换器、绑定存储到硬盘上。存到硬盘上能够确保队列和交换器在节点重启后可以重建。而在集群模式下一样也提供两种选择:存到硬盘上(独立节点的默认设置),存在内存中。
若是在集群中建立队列,集群只会在单个节点而不是全部节点上建立完整的队列信息(元数据、状态、内容)。结果是只有队列的全部者节点知道有关队列的全部信息,所以当集群节点崩溃时,该节点的队列和绑定就消失了,而且任何匹配该队列的绑定的新消息也丢失了。还好RabbitMQ 2.6.0以后提供了镜像队列以免集群节点故障致使的队列内容不可用。
RabbitMQ 集群中能够共享 user、vhost、exchange等,全部的数据和状态都是必须在全部节点上复制的,例外就是上面所说的消息队列。RabbitMQ 节点能够动态的加入到集群中。
当在集群中声明队列、交换器、绑定的时候,这些操做会直到全部集群节点都成功提交元数据变动后才返回。集群中有内存节点和磁盘节点两种类型,内存节点虽然不写入磁盘,可是它的执行比磁盘节点要好。内存节点能够提供出色的性能,磁盘节点能保障配置信息在节点重启后仍然可用,那集群中如何平衡这二者呢?
RabbitMQ 只要求集群中至少有一个磁盘节点,全部其余节点能够是内存节点,当节点加入火离开集群时,它们必需要将该变动通知到至少一个磁盘节点。若是只有一个磁盘节点,恰好又是该节点崩溃了,那么集群能够继续路由消息,但不能建立队列、建立交换器、建立绑定、添加用户、更改权限、添加或删除集群节点。换句话说集群中的惟一磁盘节点崩溃的话,集群仍然能够运行,但知道该节点恢复,不然没法更改任何东西。
若是是在一台机器上同时启动多个 RabbitMQ 节点来组建集群的话,只用上面介绍的方式启动第2、第三个节点将会由于节点名称和端口冲突致使启动失败。因此在每次调用 rabbitmq-server 命令前,设置环境变量 RABBITMQ_NODENAME 和 RABBITMQ_NODE_PORT 来明确指定惟一的节点名称和端口。下面的例子端口号从5672开始,每一个新启动的节点都加1,节点也分别命名为test_rabbit_一、test_rabbit_二、test_rabbit_3。
启动第1个节点:
RABBITMQ_NODENAME=test_rabbit_1 RABBITMQ_NODE_PORT=5672 ./sbin/rabbitmq-server -detached
启动第2个节点:
RABBITMQ_NODENAME=test_rabbit_2 RABBITMQ_NODE_PORT=5673 ./sbin/rabbitmq-server -detached
启动第2个节点前建议将 RabbitMQ 默认激活的插件关掉,不然会存在使用了某个插件的端口号冲突,致使节点启动不成功。
如今第2个节点和第1个节点都是独立节点,它们并不知道其余节点的存在。集群中除第一个节点外后加入的节点须要获取集群中的元数据,因此要先中止 Erlang 节点上运行的 RabbitMQ 应用程序,并重置该节点元数据,再加入而且获取集群的元数据,最后从新启动 RabbitMQ 应用程序。
中止第2个节点的应用程序:
./sbin/rabbitmqctl -n test_rabbit_2 stop_app
重置第2个节点元数据:
./sbin/rabbitmqctl -n test_rabbit_2 reset
第2节点加入第1个节点组成的集群:
./sbin/rabbitmqctl -n test_rabbit_2 join_cluster test_rabbit_1@localhost
启动第2个节点的应用程序
./sbin/rabbitmqctl -n test_rabbit_2 start_app
第3个节点的配置过程和第2个节点相似:
RABBITMQ_NODENAME=test_rabbit_3 RABBITMQ_NODE_PORT=5674 ./sbin/rabbitmq-server -detached ./sbin/rabbitmqctl -n test_rabbit_3 stop_app ./sbin/rabbitmqctl -n test_rabbit_3 reset ./sbin/rabbitmqctl -n test_rabbit_3 join_cluster test_rabbit_1@localhost ./sbin/rabbitmqctl -n test_rabbit_3 start_app
中止某个指定的节点,好比中止第2个节点:
RABBITMQ_NODENAME=test_rabbit_2 ./sbin/rabbitmqctl stop
查看节点3的集群状态:
./sbin/rabbitmqctl -n test_rabbit_3 cluster_status