RabbitMQ 是一个由erlang语言编写的、开源的、在AMQP基础上完整的、可复用的企业消息系统。支持多种语言,包括java、Python、ruby、PHP、C/C++等。java
AMQP:advanced message queuing protocol ,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息并不受客户端/中间件不一样产品、不一样开发语言等条件的限制。算法
AMQP模型图 spring
发布者(Publisher)发布消息(Message),经由交换机(Exchange)。json
交换机根据路由规则将收到的消息分发给与该交换机绑定的队列(Queue)。缓存
最后 AMQP 代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。安全
一、发布者、交换机、队列、消费者均可以有多个。同时由于 AMQP 是一个网络协议,因此这个过程当中的发布者,消费者,消息代理 能够分别存在于不一样的设备上。springboot
二、发布者发布消息时能够给消息指定各类消息属性(Message Meta-data)。有些属性有可能会被消息代理(Brokers)使用,然而其余的属性则是彻底不透明的,它们只能被接收消息的应用所使用。ruby
三、从安全角度考虑,网络是不可靠的,又或是消费者在处理消息的过程当中意外挂掉,这样没有处理成功的消息就会丢失。基于此缘由,AMQP 模块包含了一个消息确认(Message Acknowledgements)机制:当一个消息从队列中投递给消费者后,不会当即从队列中删除,直到它收到来自消费者的确认回执(Acknowledgement)后,才彻底从队列中删除。服务器
四、在某些状况下,例如当一个消息没法被成功路由时(没法从交换机分发到队列),消息或许会被返回给发布者并被丢弃。或者,若是消息代理执行了延期操做,消息会被放入一个所谓的死信队列中。此时,消息发布者能够选择某些参数来处理这些特殊状况。网络
交换机是用来发送消息的 AMQP 实体。交换机拿到一个消息以后将它路由给一个或零个队列。它使用哪一种路由算法是由交换机类型和绑定(Bindings)规则所决定的。常见的交换机有以下几种:
AMQP 中的队列(queue)跟其余消息队列或任务队列中的队列是很类似的:它们存储着即将被应用消费掉的消息。队列跟交换机共享某些属性,可是队列也有一些另外的属性。
rabbitmq遵循AMQP协议,用在实时的对可靠性要求比较高的消息传递上。kafka主要用于处理活跃的流式数据,大数据量的数据处理上。主要体如今:
RabbitMQ经常使用的Exchange Type有fanout、direct、topic、headers这四种。
direct类型的Exchange路由规则很简单,它会把消息路由到那些binding key与routing key彻底匹配的Queue中。
前面讲到direct类型的Exchange路由规则是彻底匹配binding key与routing key,但这种严格的匹配方式在不少状况下不能知足实际业务需求。topic类型的Exchange与direct类型的Exchage类似,也是将消息路由到binding key与routing key相匹配的Queue中,但支持模糊匹配:
fanout类型的Exchange路由规则很是简单,它会把全部发送到fanout Exchange的消息都会被转发到与该Exchange 绑定(Binding)的全部Queue上。
Fanout Exchange 不须要处理RouteKey 。只须要简单的将队列绑定到exchange 上。这样发送到exchange的消息都会被转发到与该交换机绑定的全部队列上。相似子网广播,每台子网内的主机都得到了一份复制的消息。因此,Fanout Exchange 转发消息是最快的。
headers类型的Exchange也不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否彻底匹配Queue与Exchange绑定时指定的键值对;若是彻底匹配则消息会路由到该Queue,不然不会路由到该Queue。
严格来讲,Default Exchange 并不该该和上面四个交换机在一块儿,由于它不属于独立的一种交换机类型,而是属于Direct Exchange 直连交换机。
默认交换机(default exchange)其实是一个由消息代理预先声明好的没有名字(名字为空字符串)的直连交换机(direct exchange)。
它有一个特殊的属性使得它对于简单应用特别有用处:那就是每一个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同。
举个例子:当你声明了一个名为 “search-indexing-online” 的队列,AMQP 代理会自动将其绑定到默认交换机上,绑定(binding)的路由键名称也是为 “search-indexing-online”。因此当你但愿将消息投递给“search-indexing-online”的队列时,指定投递信息包括:交换机名称为空字符串,路由键为“search-indexing-online”便可。
所以 direct exchange 中的 default exchange 用法,体现出了消息队列的 point to point,感受像是直接投递消息给指定名字的队列。
虽然咱们要避免系统宕机,可是这种“不可抗力”总会有可能发生。rabbitmq若是宕机了,再启动即是了,大不了有短暂时间不可用。但若是你启动起来后,发现这个rabbitmq服务器像是被重置了,之前的exchange,queue和message数据都没了,那就太使人崩溃了。不光业务系统由于无对应exchange和queue受影响,丢失的不少message数据更是致命的。因此如何保证rabbitmq的持久化,在服务使用前必须得考虑到位。
持久化能够提升RabbitMQ的可靠性,以防在异常状况(重启、关闭、宕机等)下的数据丢失。RabbitMQ的持久化分为三个部分:交换器的持久化、队列的持久化和消息的持久化。
exchange交换器的持久化是在声明交换器的时候,将durable设置为true。
若是交换器不设置持久化,那么在RabbitMQ交换器服务重启以后,相关的交换器信息会丢失,不过消息不会丢失,可是不能将消息发送到这个交换器。
spring中建立exchange时,构造方法默认设置为持久化。
队列的持久化在声明队列的时候,将durable设置为true。
若是队列不设置持久化,那么RabbitMQ交换器服务重启以后,相关的队列信息会丢失,同时队列中的消息也会丢失。
exchange和queue,若是一个是非持久化,另外一个是持久化,中bind时会报错。
spring中建立exchange时,构造方法默认设置为持久化。
要确保消息不会丢失,除了设置队列的持久化,还须要将消息设置为持久化。经过将消息的投递模式(BasicProperties中的deliveryMode属性)设置为2便可实现消息的持久化。
若是将全部的消息都进行持久化操做,这样会影响RabbitMQ的性能。写入磁盘的速度比写入内存的速度慢很,因此要在可靠性和吞吐量之间作权衡。
在spring中,BasicProperties中的deliveryMode属性,对应的是MessageProperties中的deliveryMode。平时使用的RabbitTemplate.convertAndSend()方法默认设置为持久化,deliveryMode=2。若是须要设置非持久化发送消息,须要手动设置:
messageProperties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
这里讲解实现消息持久化的完整方案。
1、exchange、queue、message
要保证消息的持久化,在rabbitmq自己的结构上须要实现下面这些:
2、发布确认
前面是保证了消息在投递到rabbitmq中,如何保证rabbit中消息的持久化。
那么还须要保证生产者能成功发布消息,如交换机名字写错了等等。能够在发布消息时设置投递成功的回调,肯定消息能成功投递到目标队列中。
3、接收确认
对于消费者来讲,若是在订阅消息的时候,将autoAck设置为true,那么消费者接收到消息后,尚未处理,就出现了异常挂掉了,此时,队列中已经将消息删除,消费者不可以在收到消息。
这种状况能够将autoAck设置为false,进行手动确认。
4、镜像队列集群
在持久化后的消息存入RabbitMQ以后,还须要一段时间才能存入磁盘。RabbitMQ并不会为每条消息都进行同步存盘,可能仅仅是保存到操做系统缓存之中而不是物理磁盘。若是在这段时间,服务器宕机或者重启,消息还没来得及保存到磁盘当中,从而丢失。对于这种状况,能够引入RabiitMQ镜像队列机制。
这里强调是镜像队列集群,而非普通集群。由于出于同步效率考虑,普通集群只会同步队列的元数据,而不会同步队列中的消息。只有升级成镜像队列集群后,才能也同步消息。
每一个镜像队列由一个master和一个或多个mirrors组成。主节点位于一个一般称为master的节点上。每一个队列都有本身的主节点。给定队列的全部操做首先应用于队列的主节点,而后传播到镜像。这包括队列发布(enqueueing publishes)、向消费者传递消息、跟踪消费者的确认等等。
发布到队列的消息将复制到全部镜像。无论消费者链接到哪一个节点,都会链接到master,镜像会删除在master上已确认的消息。所以,队列镜像提升了可用性,但不会在节点之间分配负载。 若是承载队列master的节点出现故障,则最旧的镜像将升级为新的master,只要它已同步。根据队列镜像参数,也能够升级未同步的镜像。
java开发上,这里以spring-boot-starter-amqp
为例,记录在springboot中使用rabbitmq的一些关注点。pom.xml中引用为:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
一个简单的示例,仅限于文本消息的发布和接收。
ProducerController.java
@RestController public class ProducerController { private static final String HEADER_KEY_UID="uid"; @Autowired private ProducerService producerService; @PostMapping("/sendText") public void sendText(@RequestParam("uid")String uid,@RequestParam("msg")String msg){ MessageProperties messageProperties=new MessageProperties(); messageProperties.setHeader(HEADER_KEY_UID,uid); producerService.sendText(msg,messageProperties); } }
ProducerService.java
@Service public class ProducerService { private static final String EXCHANGE_NAME="direct.exchange.a"; private static final String ROUTING_KEY_NAME="direct.routingKey.a"; @Resource private RabbitTemplate rabbitTemplate; /** * 发送 消息文本 * @param data 文本消息 * @param messageProperties 消息属性 */ public void sendText(String data, MessageProperties messageProperties) { Message message = rabbitTemplate.getMessageConverter().toMessage(data, messageProperties); rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY_NAME, message); } }
消息发送的经常使用方法:
MessageListener.java
import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component @Slf4j public class MessageListener { @Autowired private RabbitTemplate rabbitTemplate; @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "direct.queue.d", durable = "true"), exchange = @Exchange(value = "direct.exchange.a", durable = "true", type = ExchangeTypes.DIRECT, ignoreDeclarationExceptions = "true"), key = "direct.routingKey.a" ) ) @RabbitHandler public void onMessage(Message message, Channel channel) throws Exception { MessageConverter messageConverter = rabbitTemplate.getMessageConverter(); String msg = (String) messageConverter.fromMessage(message); log.info("消费端 Body: " + msg); } }
rabbitmq中消息的序列化依赖于MessageConvert,这是一个接口,用于消息内容的序列化。
RabbitConfig.java
public class RabbitConfig { @Bean("jsonRabbitTemplate") public RabbitTemplate jsonRabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); return rabbitTemplate; } @Bean("defaultRabbitTemplate") public RabbitTemplate defaultRabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory); return rabbitTemplate; } }
ProducerService.java
@Service public class ProducerService { private static final String EXCHANGE_NAME="direct.exchange.a"; private static final String ROUTING_KEY_NAME="direct.routingKey.a"; @Resource(name = "defaultRabbitTemplate") private RabbitTemplate defaultRabbitTemplate; @Resource(name = "jsonRabbitTemplate") private RabbitTemplate jsonRabbitTemplate; /** * 发送 消息对象 json * * @param data * @param messageProperties */ public void sendObject(Object data, MessageProperties messageProperties) { Message message = jsonRabbitTemplate.getMessageConverter().toMessage(data, messageProperties); jsonRabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY_NAME, message); } /** * 发送 消息文本 * * @param data * @param messageProperties */ public void sendText(String data, MessageProperties messageProperties) { Message message = defaultRabbitTemplate.getMessageConverter().toMessage(data, messageProperties); defaultRabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY_NAME, message); } }
MessageListener.java
@Component @Slf4j public class MessageListener { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private ObjectMapper objectMapper; @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "direct.queue.d", durable = "false"), exchange = @Exchange(value = "direct.exchange.a", durable = "true", type = ExchangeTypes.DIRECT, ignoreDeclarationExceptions = "true"), key = "direct.routingKey.a" ) ) @RabbitHandler public void onMessage(Message message, Channel channel) throws Exception { String contentType = message.getMessageProperties().getContentType(); String bodyText = null; System.out.println(contentType); switch (contentType) { //字符串 case MessageProperties.CONTENT_TYPE_TEXT_PLAIN: bodyText = (String) rabbitTemplate.getMessageConverter().fromMessage(message); break; //json对象 case MessageProperties.CONTENT_TYPE_JSON: User user = objectMapper.readValue(message.getBody(), User.class); bodyText = user.toString(); break; } log.info("消费端Payload: " + bodyText); } }
生产者发送对象消息时,咱们使用Jackson2JsonMessageConverter,并用其toMessage方法封装。可是在消费者接收对象消息时,咱们却没有用Jackson2JsonMessageConverter的fromMessage方法,而是使用ObjectMapper来反序列化Json对象。是由于rabbitmq在发送Jackson2JsonMessageConverter的序列化对象时,会在包含类的包名信息,消费者在使用fromMessage反序列化时,必须建立一个和生产者中包名等如出一辙的类。明显不太现实。
ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调。
使用方式在于:
ProducerService.java
@Slf4j @Service public class ProducerService { private static final String EXCHANGE_NAME = "direct.exchange.a"; private static final String ROUTING_KEY_NAME = "direct.routingKey.ab"; @Resource(name = "defaultRabbitTemplate") private RabbitTemplate defaultRabbitTemplate; /** * ConfirmCallback * * 投递对象:exchange * 回调触发:不管成功或失败,都会触发回调。 * 投递成功:ack=true * 投递失败:ack=false */ RabbitTemplate.ConfirmCallback confirmCallback = (CorrelationData correlationData, boolean ack, String cause) -> { log.info("ack: " + ack); if (!ack) { log.info("投递exchange失败!....能够进行日志记录、异常处理、补偿处理等"); } else { log.info("投递exchange成功!"); } }; /** * 发送 消息文本 * * @param data * @param messageProperties */ public void sendText(String data, MessageProperties messageProperties) { Message message = defaultRabbitTemplate.getMessageConverter().toMessage(data, messageProperties); //confirmCallback defaultRabbitTemplate.setConfirmCallback(confirmCallback); defaultRabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY_NAME, message); } }
配置文件须要设置:
spring.rabbitmq.publisher-confirm-type = correlated
ReturnCallback接口用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调。
使用方式在于:
ProducerService.java
@Slf4j @Service public class ProducerService { private static final String EXCHANGE_NAME = "direct.exchange.a"; private static final String ROUTING_KEY_NAME = "direct.routingKey.ab"; @Resource(name = "defaultRabbitTemplate") private RabbitTemplate defaultRabbitTemplate; /** * ReturnCallback * * 投递对象:queue * 回调触发:只有投递失败,才会触发回调。 */ RabbitTemplate.ReturnCallback returnCallback = (Message message, int replyCode, String replyText, String exchange, String routingKey) -> { log.info("投递到queue失败! exchange: " + exchange + ", routingKey: " + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText); }; /** * 发送 消息文本 * * @param data * @param messageProperties */ public void sendText(String data, MessageProperties messageProperties) { Message message = defaultRabbitTemplate.getMessageConverter().toMessage(data, messageProperties); //returnCallback defaultRabbitTemplate.setMandatory(true); defaultRabbitTemplate.setReturnCallback(returnCallback); defaultRabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY_NAME, message); } }
须要在配置文件中配置:
spring.rabbitmq.publisher-returns = true
上一节讲解的是,如何在生产者发布消息时,确认消息发布到rabbitmq的交换机和队列中。那么这一节讲解的是,如何保障消费者能彻底“消费”了消息。
一般状况下,rabbitmq做为消息中间件,它把message推送给消费者就完成了它的使命,该message就自动被“签收”了。而消费者在接收到message后,再去实现关于该message的业务逻辑。可若是在实现该业务逻辑过程当中发生了错误,须要从新执行,那就难办了。由于message一旦被“签收”后,就从rabbitmq中被删除,不可能从新再发送。
若是消费者能手动控制message的“签收”操做,只有当关于message的业务逻辑执行完成后再“签收”,message再从rabbitmq中删除,不然可让message重发就行了。这一节就讲这个。
Acknowledge意思是“确认”,消息经过 ACK 确认是否被正确接收,每一个 Message 都要被确认(acknowledged),能够手动去 ACK 或自动 ACK。
使用手动应答消息,有一点须要特别注意,那就是不能忘记应答消息,由于对于RabbitMQ来讲处理消息没有超时,只要不该答消息,他就会认为仍在正常处理消息,致使消息队列出现阻塞,影响业务执行。若是不想处理,能够reject丢弃该消息。
消息确认模式有:
默认是自动确认,能够经过RabbitListenerContainerFactory 中进行开启手动ack,或者中配置文件中开启:
spring.rabbitmq.listener.simple.acknowledge-mode = manual
MessageListener.java
@Component @Slf4j public class MessageListener { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private ObjectMapper objectMapper; @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "direct.queue.d", durable = "false"), exchange = @Exchange(value = "direct.exchange.a", durable = "true", type = ExchangeTypes.DIRECT, ignoreDeclarationExceptions = "true"), key = "direct.routingKey.a" ) ) @RabbitHandler public void onMessage(Message message, Channel channel) throws Exception { String contentType = message.getMessageProperties().getContentType(); String bodyText = null; System.out.println(contentType); switch (contentType) { //字符串 case MessageProperties.CONTENT_TYPE_TEXT_PLAIN: bodyText = (String) rabbitTemplate.getMessageConverter().fromMessage(message); break; //json对象 case MessageProperties.CONTENT_TYPE_JSON: User user = objectMapper.readValue(message.getBody(), User.class); bodyText = user.toString(); break; } log.info("消费端Payload: " + bodyText); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }
设置为手动确认后,有3种确认操做:
如示例代码中的 basicAck 方法,须要注意的是,要传递两个参数:
除了上述手动确认的方式,还有一种不太经常使用的方式,能够实现重复发送消息。在开启异常重试的前提下,在消费者代码中抛出异常,会自动重发消息。
application.properties
spring.rabbitmq.listener.simple.retry.enabled=true 是否开启消费者重试 spring.rabbitmq.listener.simple.retry.max-attempts=5 最大重试次数 spring.rabbitmq.listener.simple.retry.initial-interval=5000 重试间隔时间(单位毫秒) spring.rabbitmq.listener.simple.default-requeue-rejected=false 重试次数超过上面的设置以后是否丢弃
MessageListener.java
@RabbitListener(bindings = @QueueBinding( value = @Queue(value = "direct.queue.d", durable = "false"), exchange = @Exchange(value = "direct.exchange.a", durable = "true", type = ExchangeTypes.DIRECT, ignoreDeclarationExceptions = "true"), key = "direct.routingKey.a" ) ) @RabbitHandler public void onMessage(Message message, Channel channel) throws Exception { String contentType = message.getMessageProperties().getContentType(); String bodyText = null; System.out.println(contentType); switch (contentType) { //字符串 case MessageProperties.CONTENT_TYPE_TEXT_PLAIN: bodyText = (String) rabbitTemplate.getMessageConverter().fromMessage(message); break; //json对象 case MessageProperties.CONTENT_TYPE_JSON: User user = objectMapper.readValue(message.getBody(), User.class); bodyText = user.toString(); break; } log.info("消费端Payload: " + bodyText); throw new RuntimeException("重试啦"); }
在RabbitMQ中消费者有2种方式获取队列中的消息:
对比来讲,若是有持续消费的需求,建议用push的方式,经过监听器来订阅。若是只是特定时刻须要从队列中,一次性取些数据,能够用pull方式。
咱们知道不管是生产者仍是消费者,都须要和 RabbitMQ Broker 创建链接,这个链接就是一条 TCP 链接,也就是 Connection。一旦 TCP 链接创建起来,客户端紧接着能够建立一个 AMQP 信道(Channel),每一个信道都会被指派一个惟一的 ID。
信道是创建在 Connection 之上的虚拟链接,RabbitMQ 处理的每条 AMQP 指令都是经过信道完成的。
咱们彻底可使用 Connection 就能完成信道的工做,为何还要引入信道呢?试想这样一个场景,一个应用程序中有不少个线程须要从 RabbitMQ 中消费消息,或者生产消息,那么必然须要创建不少个 Connection,也就是多个 TCP 链接。然而对于操做系统而言,创建和销毁 TCP 链接是很是昂贵的开销,若是遇到使用高峰,性能瓶颈也随之显现。
RabbitMQ 采用相似 NIO(Non-blocking I/O)的作法,选择 TCP 链接复用,不只能够减小性能开销,同时也便于管理。
每一个线程把持一个信道,因此信道复用了 Connection 的 TCP 链接。同时 RabbitMQ 能够确保每一个线程的私密性,就像拥有独立的链接同样。当每一个信道的流量不是很大时,复用单一的 Connection 能够在产生性能瓶颈的状况下有效地节省 TCP 链接资源。可是信道自己的流量很大时,这时候多个信道复用一个 Connection 就会产生性能瓶颈,进而使总体的流量被限制了。此时就须要开辟多个 Connection,将这些信道均摊到这些 Connection 中,至于这些相关的调优策略须要根据业务自身的实际状况进行调节。
信道在 AMQP 中是一个很重要的概念,大多数操做都是在信道这个层面展开的。好比 channel.exchangeDeclare、channel.queueDeclare、channel.basicPublish、channel.basicConsume 等方法。RabbitMQ 相关的 API 与 AMQP 紧密相连,好比 channel.basicPublish 对应 AMQP 的 Basic.Publish 命令。
针对push方式,RabbitMQ能够设置basicQoS(Consumer Prefetch)来对consumer进行流控,从而限制未Ack的消息数量。
前提包括,消息确认模式必须是手动确认。
basicQos(int var1, boolean var2)