RabbitMQ基础教程之使用进阶篇

RabbitMQ基础教程之使用进阶篇

相关博文,推荐查看:html

  1. RabbitMq基础教程之安装与测试
  2. RabbitMq基础教程之基本概念
  3. RabbitMQ基础教程之基本使用篇

I. 背景

前一篇基本使用篇的博文中,介绍了rabbitmq的三种使用姿式,能够知道如何向RabbitMQ发送消息以及如何消费,但遗留下几个疑问,本篇则主要但愿弄清楚这几点java

  • Exchange声明的问题(是否必须声明,若是不声明会怎样)
  • Exchange声明的几个参数(durable, autoDelete)有啥区别
  • 当没有队列和Exchange绑定时,直接往队列中塞数据,好像不会有数据增长(即先塞数据,而后建立queue,创建绑定,从控制台上看这个queue里面也不会有数据)
  • 消息消费的两种姿式(一个主动去拿数据,一个是rabbit推数据)对比

<!--more-->git

II. 基本进阶篇

1. Exchange默认场景

将前面的消息发送代码捞出来,干掉Exchange的声明,以下github

public class DefaultProducer {
    public static void publishMsg(String queue, String message) throws IOException, TimeoutException {
        ConnectionFactory factory = RabbitUtil.getConnectionFactory();

        //建立链接
        Connection connection = factory.newConnection();

        //建立消息通道
        Channel channel = connection.createChannel();
        channel.queueDeclare(queue, true, false, true, null);

        // 发布消息
        channel.basicPublish("", queue, null, message.getBytes());

        channel.close();
        connection.close();
    }

    public static void main(String[] args) throws IOException, TimeoutException {
        for (int i = 0; i < 20; i++) {
            publishMsg("hello", "msg" + i);
        }
    }
}

在发布消息时,传入的Exchange名为“”,再到控制台查看,发现数据被投递到了(AMQP default)这个交换器,对应的截图以下api

image

看一下上面的绑定描述内容,重点以下ide

  • 默认交换器选择Direct策略
  • 将rountingKey绑定到同名的queue上
  • 不支持显示的绑定和解绑

上面的代码为了演示数据的流向,在发布消息的同时也定义了一个同名的Queue,所以能够在控制台上看到同名的 "hello" queue,且内部有20条数据学习

当咱们去掉queue的声明时,会发现另外一个问题,投入的数据好像并无存下来(由于没有queue来接收这些数据,而以后再声明queue时,以前的数据也不会分配过来)测试

2. 绑定以后才有数据

首先是将控制台中的hello这个queue删掉,而后再次执行下面的代码(相对于前面的就是注释了queue的声明)ui

public class DefaultProducer {
    public static void publishMsg(String queue, String message) throws IOException, TimeoutException {
        ConnectionFactory factory = RabbitUtil.getConnectionFactory();

        //建立链接
        Connection connection = factory.newConnection();

        //建立消息通道
        Channel channel = connection.createChannel();
        //        channel.queueDeclare(queue, true, false, true, null);

        // 发布消息
        channel.basicPublish("", queue, null, message.getBytes());

        channel.close();
        connection.close();
    }

    public static void main(String[] args) throws IOException, TimeoutException {
        for (int i = 0; i < 20; i++) {
            publishMsg("hello", "msg" + i);
        }
    }
}

而后从控制台上看,能够看到有数据写入Exchange,可是没有queue来接收这些数据code

IMAGE

而后开启消费进程,而后再次执行上面的塞入数据,新后面从新塞入的数据能够被消费;可是以前塞入的数据则没有,消费消息的代码以下:

public class MyDefaultConsumer {
    public void consumerMsg(String queue) throws IOException, TimeoutException {
        ConnectionFactory factory = RabbitUtil.getConnectionFactory();

        //建立链接
        Connection connection = factory.newConnection();

        //建立消息通道
        Channel channel = connection.createChannel();
        channel.queueDeclare(queue, true, false, true, null);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                try {
                    System.out.println(" [ " + queue + " ] Received '" + message);
                } finally {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        // 取消自动ack
        channel.basicConsume(queue, false, consumer);
    }

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        MyDefaultConsumer consumer = new MyDefaultConsumer();
        consumer.consumerMsg("hello");

        Thread.sleep(1000 * 60 * 10);
    }
}

小结:

  • 经过上面的演示得知一点
  • 当没有Queue绑定到Exchange时,往Exchange中写入的消息也不会从新分发到以后绑定的queue上

3. Durable, autoDeleted参数

在定义Queue时,能够指定这两个参数,这两个参数的区别是什么呢?

a. durable

持久化,保证RabbitMQ在退出或者crash等异常状况下数据没有丢失,须要将queue,exchange和Message都持久化。

如果将queue的持久化标识durable设置为true,则表明是一个持久的队列,那么在服务重启以后,也会存在,由于服务会把持久化的queue存放在硬盘上,当服务重启的时候,会从新什么以前被持久化的queue。队列是能够被持久化,可是里面的消息是否为持久化那还要看消息的持久化设置。也就是说,重启以前那个queue里面尚未发出去的消息的话,重启以后那队列里面是否是还存在原来的消息,这个就要取决于发生着在发送消息时对消息的设置

b. autoDeleted

自动删除,若是该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列

这个比较容易演示了,当一个Queue被设置为自动删除时,当消费者断掉以后,queue会被删除,这个主要针对的是一些不是特别重要的数据,不但愿出现消息积累的状况

// 倒数第二个参数,true表示开启自动删除
// 正数第二个参数,true表示持久化
channel.queueDeclare(queue, true, false, true, null);

c. 小结

  • 当一个Queue已经声明好了以后,不能更新durable或者autoDelted值;当须要修改时,须要先删除再从新声明
  • 消费的Queue声明应该和投递的Queue声明的 durable,autoDelted属性一致,不然会报错
  • 对于重要的数据,通常设置 durable=true, autoDeleted=false
  • 对于设置 autoDeleted=true 的队列,当没有消费者以后,队列会自动被删除

4. ACK

执行一个任务可能须要花费几秒钟,你可能会担忧若是一个消费者在执行任务过程当中挂掉了。一旦RabbitMQ将消息分发给了消费者,就会从内存中删除。在这种状况下,若是正在执行任务的消费者宕机,会丢失正在处理的消息和分发给这个消费者但还没有处理的消息。 可是,咱们不想丢失任何任务,若是有一个消费者挂掉了,那么咱们应该将分发给它的任务交付给另外一个消费者去处理。

为了确保消息不会丢失,RabbitMQ支持消息应答。消费者发送一个消息应答,告诉RabbitMQ这个消息已经接收而且处理完毕了。RabbitMQ就能够删除它了。

所以手动ACK的常见手段

// 接收消息以后,主动ack/nak
Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
            byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        try {
            System.out.println(" [ " + queue + " ] Received '" + message);
            channel.basicAck(envelope.getDeliveryTag(), false);
        } catch (Exception e) {
            channel.basicNack(envelope.getDeliveryTag(), false, true);
        }
    }
};

// 取消自动ack
channel.basicConsume(queue, false, consumer);

手动ack时,有个multiple,其含义表示:

能够理解为每一个Channel维护一个unconfirm的消息序号集合,每publish一条数据,集合中元素加1,每回调一次handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或多条(multiple=true)记录

III. 其余

1. 参考

Java Client API Guide

2. 一灰灰Bloghttps://liuyueyi.github.io/hexblog

一灰灰的我的博客,记录全部学习和工做中的博文,欢迎你们前去逛逛

3. 声明

尽信书则不如,已上内容,纯属一家之言,因我的能力有限,不免有疏漏和错误之处,如发现bug或者有更好的建议,欢迎批评指正,不吝感激

4. 扫描关注

QrCode

相关文章
相关标签/搜索