rabbitmq

1、rabbitmq关键字

Binding:Exchange和Exchange、Queue之间的链接关系
Queue:实际存储消息。
Durability:是否持久化,Durable:是,Transient:否。
Auto delete:如选yes,表明当最后一个监听被移除以后,该Queue会自动删除。
Message:服务器和应用程序之间传送的数据。本质上就是一段数据,由Properties和Payload(Body)组成。经常使用属性:delivery mode、headers(自定义属性)
Virtual host:虚拟主机,用于进行逻辑隔离,最上层的消息路由。一个Virtual Host里面能够有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange或者Queue。

2、rabbitmq高级特性

  1. 消息如何保证100%的投递成功?segmentfault

    生产端可靠性投递服务器

    • 保障消息的成功发出
    • 保障MQ节点的成功接收
    • 发送端收到MQ节点(Broker)确认应答
    • 完善的消息进行补偿机制

    可靠性投递解决方案ide

    • 消息落库,对消息状态进行打标
    • 消息的延迟投递,作二次确认,回调检查
    • 消息集群镜像队列:rabbitmq集群模式很是经典的就是Mirror镜像模式,保证100%数据不丢失,在实际工做中也是用的最多的。而且实现集群很是简单,通常互联网公司都会构建这种镜像集群模式。https://segmentfault.com/a/11...
  2. 幂等性概念详解
  3. 在海量订单产生的业务高峰期,如何避免消息的重复消费问题?
  4. Confirm确认消息、Return返回消息fetch

    理解Confirm消息确认机制:日志

    • 消息的确认,是指生产者投递消息后,若是Broker收到消息,则会给咱们生产者一个应答。
    • 生产者进行接受应答,用来肯定这条消息是否正常的发送到了Broker,这种方式也是消息的可靠性投递的核心保障。

    如何实现Confirm确认消息?code

    • 第一步:在channel上开启确认模式:channel.confirmSelect()
    • 第二步:在channel上添加监听:addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进行从新发送、或者记录日志等后续处理。

    Return消息机制rabbitmq

    • Return Listener用于处理一些不可路由的消息
    • 咱们的消息生产者,经过制定一个Exchange和Routing Key,把消息送达到某一个队列中去,而后咱们的消费者监听队列,进行消费处理。
    • 可是在某写状况下,若是咱们在发送消息的时候,当前的exchange不存在或者指定的路由key路由不到,这个时候若是咱们须要监听这种不可达的消息,就要使用Return Listener。
    • Mandatory:若是为true,则监听器会接收到路由不可达的消息,而后进行后续处理,若是为false,那么broker端自动删除该消息。

  • 自定义消费者
public class MyConsumer extends DefaultConsumer implements Consumer {

    public MyConsumer(Channel channel) {
        super(channel);
    }


    @Override
    public void handleDelivery(String arg0, Envelope arg1, BasicProperties arg2, byte[] arg3) throws IOException {
        
    }

}
  • 消息的ACK与重回队列队列

    消费端手工ACK(应答成功)和NACK(应答失败)ip

    1. 消费端进行消费的时候,若是因为业务异常咱们能够进行日志的记录,而后进行补偿。
    2. 因为服务器宕机等严重问题,咱们就要手工进行ACK保障消费端消费成功。好比:消费一半的时候宕机了,broker端没有收到应答,重发消息。

    消费端重回队列ci

    • 消费端重回队列是为了对没有处理成功的消息,把消息从新回递给broker。
    • 通常咱们在实际应用中,都会关闭重回队列,也就是设置为false。

    channel.basciNack(envelope.getDeliveryTag(),false,true) // 为true的话,在消费失败的状况下会重回队列放入队列末端。

  • 消息的限流

    假设有一个场景,RabbitMq服务器上有上万条未处理的消息,咱们随便打开一个消费者客户端,会出现下面的状况: 巨量的消息瞬间所有推送过来,可是咱们单个客户端没法同时处理这么多数据。RabbitMq提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,若是必定数目的消息(经过基于consume或者channel设置Qos的值)未被确认前,不进行消费新的消息。
    void BasicQos(unit prefetchSize, ushort prefetchCount, bool global);
    prefetchSize:0
    prefetchCount: 会告诉RabbitMq不要同时给一个消费者推送多于N个消息,即一旦有N个消息尚未被ack,则该consumer将block掉,知道有消息ack。
    global: true/false,是否将上面设置应用于channel,简答点说,就是上面限制是channel级别的仍是consumer级别。
    // 限流方式,第一件事就是autoAck设置为false,关闭自动签收,必须手动签收
    channel.basicQos(0,3,false); // 3表示若是消息积压了1000条,先给我推3条,这三条消费结束后,我会给你一个ack表示这三条我已经处理完了,而后再给我推送3条...
    channel.basicConsume(queueName,false,new MyConsumer(channel))
    //在MyConsumer中对消息进行签收ack
    channel.basicAck(envelope,getDeliveryTag(), false);
  • TTL消息

    TTL:是Time To Live。就是生存时间。

    • RabbitMQ支持消息的过时时间,在消息发送时能够进行指定。
    • RabbitMQ支持队列的过时时间,从消息入队开始计算,只要超过了队列的超时时间配置,那么消息会自动清除。
  • 死信队列

    死信队列:DLX,Dead-Letter-Exchange。
    利用DLX,当消息在一个队列中变成了死信(dead message:这条消息没有消费者去消费)以后,他能被从新publish到另一个Exchange,这个Exchange就是DLX。
    进入死信队列(进入死信的三种方式)

    • 1.消息被拒绝(basic.reject or basic.nack)而且requeue=false

    void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

    • 2.消息TTL过时
    • 3.队列达到最大长度

    备注说明

    • DLX也是一个正常的Exchange,和通常的Exchange没区别,他能在任何的队列上被指定,实际上就是设置某个队列的属性。
    • 当这个队列中有死信时,RabbitMQ会自动将这个消息从新发送到已经设置了的Exchange上去,进而被路由到另一个队列上去。
    • 能够监听这个队列中消息作相应的处理。

    死信队列设置

    • 首先须要设置死信队列的exchange和queue,而后进行绑定:Exchange:dlx.exchange、Queue:dlx.queue、RoutingKey:#
    • 而后进行正常声明交换机、队列、绑定,只不过咱们须要在队列加上一个参数便可:arguments.put("x-dead-letter-exchange","dlx.exchange");
相关文章
相关标签/搜索