做为中间件的杠把子选手,rabbimq
在系统架构中承担着承上启下的做用,常问到,大家为什么选用rabbimq?则答曰,为了削峰填谷,为了系统解耦合,为了提升系统性能。但这事是绝对的吗?用了这款软件就能够实现这个目的吗?java
RabbitMQ 是一个由Erlang
语言开发的AMQP
的开源实现。rabbitMQ是一款基于AMQP协议的消息中间件,它可以在应用之间提供可靠的消息传输。在易用性,扩展性,高可用性上表现优秀。使用消息中间件利于应用之间的解耦,生产者(客户端)无需知道消费者(服务端)的存在。并且两端可使用不一样的语言编写,大大提供了灵活性。编程
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。后端
消息是贯穿这个这款中间件服务的脉络,咱们不妨经过一条消息来推演窥探整个rabbimq
的设计思想,站在前人的肩膀上,看看这款软件的先进设计。安全
即信息,生产者产生的数据,这些数据记录着生产端产生的业务日志,将会被投递到后端进行处理。服务器
消息队列(MQ)
全称为Message Queue
,是一种应用程序对应用程序的通讯方法。说人话就是,在应用之间放一个组件组件,以后应用双方经过这个消息组件进行通讯。网络
本次将经过消息的生命周期的回顾来窥探这款服务的细节多线程
rabbimq的每一次迭代都离不开三个核心概念,生产者producer
,消费者comsumer
,服务端broker
,随着系统的迭代增长了集群的概念,以及后续的容灾机制。架构
消息的产生离不开生产者producer
,其发送发生很简单,伪代码以下app
# 消息发送方法 # messageBody 消息体 # exchangeName 交换器名称 # routingKey 路由键 publishMsg(messageBody,exchangeName,routingKey){ ...... } # 消息发送 publishMsg("This is a warning log","exchange","log.warning");
至此,咱们一条消息发送成功了,但做为一名开发人员,咱们理应知道这条消息去到哪了?到底作了什么操做?后续将会遭遇到了什么?异步
在生产者和消费者之间其实包括了不少内容,咱们须要把前面的图进行更加深刻的展开,咱们一层一层拨开mq的心,你会发现,你会流泪
RabbitMQ Server
。namespace
。当多个不一样的用户使用同一个RabbitMQ server提供的服务时,能够划分出多个vhost
,每一个用户在本身的vhost
建立exchange/queue
等,他们之间互不影响,互相独立且隔离。publisher/consumer
和broker
之间的TCP链接。断开链接的操做只会在client端进行,Broker
不会断开链接,除非出现网络故障或broker
服务出现问题。Connection
,在消息量大的时候创建TCP Connection
的开销会比较大且效率也较低。Channel
是在connection
内部创建的逻辑链接,若是应用程序支持多线程,一般每一个thread
建立单独的channel
进行通信,AMQP method包含了channel id帮助客户端和message broker识别channel,因此channel之间是彻底隔离的。Channel做为轻量级的Connection极大减小了操做系统创建TCP connection的开销。exchange
和queue
之间的虚拟链接,binding
中能够包含routing key。Binding信息被保存到exchange中的查询表中,用于message的分发依据。看完了这些概念,我再给你们梳理一遍信息流:
当咱们的生产者端往Broker(RabbitMQ)
中推送消息,Broker
会根据其消息的标识送往不一样的Virtual host
,而后Exchange
会根据消息的路由key
和交换器类型将消息分发到本身所属的Queue
中去。
而后消费者端会经过Connection
中的Channel
获取刚刚推送的消息,拉取消息进行消费。
这里指的是交换机的类型
这里有三种交换机类型,也就是有三种路由模式
消费者就是消息的处理端,会主动从消息队列中拉取信息,释放消息队列中挤压的资源
默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取奇数序列的任务,消费者1去队列中获取偶数序列的任务。
对于消息消费而言,消费者直接指定要消费的队列便可,好比指定消费队列A的数据。
须要注意的是,在消费者消费完成数据后,返回给rabbimq ACK消息,rabbimq会删掉队列中的该条信息。
万物抱阳负阴,系统之间忽然加了个中间件,提升系统复杂度的同时也增长了不少问题:
上面前三个问题其实就是对投递模式的灵魂发问,也就是消息推送方知不知道数据已经推送,消息服务端在消息被拉取的时候有没有偏移量记录,消息消费端有没有拉取确认机制。固然了校验机制越复杂对于系统投递性能损耗就越严重,可靠性越强,效率就会相应的打折扣
这个是最简单的模式,也是效率最高的机制,相似于udp
,RabbitMQ默认发布消息是不会返回任何结果给生产者的,因此存在发送过程当中丢失数据的风险。
AMQP事务保证RabbitMQ不只收到了消息,并成功将消息路由到了全部匹配的订阅队列,AMQP事务将使得生产者和RabbitMQ产生同步。
虽然事务使得生产者能够肯定消息已经到达RabbitMQ中的对应队列,可是却会下降2~10倍的消息吞吐量。
开启发送方确认模式后,消息会有一个惟一的ID,一旦消息被投递给全部匹配的队列后,会回调给发送方应用程序(包含消息的惟一ID),使得生产者知道消息已经安全到达队列了。
若是消息和队列是配置成了持久化,这个确认消息只会在队列将消息写入磁盘后才会返回。若是RabbitMQ内部发生了错误致使这条消息丢失,那么RabbitMQ会发送一条nack消息,固然我理解这个是不能保证的。
这种模式因为不存在事务回滚,同时总体仍然是一个异步过程,因此更加轻量级,对服务器性能的影响很小。
那么问题来了,rabbimq的消息是以什么样的形式存储。默认条件下消息是存储在内存中,不止是消息,Exchange路由等元数据信息实际都在内存中。
具体的元数据信息:
队列元数据:队列名称和属性
交换器元数据:交换器名称、类型和属性
绑定元数据:路由信息
内存的优势是高性能,问题在于故障后没法恢复。都已经2021年,RabbitMQ必然也支持持久化的存储,也就是写磁盘。
实现消息队列持久化的建议同时知足如下三个条件
效果:
Exchange
,Bindings
和Queue
,同时经过重播持久化日志来恢复消息消息是否为持久化那还要看消息的持久化设置。也就是说,重启服务以前那个queue
里面尚未发出去的消息的话,重启以后那队列里面是否是还存在原来的消息,这个就要取决于发生着在发送消息时对消息的设置了。
若是要在重启后保持消息的持久化必须设置消息是持久化的标识。
设置消息的持久化:
channel.basicPublish("exchange.persistent", "persistent", MessageProperties.PERSISTENT_TEXT_PLAIN, "persistent_test_message".getBytes());
这里的关键是:MessageProperties.PERSISTENT_TEXT_PLAIN
首先看一下basicPublish
的方法:
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException; void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException; void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
true
时,若是exchange
根据自身类型和消息routeKey
没法找到一个符合条件的queue
,那么会调用basic.return
方法将消息返回给生产者(Basic.Return + Content-Header + Content-Body)
;false
时,出现上述情形broker
会直接将消息扔掉。queue
上有消费者,则立刻将消息投递给它,若是全部queue
都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。
true
时,若是exchange
在将消息路由到queue(s)
时发现对于的queue
上么有消费者,那么这条消息不会放入队列中。当与消息routeKey
关联的全部queue(一个或者多个)
都没有消费者时,该消息会经过basic.return
方法返还给生产者。这里关键的是BasicProperties props
这个参数了,这里看下BasicProperties
的定义:
public BasicProperties( String contentType,//消息类型如:text/plain String contentEncoding,//编码 Map<String,Object> headers, Integer deliveryMode,//1:nonpersistent 2:persistent Integer priority,//优先级 String correlationId, String replyTo,//反馈队列 String expiration,//expiration到期时间 String messageId, Date timestamp, String type, String userId, String appId, String clusterId)
这里的deliveryMode=1
表明不持久化,deliveryMode=2
表明持久化。
上面的实现代码使用的是MessageProperties.PERSISTENT_TEXT_PLAIN
,那么这个又是什么呢?
public static final BasicProperties PERSISTENT_TEXT_PLAIN = new BasicProperties("text/plain", null, null, 2, 0, null, null, null, null, null, null, null null, null);
能够看到这其实就是讲deliveryMode
设置为2的BasicProperties
的对象,为了方便编程而出现的一个东东。
换一种实现方式:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.deliveryMode(2); AMQP.BasicProperties properties = builder.build(); channel.basicPublish("exchange.persistent", "persistent",properties, "persistent_test_message".getBytes());
设置了队列和消息的持久化以后,当broker
服务重启的以后,消息依旧存在。单只设置队列持久化,重启以后消息会丢失;单只设置消息的持久化,重启以后队列消失,既而消息也丢失。单单设置消息持久化而不设置队列的持久化显得毫无心义。
不设置Exchange
的持久化对消息的可靠性来讲没有什么影响,可是一样若是Exchange
不设置持久化,那么当broker
服务重启以后,Exchange
将不复存在,那么既而发送方rabbitmq producer就没法正常发送消息。这里笔者建议,一样设置Exchange
的持久化。Exchange
的持久化设置也特别简单,方法以下:
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException; Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments) throws IOException; Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException; Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException; void exchangeDeclareNoWait(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException; Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;
通常只须要:channel.exchangeDeclare(exchangeName, “direct/topic/header/fanout”, true);
即在声明的时候讲durable
字段设置为true
便可。
若是将Queue
的持久化标识durable
设置为true
,则表明是一个持久的队列,那么在服务重启以后,也会存在,由于服务会把持久化的Queue
存放在硬盘上,当服务重启的时候,会从新什么以前被持久化的Queue
。
Queue
的持久化是经过durable=true
来实现的。
通常程序中这么使用:
Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("queue.persistent.name", true, false, false, null);
关键的是第二个参数设置为true
,即durable=true
.
Channel
类中queueDeclare
的完整定义以下:
/** * Declare a queue * @see com.rabbitmq.client.AMQP.Queue.Declare * @see com.rabbitmq.client.AMQP.Queue.DeclareOk * @param queue the name of the queue * @param durable true if we are declaring a durable queue (the queue will survive a server restart) * @param exclusive true if we are declaring an exclusive queue (restricted to this connection) * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use) * @param arguments other properties (construction arguments) for the queue * @return a declaration-confirm method to indicate the queue was successfully declared * @throws java.io.IOException if an error is encountered */ Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
参数说明:
Queue.DeclareOk queueDeclare() throws IOException; Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException; void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException; Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;
其中须要说明的是queueDeclarePassive(String queue)
能够用来检测一个queue
是否已经存在。若是该队列存在,则会返回true
;若是不存在,就会返回异常,可是不会建立新的队列。
将queue
,exchange
,message
等都设置了持久化以后就能保证100%保证数据不丢失了吗?
答案是否认的。
首先,从consumer
端来讲,若是这时autoAck=true
,那么当consumer
接收到相关消息以后,还没来得及处理就crash
掉了,那么这样也算数据丢失,这种状况也好处理,只需将autoAck
设置为false
(方法定义以下),而后在正确处理完消息以后进行手动ack(channel.basicAck)
.
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
其次,关键的问题是消息在正确存入RabbitMQ以后,还须要有一段时间(这个时间很短,但不可忽视)才能存入磁盘之中,RabbitMQ并非为每条消息都作fsync的处理,可能仅仅保存到cache中而不是物理磁盘上,在这段时间内RabbitMQ broker发生crash, 消息保存到cache可是还没来得及落盘,那么这些消息将会丢失。
那么这个怎么解决呢?
首先能够引入RabbitMQ的mirrored-queue即镜像队列,这个至关于配置了副本,当master在此特殊时间内crash掉,能够自动切换到slave,这样有效的保障了HA, 除非整个集群都挂掉,这样也不能彻底的100%保障RabbitMQ不丢消息,但比没有mirrored-queue的要好不少,不少现实生产环境下都是配置了mirrored-queue的,关于镜像队列的,咱们后续展开讨论。还有要在producer引入事务机制或者Confirm机制来确保消息已经正确的发送至broker端,有关RabbitMQ的事务机制或者Confirm机制读者们有兴趣的话,请留言,咱们再详细展开讨论。
幸好本章节的主题是讨论RabbitMQ的持久化而不是可靠性,否则就一发不可收拾了。RabbitMQ的可靠性涉及producer端的确认机制、broker端的镜像队列的配置以及consumer端的确认机制,要想确保消息的可靠性越高,那么性能也会随之而降,鱼和熊掌不可兼得,关键在于选择和取舍。
当RabbitMQ出现内存(默认是0.4)或者磁盘资源达到阈值时,会触发流控机制,阻塞Producer的Connection,让生产者不能继续发送消息,直到内存或者磁盘资源获得释放。
RabbitMQ基于Erlang/OTP开发,一个消息的生命周期中,会涉及多个进程间的转发,这些Erlang进程之间不共享内存,每一个进程都有本身独立的内存空间,若是没有合适的流控机制,可能会致使某个进程占用内存过大,致使OOM。所以,要保证各个进程占用的内容在一个合理的范围,RabbitMQ的流控采用了一种信用证机制(Credit),为每一个进程维护了四类键值对
A进程当前能够发送给B的消息有100条,每发一次,值减1,直到为0,A才会被Block住。B消费消息后,会给A增长新的Credit,这样A才能够持续的发送消息。这里只画了两个进程,多进程串联的状况下,这中影响也就是从底向上传递的
rabbimq的集群设计起来,多是历史缘由致使,我的感受是不够先进,毕竟当年开发的时候,也没有相关的业务需求推进啊
RabbitMQ 会将元数据存储到内存上,若是是磁盘节点,还会存储到磁盘上。
队列A的实例实际只在一个RabbitMQ节点上,其它节点实际存储的是指向该队列的指针。虽然RabbitMQ的队列实际只会在一个节点上,但元数据能够存在各个节点上。举个例子来讲,当建立一个新的交换器时,RabbitMQ会把该信息同步到全部节点上,这个时候客户端无论链接的那个RabbitMQ节点,均可以访问到这个新的交换器,也就能找到交换器下的队列
在《RabbitMQ实战指南》中朱忠华老师的观点是
Partition
到各个节点上,这样才能真正达到线性扩容的目的。这个实际上是后续对站着学习kafka
的时候触发的感受镜像队列,本质上就是副本机制
RabbitMQ本身也考虑到了咱们以前分析的单节点长时间故障没法恢复的问题,因此RabbitMQ 2.6.0以后它也支持了镜像队列,除了发送消息,全部的操做实际都在主拷贝上,从拷贝实际只是个冷备(默认的状况下全部RabbitMQ节点上都会有镜像队列的拷贝),若是使用消息确认模式,RabbitMQ会在主拷贝和从拷贝都安全的接受到消息时才通知生产者。
从这个结构上来看,若是从拷贝的节点挂了,实际没有任何影响,若是主拷贝挂了,那么会有一个重新选主的过程,这也是镜像队列的优势,除非全部节点都挂了,才会致使消息丢失。从新选主后,RabbitMQ会给消费者一个消费者取消通知(Consumer Cancellation),让消费者重连新的主拷贝。
Confirm
消息等AMQQueue
调用的接口,完成消息的存储和持久化工做,由Q1,Q2,Delta,Q3,Q4五个子队列构成,在Backing中,消息的生命周期有四个状态:
这里以持久化消息为例(能够看到非持久化消息的生命周期会简单不少),从Q1到Q4,消息实际经历了一个RAM->DISK->RAM
这样的过程,BackingQueue
这么设计的目的有点相似于Linux的Swap
,当队列负载很高时,经过将部分消息放到磁盘上来节省内存空间,当负载下降时,消息又从磁盘回到内存中,让整个队列有很好的弹性。
全部对镜像队列主拷贝的操做,都会经过Guarented Multicasting(GM)同步到各个Salve节点,Coodinator负责组播结果的确认。GM是一种可靠的组播通讯协议,保证组组内的存活节点都收到消息。
至于说master和slave之间的关系应该是以下图所示
GM的组播并非由Master
节点来负责通知全部Slave
的(目的是为了不Master
压力过大,同时避免Master
失效致使消息没法最终Ack
),RabbitMQ把一个镜像队列的全部节点组成一个链表,由主拷贝发起,由主拷贝最终确认通知到了全部的Slave
,而中间由Slave
接力的方式进行消息传播。从这个结构来看,消息完成整个镜像队列的同步耗时理论上是不低的,可是因为RabbitMQ消息的消息确认自己是异步的模式,因此总体的吞吐量并不会受到太大影响。
镜像队列(副本)的引入其实就是对Rabbimq的高可用性的补充,从实际结果看,RabbitMQ完成设计目标上并不十分出色,主要缘由在于默认的模式下,RabbitMQ的队列实例只存在在一个节点上(虽而后续也支持了镜像队列),既不能保证该节点崩溃的状况下队列还能够继续运行,也不能线性扩展该队列的吞吐量。
至此咱们能够看出rabbimq的一个发展脉络,在古早时代,其推送消息相似udp送过去就送过去了,以后无论了,以后需求倒逼架构改进,要求有可靠性投递引入了确认(ack)机制。随着技术的进步,大规模节点的推广,引入集群的,出现副本,然而集群的并不完美
曾经有位大佬跟我分享过学生时代有个常见的心态:锤子心态即手中有个锤子,看到啥都是钉子,都想锤一下。咱们学习了rabbimq,确实是想用起来但不是啥时候都能用,这个要具体问题具体分析