Binding
:Exchange和Exchange、Queue之间的链接关系
Queue
:实际存储消息。
Durability
:是否持久化,Durable:是,Transient:否。
Auto delete
:如选yes,表明当最后一个监听被移除以后,该Queue会自动删除。
Message
:服务器和应用程序之间传送的数据。本质上就是一段数据,由Properties和Payload(Body)组成。经常使用属性:delivery mode、headers(自定义属性)
Virtual host
:虚拟主机,用于进行逻辑隔离,最上层的消息路由。一个Virtual Host里面能够有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange或者Queue。
消息如何保证100%的投递成功?segmentfault
生产端可靠性投递
服务器
- 保障消息的成功发出
- 保障MQ节点的成功接收
- 发送端收到MQ节点(Broker)确认应答
- 完善的消息进行补偿机制
可靠性投递解决方案
ide
- 消息落库,对消息状态进行打标
- 消息的延迟投递,作二次确认,回调检查
- 消息集群镜像队列:rabbitmq集群模式很是经典的就是Mirror镜像模式,保证100%数据不丢失,在实际工做中也是用的最多的。而且实现集群很是简单,通常互联网公司都会构建这种镜像集群模式。https://segmentfault.com/a/11...
Confirm确认消息、Return返回消息fetch
理解Confirm消息确认机制:
日志
- 消息的确认,是指生产者投递消息后,若是Broker收到消息,则会给咱们生产者一个应答。
- 生产者进行接受应答,用来肯定这条消息是否正常的发送到了Broker,这种方式也是消息的可靠性投递的核心保障。
如何实现Confirm确认消息?
code
- 第一步:在channel上开启确认模式:channel.confirmSelect()
- 第二步:在channel上添加监听:addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行从新发送、或者记录日志等后续处理。
Return消息机制
rabbitmq
- Return Listener用于处理一些不可路由的消息
- 咱们的消息生产者,经过制定一个Exchange和Routing Key,把消息送达到某一个队列中去,而后咱们的消费者监听队列,进行消费处理。
- 可是在某写状况下,若是咱们在发送消息的时候,当前的exchange不存在或者指定的路由key路由不到,这个时候若是咱们须要监听这种不可达的消息,就要使用Return Listener。
- Mandatory:若是为true,则监听器会接收到路由不可达的消息,而后进行后续处理,若是为false,那么broker端自动删除该消息。
public class MyConsumer extends DefaultConsumer implements Consumer { public MyConsumer(Channel channel) { super(channel); } @Override public void handleDelivery(String arg0, Envelope arg1, BasicProperties arg2, byte[] arg3) throws IOException { } }
消息的ACK与重回队列队列
消费端手工ACK(应答成功)和NACK(应答失败)
ip
- 消费端进行消费的时候,若是因为业务异常咱们能够进行日志的记录,而后进行补偿。
- 因为服务器宕机等严重问题,咱们就要手工进行ACK保障消费端消费成功。好比:消费一半的时候宕机了,broker端没有收到应答,重发消息。
消费端重回队列
ci
- 消费端重回队列是为了对没有处理成功的消息,把消息从新回递给broker。
- 通常咱们在实际应用中,都会关闭重回队列,也就是设置为false。
channel.basciNack(envelope.getDeliveryTag(),false,
true
) // 为true的话,在消费失败的状况下会重回队列放入队列末端。
消息的限流
假设有一个场景,RabbitMq服务器上有上万条未处理的消息,咱们随便打开一个消费者客户端,会出现下面的状况: 巨量的消息瞬间所有推送过来,可是咱们单个客户端没法同时处理这么多数据。RabbitMq提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,若是必定数目的消息(经过基于consume或者channel设置Qos的值)未被确认前,不进行消费新的消息。
void BasicQos(unit prefetchSize, ushort prefetchCount, bool global);
prefetchSize
:0
prefetchCount
: 会告诉RabbitMq不要同时给一个消费者推送多于N个消息,即一旦有N个消息尚未被ack,则该consumer将block掉,知道有消息ack。
global
: true/false,是否将上面设置应用于channel,简答点说,就是上面限制是channel级别的仍是consumer级别。
// 限流方式,第一件事就是autoAck设置为false,关闭自动签收,必须手动签收 channel.basicQos(0,3,false); // 3表示若是消息积压了1000条,先给我推3条,这三条消费结束后,我会给你一个ack表示这三条我已经处理完了,而后再给我推送3条... channel.basicConsume(queueName,false,new MyConsumer(channel)) //在MyConsumer中对消息进行签收ack channel.basicAck(envelope,getDeliveryTag(), false);
TTL消息
TTL
:是Time To Live。就是生存时间。
- RabbitMQ支持消息的过时时间,在消息发送时能够进行指定。
- RabbitMQ支持队列的过时时间,从消息入队开始计算,只要超过了队列的超时时间配置,那么消息会自动清除。
死信队列
死信队列
:DLX,Dead-Letter-Exchange。
利用DLX,当消息在一个队列中变成了死信(dead message:这条消息没有消费者去消费)以后,他能被从新publish到另一个Exchange,这个Exchange就是DLX。进入死信队列(进入死信的三种方式)
- 1.消息被拒绝(basic.reject or basic.nack)而且requeue=false
void basicNack(long deliveryTag, boolean multiple,
boolean
requeue
) throws IOException;
- 2.消息TTL过时
- 3.队列达到最大长度
备注说明
- DLX也是一个正常的Exchange,和通常的Exchange没区别,他能在任何的队列上被指定,实际上就是设置某个队列的属性。
- 当这个队列中有死信时,RabbitMQ会自动将这个消息从新发送到已经设置了的Exchange上去,进而被路由到另一个队列上去。
- 能够监听这个队列中消息作相应的处理。
死信队列设置
- 首先须要设置死信队列的exchange和queue,而后进行绑定:Exchange:dlx.exchange、Queue:dlx.queue、RoutingKey:#
- 而后进行正常声明交换机、队列、绑定,只不过咱们须要在队列加上一个参数便可:arguments.put("
x-dead-letter-exchange
","dlx.exchange");