分布式消息队列RabbitMQ

这篇文章简单讲述下分布式消息队列的基础知识,不会太深入,因为类似MQ这样的分布式组件有很多不同的种类,都有各自的特征和其对应的应用场景,需要在实际应用中才能更加深入的理解。

整篇文章按照,分布式消息队列基础知识->AMQP协议->RabbitMQ实例来概述。

1、分布式消息队列基础知识

消息
要想弄清楚分布式消息队列的具体意义,首先得从什么是消息入手。
消息即是信息的载体。为了让消息发送者和消息接收者都能够明白消息所承载的信息(消息发送者需要知道如何构造消息;消息接收者需要知道如何解析消息),它们就需要按照一种统一的格式描述消息,这种统一的格式称之为消息协议。所以,有效的消息一定具有某一种格式;而没有格式的消息是没有意义的。

消息通信
而消息从发送者到接收者的方式也有两种。一种我们可以称为即时消息通讯,也就是说消息从一端发出后(消息发送者)立即就可以达到另一端(消息接收者),这种方式的具体实现就是我们已经介绍过的RPC(当然单纯的http通讯也满足这个定义);另一种方式称为延迟消息通讯,即消息从某一端发出后,首先进入一个容器进行临时存储,当达到某种条件后,再由这个容器发送给另一端。 这个容器的一种具体实现就是消息队列。

消息中间件
消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋、消息通讯等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。
目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。

2、AMQP协议

背景
越是大型的公司越是不可避免的使用来自众多供应商的MQ产品,来服务企业内部的不同应用。如果应用已经订阅了TIBCO MQ信息,若突然需要消费来自IBM MQ的消息,则实现起来会非常困难。这些产品使用不同的api,不同的协议,因而毫无疑问无法联合起来组成单一的总线。为了解决这个问题,Java Message Service(JMS)在2001年诞生了。JMS试图通过提供公共java api的方式,隐藏单独MQ产品供应商提供的实际接口,从而跨越了壁垒和解决了互通问题。从技术上讲,java应用程序只需要对JMS API编程,选择合适的MQ驱动即可。JMS会打理好其他部分的。问题是你在尝试使用单独编准化接口来整合众多不同的接口。这就像是把不同的类型的衣服粘在一起:缝合处终究会裂开。使用JMS(Java Message Service)的应用程序会变得更加脆弱。我们需要新的消息通信标准化方案。

协议定义
高级消息队列协议(AMQP)是面向消息的中间件的开放标准应用层协议。 AMQP的特征是消息导向,排队,路由(包括点对点和发布和订阅),可靠性和安全性。

AMQP要求消息传递提供商和客户端的行为在不同供应商实现可互操作的情况下,以与SMTP,HTTP,FTP等相同的方式创建了可互操作的系统。 中间件的以前标准化发生在API级别(例如JMS),并且专注于使程序员与不同中间件实现的交互标准化,而不是提供多个实现之间(AMQP的实现)的互操作性。与定义API和消息传递实现必须提供的一组行为的JMS不同,AMQP是线级协议。 线级协议是以网络流作为字节流发送的数据格式的描述。 因此,无论实现语言如何,任何可以创建和解释符合此数据格式的消息的工具都可以与任何其他兼容工具进行互操作。

总结
AMQP是一个开放的面向消息中间件的协议,所有此协议的实现可以进行互相操作,无论实现语言如何,任何符合此协议的数据格式的消息工具都可以与任何其他兼容工具进行互操作。而以前JMS(Java Message Service),将不同的中间件的实现进行API层次的标准化。

3、RabbitMQ实例

前面讲到了MQ有多种实现方式,这里以RabbitMQ进行具体说明,因为RabbitMQ相对来说是对AMQP完全支持的一个MQ实现方式。

RabbitMQ定义
RabbitMQ是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP(高级消息队列协议)的标准实现。支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX,持久化。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

基本结构
在这里插入图片描述
基本组件
Broker:简单来说就是消息队列服务器实体。

Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

使用流程
消息队列的使用过程,如下:

(1)客户端连接到消息队列服务器,打开一个channel。
(2)客户端声明一个exchange,并设置相关属性。
(3)客户端声明一个queue,并设置相关属性。
(4)客户端使用routing key,在exchange和queue之间建立好绑定关系。
(5)客户端投递消息到exchange。
exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。

交换器的绑定过程
队列通过路由键(routing key)绑定到交换器
四种类型交换器:

direct
服务器包含一个空白字符串名称的默认交换器,当声明一个队列时,它会自动绑定到默认交换器,并以队 列名称作为路由键。
fanout
这种类型的交换器会将收到的消息广播到绑定的队列上。例如一个web应用需要在用户上传新图片时,用 户相册必须清除缓存,同时用户应该得到积分奖励。这时可以将两个队列绑定到图片上传交换器上,一个 用于清除缓存,另一个用于增加用户积分。如果产品需要增加新功能时,就可以不修改原来代码,只需要 新声明一个队列并绑定到fanout交换器上。
topic可以使用通配符(*和#)
headers headers交换器和direct完全一致,但性能会差很多,因此并不实用。

多租户的隔离
每一个RabbitMQ服务器都能创建虚拟消息服务器,我们称之为虚拟主机(vhost),每个vhost本质上是一个mini版的RabbitMQ服务器,拥有自己的队列,交换器和绑定,已经自己的权限机制。vhost之于Rabbit就像虚拟机之于物理服务器一样:它既能将同一个Rabbit的众多客户区分开来,又可以避免队列和交换器的命名冲突。

当你在Rabbit里创建一个用户时,用户通常会被指派给至少一个vhost,并且只能访问被指派的vhost内的队列、交换器和绑定。

当在RabbitMQ集群上创建vhost时,整个集群上都会创建该vhost。

消息持久化
默认情况下,重启RabbitMQ服务器后,那些队列(连同里面的消息)和交换器都会消失,原因在于每个队列和交换器的durable属性(默认为false),将它设置为true,在崩溃或重启后就不需要重新创建队列(或者交换器)。但是这样不能保证消息幸免于重启。

如果消息要从Rabbit崩溃中恢复,那么消息必须:

把它的“投递模式”选项设置为2来把消息标记成持久化
它还必须被发布到持久化的交换器中
到达持久化的队列中

当发布一条持久性消息到持久交换器上时,Rabbit会在消息提交到持久化日志文件后才提交响应。之后这条消息如果路由到了非持久队列中,它会自动从持久性日志中移除,并且无法从服务器重启中恢复。一旦从持久化队列中消费了一条持久性消息的话(并且确认了它)。RabbitMQ会在持久化日志中把这条消息标记为等待垃圾收集。在消费持久性消息前如果RabbitMQ重启,服务器会自动重建交换器和队列(已经绑定),重播持久性日志文件中的消息到合适的队列或者交换器上(取决于Rabbit服务器宕机的时候,消息处在路由过程的哪个环节)。

可以通过其他方式保证消息投递,例如:生产者可以在单独的信道上监听应答队列,每次发送消息的时候,都包含应答队列的名称,这样消费者就可以回发应答到该队列以确认接收到了,如果消息应答未在合理时间内到达,生产者就重发消息。

由于发布操作不返回任何信息给生产者,生产者怎么知道服务器是否已经持久化了持久消息到硬盘呢?服务器可能会在把消息写入硬盘前就宕机了,消息因此而丢失,这就是事务发挥作用的地方。在AMQP中,把信道设置成事务模式,就可以通过信道发送那些想要确认的消息,之后还有多个其他AMQP命令,如果事务中首次发布成功,信道会在事务中完成其他AMQP命令,如果发送失败,其他AMQP命令将不会执行。

事务会降低大约2~10倍的消息吞吐量,而且会使生产者应用程序产生同步。

RabbitMQ有更好的方案来保证消息投递:发送方确认模式。告诉Rabbit将信道设置为confirm模式,而且只能通过重新创建信道来关闭该设置。**一旦信道进入confirm模式,所有在信道上发布的消息都会被指派一个唯一的ID号。一旦消息被投递给所有匹配的队列后,信道会发送一个发送方确认模式给生产者(包含消息的唯一ID),这使得生产者知晓消息已经安全到达目的队列。**如果消息和队列时可持久化的,那么确认消息只会在队列将消息写入磁盘后才会发出,

发送发确认模式的最大好处是异步,一旦发布一条消息,生产者就可以在等待确认的同时继续发送下一条,当确认消息收到时,生产者应用的回调方法就会被触发来处理该确认消息,如果Rabbit发生了内部错误导致消息丢失,Rabbit会发送一条nack(未确认)消息,就像发送确认消息那样,只不过这次说明的是消息已经丢失。由于没有消息回滚(和事务相比),发送法确认模式更加轻量级。

具体实例:以helloworld的生产者消费者来查看

public class RabbitMQConfig {
    public Queue helloQueue() {
        return new Queue("hello_queue");
    }public Queue userQueue() {
        return new Queue("user_queue");
    }public Queue queueMessage() {
        return new Queue("topicMessage_queue");
    }public Queue queueMessages() {
        return new Queue("topicMessages_queue");
    }public Queue AMessage() {
        return new Queue("fanoutA_queue");
    }public Queue BMessage() {
        return new Queue("fanoutB_queue");
    }public Queue CMessage() {
        return new Queue("fanoutC_queue");
    }
​
    DirectExchange directExchange() {
        return new DirectExchange("directExchange");
    }
​
    TopicExchange topicExchange() {
        return new TopicExchange("topicExchange");
    }
    
   FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }/** * 将userQueue()和directExchange()通过"user"绑定 * @return */

    Binding bindingDirectExchange() {
        return BindingBuilder.bind(userQueue()).to(directExchange()).with("user");
    }/** * 将队列topic.message与exchange绑定,binding_key为topic.message,就是完全匹配 * @return */

    Binding bindingExchangeMessage() {
        return BindingBuilder.bind(queueMessage()).to(topicExchange()).with("topic.message");
    }/** * 将队列topic.messages与exchange绑定,binding_key为topic.#,模糊匹配 * @return */

    Binding bindingExchangeMessages() {
        return BindingBuilder.bind(queueMessages()).to(topicExchange()).with("topic.#");
    }
​

    Binding bindingExchangeA() {
        return BindingBuilder.bind(AMessage()).to(fanoutExchange());
    }
​

    Binding bindingExchangeB() {
        return BindingBuilder.bind(BMessage()).to(fanoutExchange());
    }
​

    Binding bindingExchangeC() {
        return BindingBuilder.bind(CMessage()).to(fanoutExchange());
    }
}

/** * 生产者 */

public class HelloSender1 {
    private final static Logger LOGGER = LogManager.getLogger(HelloSender1.class);
    private AmqpTemplate rabbitTemplate;/** * direct类型的交换器:通过默认的exchange,routing_key就是queueName:“hello_queue” */
    public void send() {
        String sendMsg = "hello1 " + new Date();
        LOGGER.info("Sender1 : {}", sendMsg);
        this.rabbitTemplate.convertAndSend("hello_queue", sendMsg);
    }/** * direct类型的交换器:通过directExchange,routing_key就是"user" * @param age */
    public void send(int age) {
        User user = new User();
        user.setAge(age);
        user.setName("张三");
        user.setGender("male");
        user.setBirthday(new Date());
        LOGGER.info("Sender1 : {}", user);
        this.rabbitTemplate.convertAndSend("directExchange", "user", user);
    }
}

/** * 消费者 */
public class HelloReceiver1 {
    private final static Logger LOGGER = LogManager.getLogger(HelloReceiver1.class);
​
    @RabbitHandler
    @RabbitListener(queues = "hello_queue")
    public void process(String text) {
        LOGGER.info("Receiver2 : {}", text);
    }
​
    @RabbitHandler
    @RabbitListener(queues = "user_queue")
    public void process(User user) {
        LOGGER.info("Receiver2 : {}", user);
    }
}

参考博客:

http://www.javashuo.com/article/p-brfvlvtg-gn.html
http://www.javashuo.com/article/p-koikzrfw-hq.html
https://kb.cnblogs.com/page/537753/ 《RabbitMQ实战》