RabbitMQ吐血总结(1)---基础啊基础

吐血总结吧~由于太经常使用,由于是平常,由于即便是平常也并无多少人懂的不少,由于想掌握一个消息中间件,就是如此吧,很少说,我一篇,我主要讲讲基本的使用与方法调用的入参及其意义,其实,RabbitMQ就光是基础的api调用,也并无多少人熟悉~java

1、RabbitMQ概览

想来想去,不知道如何开篇,我大概就把我学习到的一些比较重要的概念罗列出来吧,就算是一个记录吧。下面是一些我想写入的点面试

一、消息中间件的做用

  • 解耦:有别于本地线程或者线程池的解耦,这种解耦的可靠性彻底是用消息中间件体系来保证,可靠不少!本地线程池会撑爆、服务重启等问题。
  • 削峰:恩,很关键性的高并发场景的实用点!主要应对流量激增状况,不会由于突增的流量而在现用的机器承受力以内,保证服务的可用性。
  • 缓冲:主要控制数据流通过系统的速度
  • 异步通讯:这个不用解释了。原本消息中间件就是用来通讯的。

二、RabbitMQ的特色

  • 就是使用erlang语言,实现了AMQP协议
  • 很关键的特性:可靠性。持久化、确认机制等
  • 灵活性高,经过交互机到队列经过绑定键进行绑定,能够有不少玩法:广播、直连、模糊匹配、头信息匹配等
  • 可扩展性好,能够经过业务的状况动态扩展集群中的结点
  • 高可用性,经过配置镜像结点,是的部分结点出问题的状况下,队列仍然可用
  • 多协议支持,除了支持AMQP,还支持STOMP、MQTT等多种消息中间件协议
  • 多语言支持,恩这个不用多说,反正好过ActiveMQ
  • 默认给了个server端的管理界面,很人性化
  • 不少插件,支持本身编写

基本的安装我不在我文章里面介绍,这种东西一搜一大把,并且每一个平台安装模式也不同,例如个人mac,直接一条命令搞定。我看Linux下面还要先安装erlang的虚拟机。这些并不是RabbitMQ的核心技术,想作运维,出门左转。设计模式

2、从协议窥探RabbitMQ

RabbitMQ主要实现了AMQP这个很牛逼的协议。其实这种就相似于Redis,任何的Java这个层面的接口API调用,都是对应着相关协议的命令输出的。这东西细想也简单,无非就是咱们给RabbitMQ的server端下命令,让他作什么事儿。例如我让他建立信道,我就发一个Channel.open的字符串过去,对方server端起了个socket监听器,发现有数据,立刻处理,而后匹配到具体的命令处理器,相似于策略设计模式,使用一个Handler处理逻辑。本节我经过生产者与消费者的最简单代码,来窥探一下,到底RabbitMQ的流转过程是怎么样的,让你们有个初步的印象:哦原来,RabbitMQ大概是这样的。api

一、生产者的流转过程

先上基础的最简单的生产者代码:安全

// 建立链接
Connection connection = connectionFactory.newConnection();
// 建立信道
Channel channel = connection.createChannel();
String msg = "21123123";
channel.basicPublish("ExchargeName", "RoutingKey",
        MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
// 关闭资源
channel.close();
connection.close();

接下俩咱们来看看流转过程的用例图:并发

其中,Product表明生产者节点,Consumer表明消费者节点,Broker表明RabbitMQ Server端的一个机器节点。以后再也不说明运维

二、消费者的流转过程

一样,咱们先来一个最简单的消费者的代码模板:异步

// 建立链接
Connection connection = connectionFactory.newConnection();
// 建立信道
Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel){}
channel.basicQos(64);
channel.basicConsume("queueName",consumer);

// 关闭资源
channel.close();
connection.close();

下面是消费者的流转图:socket

请细看这两个时序图,与图中的标记,其中对应了具体哪一个JavaAPI调用,对应了哪一个AMQP的消息协议命令。不能,就是稍微多了点步骤。接下来,我会在遇到相关AMQP的命令时候,再继续介绍,就不集中介绍所有的AMQP协议命令了。ide

我本身感受,集中所有介绍没用,由于是我的根本无法立刻记住,即便我给你十天记住了,而后1天不用就忘了。说白了,仍是要紧扣实战的编码,单独研究协议,枯燥且没意义,即便面试,人家也不会专门问你命令。

3、RabbitMQ几大基础概念

发现,AMQP的协议中,我大体看了两个实现(RabbitMQ和Qpid),对这几个基础的构件,都是提早谈起的,可见,AMQP就是根据这几个构件来组织的,我就不经过大量文字解说这几个构件了,尽可能经过图片来展现,好懂一些。

一、总览

主要的构件包括:

  • Exchange:交换器
  • Queue:队列
  • RoutingKey和BindingKey:路由键和绑定键

二、队列

为了说明队列,先上一个虚拟的图(由于真实状况,消息不会直接到队列的):

  • RabbitMQ中消息只能存储在队列中
  • 多个消费者消费时候,一个消息只会发送给一个消费者,使用轮训方式

三、交换机

其实,咱们使用生产者,pulish一条消息,首先发送到的,就是交换机这个实体里面,并不是队列,而后经过各类绑定关系再路由到具体的队列中,下一小节,咱们就讲这个路由的问题。首先来看看交换机的示例图:

交换机类型:

  • fanout:每一个到了交换机里面的消息,会发送给全部绑定的队列里面
  • direct:必须BindingKey和RoutingKey彻底匹配才能将信息发送给对应的队列
  • topic:RoutingKey和BindingKey能够进行模糊匹配,具体的匹配规则是
    • RoutintKey和BindingKey是用”.“符号进行分割,例如:com.rabbitmq.client
    • BindingKey中能够存在两种特殊的字符串:*、#,分别对应匹配一个单词和多个单词(能够是0个)
    • 举例:RoutingKey是com.rabbitmq.client,BindingKey是*.rabbitmq.*
  • headers:根据消息的headers属性里面的键值对进行匹配,通常不建议使用

具体的RoutingKey和BindingKey如何区分下面咱们来讲,即便咱们没有指定具体的交换机发送给一个queue,RabbitMQ服务端也会随机生成一个交换机,后面会介绍到

四、RoutingKey与BindingKey

下面是RoutingKey和BindingKey的相关图例:

直接上具体的区分方法吧:

  • 在使用绑定动做的时候,使用的就是BindingKey,涉及的客户端方法有:channel.exchangeBind、channel.queueBind,对应的AMQP为:Exchange.Bind、Queue.Bind
  • 在发送消息的时候,其中须要的路由键是RoutingKey,涉及客户端的方法有:channel.basicPublish,对应的AMQP为:Basic.Publish

4、RabbitMQ的API基础实战

这一部分让咱们开始写代码,看看如何用RabbitMQ来发送与接收消息,而且接口参数都是什么意思,毕竟发送与接收代码中的参数仍是挺多的,代码不算整洁,咱们来看看为啥不整洁,为啥要用这些个参数。

一、建立连接与信道

两种方式来建立:

  • 直接多参数设置方式:
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("root");
connectionFactory.setPassword("root123");
connectionFactory.setVirtualHost("/root");
// 建立链接
Connection connection = connectionFactory.newConnection();
// 建立信道
Channel channel = connection.createChannel();
  • 使用uri的方式:
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUri("amqp://root:root123@127.0.0.1:5672/root");
// 建立链接
Connection connection = connectionFactory.newConnection();
// 建立信道
Channel channel = connection.createChannel();

使用注意事项:

  1. 一个connection能够对应多个channel
  2. 推荐作法是每一个线程建立本身的channel,多个线程共享一个connection
  3. 若是使用一个已经关闭的channel,会抛出ShutdownSignalException
  4. 下面咱们仍是来看看整个RabbitMQ的信道链接的模型:

二、声明交换机

对应的方式是:exchangeDeclare。先来看看Java中API客户端的源码,发现注释已经介绍的很清楚了:

/**
* 激活一个不自动删除, 不持久化,且没有拓展参数的交换机
* @param exchange 交换机名称
* @param type 交换机类型
* @return 确认交换机声明是否成功
* @throws java.io.IOException io异常
*/
Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException;

/**
* 激活一个不自动删除, 不持久化,且没有拓展参数的交换机
* @param exchange 交换机名称
* @param type 交换机类型(封装了个enum给你用)
* @return 确认交换机声明是否成功
* @throws java.io.IOException io异常
*/
Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type) throws IOException;

/**
* 激活一个不自动删除, 且没有拓展参数的交换机
* @param exchange 交换机名称
* @param type 交换机类型
* @param durable true表示要持久化这个交换机 (在服务端重启的时候还会存在这个交换机)
*/
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;

/**
* 激活一个不自动删除, 且没有拓展参数的交换机
* @param exchange 交换机名称
* @param type 交换机类型(封装了个enum给你用)
* @param durable true表示要持久化这个交换机 (在服务端重启的时候还会存在这个交换机)
*/
Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable) throws IOException;

/**
* 声明一个交换机
* @param exchange 交换机名称
* @param type 交换机类型
* @param durable ttrue表示要持久化这个交换机 (在服务端重启的时候还会存在这个交换机)
* @param autoDelete true表示若是没有被使用,这个交换机会自动删除
* @param arguments 其余的一个些属性
*/
Exchange.DeclareOk exchangeDeclare(String exchange, 
                        String type, 
                        boolean durable, 
                        boolean autoDelete,
                        Map<String, Object> arguments) throws IOException;

/**
* 声明一个交换机
* @param exchange 交换机名称
* @param type 交换机类型(封装了个enum给你用)
* @param durable ttrue表示要持久化这个交换机 (在服务端重启的时候还会存在这个交换机)
* @param autoDelete true表示若是没有被使用,这个交换机会自动删除
* @param arguments 其余的一个些属性
*/
Exchange.DeclareOk exchangeDeclare(String exchange, 
                            BuiltinExchangeType type, 
                            boolean durable, 
                            boolean autoDelete,
                            Map<String, Object> arguments) throws IOException;

/**
* @param internal true表示这个交换机是一个内部的, 不能直接使用客户端的api进行pubulish操做
*/
Exchange.DeclareOk exchangeDeclare(String exchange,
                          String type,
                          boolean durable,
                          boolean autoDelete,
                          boolean internal,
                          Map<String, Object> arguments) throws IOException;


Exchange.DeclareOk exchangeDeclare(String exchange,
                            BuiltinExchangeType type,
                            boolean durable,
                            boolean autoDelete,
                            boolean internal,
                            Map<String, Object> arguments) throws IOException;

/**
* 和前面的方式差很少,可是没有返回值,且无论服务端是否有response
*/
void exchangeDeclareNoWait(String exchange,
           String type,
           boolean durable,
           boolean autoDelete,
           boolean internal,
           Map<String, Object> arguments) throws IOException;


void exchangeDeclareNoWait(String exchange,
            BuiltinExchangeType type,
            boolean durable,
            boolean autoDelete,
            boolean internal,
            Map<String, Object> arguments) throws IOException;

/**
* 被动的声明一个交换机; 判断这个名字的交换机是否存在
* @throws IOException 若是交换机不存在,会报404的异常
*/
Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;

/**
* 删除一个交换机
* @param exchange 交换机名称
* @param ifUnused true表示肯定这个交换机是否被使用,只有彻底不被使用的状况下,才能删除
* @return a deletion-confirm method to indicate the exchange was successfully deleted
* @throws java.io.IOException if an error is encountered
*/
Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException;

/**
* 一样也是删除一个交换机,可是没有返回值,不等待服务端的返回确认
*/
void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws IOException;

其实直接看源码,一切已经很明了了,下面我对极个别一些注意点说说:

  • 自动删除的前提是至少有一个队列或者交换机与这个交换机绑定,以后全部与这个交换机绑定的队列与交换机都与此解绑。就是说刚建立的交换机也是没有与之绑定的实体,这个时候是不能自动删除的
  • exchangeDeclarePassive这个方法在实际应用中很是有用,主要用来检测响应的交换机是否存在,若是存在则返回,不存在则抛出异常:404 channel exception,同时channel也会关闭

三、声明队列

使用的方法是queueDeclare,对于队列的声明,重载方法少不少,一样的咱们直接来看看源码与源码中的注释:

/**
 * 声明一个服务端命名的, 本链接独占的, 自动删除的, 不持久化的队列
 * 建立以后队列的名称,能够从返回对象Queue.DeclareOk中的"queue"字段获取.
 */
Queue.DeclareOk queueDeclare() throws IOException;

/**
 * 声明一个队列
 * @param queue 队列的名称
 * @param durable true表示这个队列要作持久化(服务端重启以后,这个队列同样存在)
 * @param exclusive true表示这个队列是一个独占的(被限制只能当前connection使用)
 * @param autoDelete true表示这个队列是自动删除的 (服务端会在这个队列不被使用的时候将其删除)
 * @param arguments 队列建立时候的其余参数
 */
Queue.DeclareOk queueDeclare(String queue, 
                    boolean durable,
                    boolean exclusive,
                    boolean autoDelete,
                    Map<String, Object> arguments) throws IOException;

/**
 * 相似于queueDeclare方法,可是没有返回值,而且无论服务端是否有回复,声明是否成功
 */
void queueDeclareNoWait(String queue, 
                boolean durable,
                boolean exclusive,
                boolean autoDelete,
                Map<String, Object> arguments) throws IOException;

/**
 * 声明一个队列,若是队列不存在,将会抛出一个异常,而且关闭Channel
 */
Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;

/**
 * 删除一个队列, 无论是否这个队列被使用或者里面有消息
 * @param queue 队列名称
 */
Queue.DeleteOk queueDelete(String queue) throws IOException;

/**
 * 删除一个队列
 * @param queue 队列名称
 * @param ifUnused true表示这个队列只有不被使用时候才删除
 * @param ifEmpty true表示只有这个队列为空的时候才删除
 */
Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;

/**
 * 和queueDelete(String, boolean, boolean)相似,可是没有返回值,且不等服务端返回
 */
void queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;

一样的,介绍几个重点问题:

  • 解释下exclusive(独占队列)的细节:
    • true状况下是基于链接(connection)可见的,就是说一个链接中的不一样的信道(channel)可见,不一样链接不可见
    • 当前一个链接中建立一个独占队列,其余链接不能在建立同名的队列了,这一点与普通队列不一样
    • 即便这个队列是持久化的队列,可是一旦链接关闭或者客户端退出,该独占队列也会被自动删除
    • 这种队列比较适合一个客户端同时发送和读取消息的应用场景
  • 自动删除细节:至少有一个消费者链接了这个队列,以后全部与这个队列链接的消费者都断开是,会自动删除
  • 若是消费者在同一个信道上面订阅了另外一个队列,就没法再声明队列了,必须先取消订阅,而后将信道设置为“传输”模式,以后才能声明队列

四、队列与交换机的绑定

这一部分,主要是将咱们声明的交换机与队列创建关联的一个动做,涉及到的方法有:

  • queueBind
  • exchangeBind

下面咱们来看看源码与注释:

/**
 * 将一个队列绑定到交换机上面, 不带任何拓展参数
 * @param queue 队列名称
 * @param exchange 交换机名称
 * @param routingKey 这里其实就是绑定的键
 */
Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;

/**
 * 将一个队列绑定到交换机上面
 * @param queue 队列名称
 * @param routingKey 绑定键
 * @param arguments 其余的拓展参数
 */
Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;

/**
 * 无论是否绑定成功就返回的绑定方法
 */
void queueBindNoWait(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;


/**
 * 将队列与一个交换机经过绑定键绑定在一块儿的关系解除
 * @param queue 队列名称
 * @param exchange 交换机名称
 * @param routingKey 绑定键
 */
Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey) throws IOException;

Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;


/**
 * 将一个交换机绑定到另一个交换机上面
 * @param destination 这个是消息要到达的交换机名称
 * @param source 这个是消息从哪里来的交换机名称
 * @param routingKey 绑定键
 */
Exchange.BindOk exchangeBind(String destination, String source, String routingKey) throws IOException;


Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;


void exchangeBindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;

主要使用的参数,经过注释已经很明了了,其余的注意点暂时没有。

五、发送消息

发送消息这里对应的方法,主要就是basicPublish这个方法,先来看看他的API接口:

/**
 * 发布一个消息到服务端。
 *
 * 若是发布的消息到交换机不存在,会抛出一个channel-level protocol的异常,而且会关闭这个channel
 *
 * @param exchange 消息要发布到的交换机
 * @param routingKey 这个是路由键(会匹配具体的绑定键)
 * @param 一个消息其余的属性 - 例如headers
 * @param body 消息体
 */
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

/**
 * 发布一个消息到服务端。
 *
 * @param exchange 消息要发布到的交换机
 * @param routingKey 这个是路由键
 * @param mandatory 这个后面文章会介绍
 * @param 一个消息其余的属性 - 例如headers
 * @param 消息体
 */
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
        throws IOException;

/**
 * 发布一个消息到服务端。
 *
 * @param mandatory 后面文章介绍
 * @param immediate 后面文章介绍
 */
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
        throws IOException;

这里能够看到的是,mandatory、immediate这两个入参很关键,后面会专门讲,由于这两个入参关系到了消息的可靠性和稳定性。下面是一些能够添加属性的代码示例:

channel.basicPublish("ExchargeName", "RoutingKey",
                MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());

channel.basicPublish("ExchargeName", "RoutingKey",
                new AMQP.BasicProperties.Builder()
                .contentType("test/plain")
                .deliveryMode(2)
                .priority(1)
                .userId("hiden")
                .build(), msg.getBytes());

下面介绍下经过Builder模式构建发消息时候的属性,有助于了解消息的可靠性:

  • deliveryMode设置成2,表示消息会被持久化,存入磁盘
  • priority设置成了0表示优先级为0
  • content-type为text/plain表示消息体为文本类型

六、消费消息

消费消息来讲,相比较来讲要比单纯的接口调用要复杂点,由于涉及到方法的注册等,还存在两种消费模式:

  • 推模式
  • 拉模式

推模式

在推模式中,能够经过持续订阅方式来消费消息,主要经过两个类:

  • com.rabbitmq.client.Consumer(接口)
  • com.rabbitmq.client.DefaultConsumer(上面那个接口的默认实现类)

设计这两个消费对象的类的方法是:

/**
 * 启动一个非本地, 非排他, 要手动ack的, 服务端自定义的消费者标签名称的消费者
 */
String basicConsume(String queue, Consumer callback) throws IOException;

/**
 * 启动一个非本地, 非排他, 服务端自定义的消费者标签名称的消费者
 */
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

/**
 * 启动一个非本地, 非排他, 服务端自定义的消费者标签名称的消费者(固然带了个参数map)
 */
String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, Consumer callback) throws IOException;

/**
 * 启动一个非本地, 非排他的消费者(固然带了个参数map)
 * @param queue the name of the queue
 * @param autoAck true if the server should consider messages
 * acknowledged once delivered; false if the server should expect
 * explicit acknowledgements
 * @param consumerTag a client-generated consumer tag to establish context
 * @param callback an interface to the consumer object
 * @return the consumerTag associated with the new consumer
 * @throws java.io.IOException if an error is encountered
 * @see com.rabbitmq.client.AMQP.Basic.Consume
 * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
 * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
 */
String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException;

/**
 * 启动一个消费者
 * @param queue 队列名称
 * @param autoAck 消费者是否自动确认
 * @param consumerTag 消费者标签
 * @param noLocal true表示不能把同一个connection中生产者发送的消息传送给这个connection中的消费者
 * @param exclusive 是不是拍他的消费者
 * @param arguments 设置消费者的其余参数
 * @param callback 设置消费者的回调函数,用来处理RabbitMQ推送过来的消息
 */
String basicConsume(String queue,
            boolean autoAck,
            String consumerTag,
            boolean noLocal,
            boolean exclusive,
            Map<String, Object> arguments,
            Consumer callback) throws IOException;

下面是一段消费者的真实代码:

channel.basicQos(64);
channel.basicConsume("queueName", false, "tag", new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag,
                               Envelope envelope,
                               AMQP.BasicProperties properties,
                               byte[] body) throws IOException {
        String routingKey = envelope.getRoutingKey();
        String contentType = properties.getContentType();
        long deliveryTag = envelope.getDeliveryTag();
        String msg = new String(body, "UTF-8");
        // 这里进行消息处理
        channel.basicAck(deliveryTag,false);
    }
});

具体的DefaultConsumer类里面还有其余的几个方法,对应不一样的消费动做回调的函数:

  • handleConsumeOk(String consumerTag):会在其余方法以前调用,返回消费者标签名称
  • handleCancelOk(String consumerTag):若是手动调用Channel#basicCancel会被触发回调
  • handleCancel(String consumerTag):除了手动调用Channel#basicCancel这个方法以外的消费取消动做(例如消费队列被删除)会触发这个方法
  • handleShutdownSignal(String consumerTag, ShutdownSignalException sig):当connection或者channel关闭了,会被触发
  • handleRecoverOk(String consumerTag):后面介绍

若是手动调用了Channel#basicCancel,触发方法的序列是:

  1. handleConsumeOk
  2. handleDelivery
  3. handleCancelOk

另外还要注意的是:这些个被触发的对象都被分配到了不一样的线程池上,因此要注意线程安全性的问题

拉模式

拉模式,有别于推模式的被动获取消息,咱们能够直接在业务逻辑中,主动获取RabbitMQ服务端中的消息,下面是总体的时序图

对应的代码是:

GetResponse response = channel.basicGet("QueueName", false);
System.out.println(response.getBody());
channel.basicAck(response.getEnvelope().getDeliveryTag(),false);

比较下,推模式和拉模式:

  • 只想从队列得到单条消息而不是持续订阅,建议使用拉模式进行消费
  • 若是要实现高吞吐量,消费者应该使用推模式
  • 具体的两种模式的细节对比请看一个外部连接:推模式和拉模式比较

七、消息的确认与拒绝

这里我自我感受很关键,由于涉及到消息消费的可靠性。首先让咱们来看看ack机制:

  • autoAck为true时候RabbitMQ会自动吧发出去的消息从内存或者磁盘中删除,无论消费者是否真正消费了这些消息
  • autoAck为false的时候,RabbitMQ会等待消费者显示的回复确认信号的时候才从内存和磁盘中删除
  • 固然,这种删除是异步,先打上标记,以后再删除

总体上,RabbitMQ的服务端存在两种队列:

  • 等待投递给消费者的队列
  • 已经投递给消费者,但尚未收到消费者确认信号的消息

另外,RabbitMQ不会主动删除或者过时未确认的消息,判断是否从新投递给消费者的惟一依据是消费该消息的消费者链接是否已经断开。若是长时间没回复ack,直到链接断开,此消息会从新进入队列,等待投递给下一个消费者,固然也多是当前消费者。

另一个点,是咱们能够直接拒绝一个消息,使用的方法是:

void basicReject(long deliveryTag, boolean requeue) throws IOException;

第一个参数好理解,重点说下第二个,requeue:

  • true:RabbitMQ会将这个拒绝的消息从新存入队列,以便给下一个订阅者
  • false:RabbitMQ当即会把消息从队列中移除

还有个方法,能够请求RabbitMQ从新发送还未被确认的消息,有下面两个方法

Basic.RecoverOk basicRecover() throws IOException;

Basic.RecoverOk basicRecover(boolean requeue) throws IOException;

再次涉及到requeue参数:

  • true:未被确认的消息从新加入到队列中,可能会被不一样消费者消费
  • false:一样是未被确认的消息从新加入队列,可是同一条消息会被分配给以前相同的消费者
  • 默认不带参数的方法,requeue值为true

八、关闭链接

关闭涉及到的有两个对象的关闭:

  • connection
  • channel

关闭的方法有:

  • void close()
  • void close(int closeCode, String closeMessage)

整个AMQP协议中connection与channel声明周期以下:

  • open:开启状态,对象可使用
  • closing:正在关闭,显示调用关闭方法,产生一个关闭请求,等待关闭动做完成
  • closed:已经关闭

固然咱们能够注册关闭以后触发的监听器:

connection.addShutdownListener(new ShutdownListener() {
    @Override
    public void shutdownCompleted(ShutdownSignalException cause) {
        // 业务逻辑
        Method reason = cause.getReason();
    }
});

5、总结

接下俩大概会有两篇,一篇是功能性的拔高,一篇是RabbitMQ的原理相关,若是可能我会说说集群构建与镜像

相关文章
相关标签/搜索