RabbitMQ从入门到精通(三)

1. 自定义消费者使用

  • 咱们以前呢都是在代码中编写while循环,进行 consumer.nextDelivery 方法进行获取下一条消息,而后进行消费处理!
  • 其实咱们还可使用自定义的Consumer,它更加的方便,解耦性更加的强,也是在实际工做中最经常使用的使用方式!
  • 自定义消费端实现只须要继承 DefaultConsumer 类,重写 handleDelivery 方法便可

 

自定义消费端演示

public class Producer {
     public static void main(String[] args) throws Exception {
            //1 建立ConnectionFactory
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("192.168.244.11");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            connectionFactory.setHandshakeTimeout(20000);
            //2 获取Connection
            Connection connection = connectionFactory.newConnection();
            //3 经过Connection建立一个新的Channel
            Channel channel = connection.createChannel();
            
            String exchange = "test_consumer_exchange";
            String routingKey = "consumer.save";
            
            String msg = "Hello RabbitMQ Consumer Message";
            //4 发送消息
            for(int i =0; i<5; i ++){
                channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
            }
        }
}

 

public class MyConsumer extends DefaultConsumer {

    public MyConsumer(Channel channel) {
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        //consumerTag: 内部生成的消费标签  properties: 消息属性  body: 消息内容  
        System.err.println("-----------consume message----------");
        System.err.println("consumerTag: " + consumerTag);
        //envelope包含属性:deliveryTag(标签), redeliver, exchange, routingKey
        //redeliver是一个标记,若是设为true,表示消息以前可能已经投递过了,如今是从新投递消息到监听队列的消费者
        System.err.println("envelope: " + envelope);
        System.err.println("properties: " + properties);
        System.err.println("body: " + new String(body));
    }
}

 

public class Consumer {
    public static void main(String[] args) throws Exception {
        //1 建立ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.244.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHandshakeTimeout(20000);
        //2 获取Connection
        Connection connection = connectionFactory.newConnection();
        //3 经过Connection建立一个新的Channel
        Channel channel = connection.createChannel();
        
        String exchangeName = "test_consumer_exchange";
        String routingKey = "consumer.#";
        String queueName = "test_consumer_queue";
        //4 声明交换机和队列,而后进行绑定设置路由Key
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //5 设置channel,使用自定义消费者
        channel.basicConsume(queueName, true, new MyConsumer(channel));
    }
}

 

运行说明api

先启动消费端,访问管控台:http://ip:15672,检查Exchange和Queue是否设置OK,而后启动生产端。消费端打印内容以下服务器

 

2.消费端的限流策略

2.1 限流的场景与机制

  • 假设一个场景,咱们Rabbitmq服务器有上万条未处理的消息,咱们随便打开一个消费者客户端,会出现这种状况:巨量的消息瞬间所有推送过来,可是咱们单个客户端没法同时处理这么多数据!此时颇有可能致使服务器崩溃,严重的可能致使线上的故障。
  • 除了这种场景,还有一些其余的场景,好比说单个生产者一分钟生产出了几百条数据,可是单个消费者一分钟可能只能处理60条数据,这个时候生产端和消费端确定是不平衡的。一般生产端是没办法作限制的。因此消费端确定须要作一些限流措施,不然若是超出最大负载,可能致使消费端性能降低,服务器卡顿甚至崩溃等一系列严重后果。

 

消费端限流机制ide

RabbitMQ提供了一种qos (服务质量保证)功能,即在非自动确认消息的前提下,若是必定数目的消息 (经过基于consume或者channel设置Qos的值) 未被确认前,不进行消费新的消息。性能

须要注意:测试

1.不能设置自动签收功能(autoAck = false)fetch

2.若是消息没被确认,就不会到达消费端,目的就是给消费端减压ui

 

2.2 限流相关API

限流设置 - BasicQos()this

void BasicQos(uint prefetchSize, ushort prefetchCount, bool global);
prefetchSize: 单条消息的大小限制,消费端一般设置为0,表示不作限制
prefetchCount: 一次最多能处理多少条消息,一般设置为1
global: 是否将上面设置应用于channel,false表明consumer级别日志

注意事项

prefetchSizeglobal这两项,rabbitmq没有实现,暂且不研究
prefetchCountautoAck=false 的状况下生效,即在自动应答的状况下这个值是不生效的
 
手工ACK - basicAck()

void basicAck(Integer deliveryTag,boolean multiple)
手工ACK,调用这个方法就会主动回送给Broker一个应答,表示这条消息我处理完了,你能够给我下一条了。参数multiple表示是否批量签收,因为咱们是一次处理一条消息,因此设置为false

 

2.3 限流演示

生产端

生产端就是正常的逻辑

public class Producer {
    public static void main(String[] args) throws Exception {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.244.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHandshakeTimeout(20000);

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        String exchange = "test_qos_exchange";
        String routingKey = "qos.save";

        String msg = "Hello RabbitMQ QOS Message";
        // 发送消息
        for (int i = 0; i < 5; i++) {
            channel.basicPublish(exchange, routingKey, true, null,
                    msg.getBytes());
        }
    }
}

 

自定义消费者

为了看到限流效果,这里不进行ACK

public class MyConsumer extends DefaultConsumer {

    //接收channel
    private Channel channel ;
    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("consumerTag: " + consumerTag);
        System.err.println("envelope: " + envelope);
        //System.err.println("properties: " + properties);
        System.err.println("body: " + new String(body));
        //手工ACK,参数multiple表示不批量签收
        //channel.basicAck(envelope.getDeliveryTag(), false);   
    }
}

 

消费端

关闭autoACK,进行限流设置

public class Consumer {

    public static void main(String[] args) throws Exception {
        //1 建立ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.244.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHandshakeTimeout(20000);
        //2 获取Connection
        Connection connection = connectionFactory.newConnection();
        //3 经过Connection建立一个新的Channel
        Channel channel = connection.createChannel();
        
        String exchangeName = "test_qos_exchange";
        String queueName = "test_qos_queue";
        String routingKey = "qos.#";
        //4 声明交换机和队列,而后进行绑定设置路由Key
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //进行参数设置:单条消息的大小限制,一次最多能处理多少条消息,是否将上面设置应用于channel
        channel.basicQos(0, 1, false);
        
        //限流: autoAck设置为 false
        channel.basicConsume(queueName, false, new MyConsumer(channel));
    }
}

 

运行说明

咱们先注释掉手工ACK方法,而后启动消费端和生产端,此时消费端只打印了一条消息2019-06-09_163747

这是由于咱们设置了手工签收,而且设置了一次只处理一条消息,当咱们没有回送ack应答时,Broker端就认为消费端尚未处理完这条消息,基于这种限流机制就不会给消费端发送新的消息了,因此消费端只打印了一条消息。

经过管控台也能够看到队列总共收到了5条消息,有一条消息没有ack。

将手工签收代码取消注释,再次运行消费端,此时就会打印5条消息的内容。

 

3. 消费端ACK与重回队列机制

3.1 ACK与NACK

当咱们设置 autoACK=false 时,就可使用手工ACK方式了,那么其实手工方式包括了手工ACK与NACK。

当咱们手工 ACK 时,会发送给Broker一个应答,表明消息成功处理了,Broker就能够回送响应给生产端了。NACK 则表示消息处理失败了,若是设置重回队列,Broker端就会将没有成功处理的消息从新发送。

 

使用方式

  1. 消费端进行消费的时候,若是因为业务异常咱们能够手工 NACK 并进行日志的记录,而后进行补偿!
    方法:void basicNack(long deliveryTag, boolean multiple, boolean requeue)
  2. 若是因为服务器宕机等严重问题,那咱们就须要手工进行 ACK 保障消费端消费成功!
    方法:void basicAck(long deliveryTag, boolean multiple)

 

3.2 重回队列演示

  • 消费端重回队列是为了对没有处理成功的消息,把消息从新会递给Broker!
  • 重回队列,会把消费失败的消息从新添加到队列的尾端,供消费者继续消费。
  • 通常咱们在实际应用中,都会关闭重回队列,也就是设置为false

 

生产端

对消息设置自定义属性以便进行区分

public class Producer {

    public static void main(String[] args) throws Exception {
        //1 建立ConnectionFactorys
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.244.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHandshakeTimeout(20000);
        //2 获取Connection
        Connection connection = connectionFactory.newConnection();
        //3 经过Connection建立一个新的Channel
        Channel channel = connection.createChannel();
        
        String exchange = "test_ack_exchange";
        String routingKey = "ack.save";
        
        for(int i =0; i<5; i ++){
            //设置消息属性
            Map<String, Object> headers = new HashMap<String, Object>();
            headers.put("num", i);
            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2)
                    .contentEncoding("UTF-8")
                    .headers(headers)
                    .build();
            //发送消息
            String msg = "Hello RabbitMQ ACK Message " + i;
            channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
        }   
    }
}

 

自定义消费

对第一条消息进行NACK,并设置重回队列

public class MyConsumer extends DefaultConsumer {

    private Channel channel ;

    public MyConsumer(Channel channel) {
        super(channel);
        this.channel = channel;
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("body: " + new String(body));
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        if((Integer)properties.getHeaders().get("num") == 0) {
            //NACK,参数三requeue:是否重回队列
            channel.basicNack(envelope.getDeliveryTag(), false, true);
        } else {
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    }
}

 

消费端

关闭自动签收功能

public class Consumer {
    
    public static void main(String[] args) throws Exception {
        
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.244.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHandshakeTimeout(20000);
        
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        
        String exchangeName = "test_ack_exchange";
        String queueName = "test_ack_queue";
        String routingKey = "ack.#";
        //声明交换机和队列,而后进行绑定设置路由Key
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);
        
        //手工签收 必需要设置 autoAck = false
        channel.basicConsume(queueName, false, new MyConsumer(channel));
    }
}

 

运行说明

先启动消费端,而后启动生产端,消费端打印以下,显然第一条消息因为咱们调用了NACK,而且设置了重回队列,因此会致使该条消息一直重复发送,消费端就会一直循环消费。

 

通常工做中不会设置重回队列这个属性,都是本身去作补偿或者投递到延迟队列里的,而后指定时间去处理便可。

 

4. TTL

TTL说明

  • TTL是Time To Live的缩写,也就是生存时间
  • RabbitMQ支持消息的过时时间,在消息发送时能够进行指定
  • RabbitMQ支持为每一个队列设置消息的超时时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动的清除

 

TTL演示

此次演示咱们不写代码,只经过管控台进行操做,实际测试也会更为方便一些。
 

1. 建立Exchange

选择Exchange菜单,找到下面的Add a new exchange

 

2.建立Queue

选择Queue菜单,找到下面的Add a new queue

 

3.创建队列和交换机的绑定关系

点击Exchange表格中的test002_exchange,在下面添加绑定规则

 

4.发送消息

点击Exchange表格中的test002_exchange,在下面找到Publish message,设置消息进行发送

 

5.验证

点击Queue菜单,查看表格中test002已经有了一条消息,10秒后表格显示0条,说明过时时间到了消息被自动清除了。
14795543-2f85c29d91d33480

 

6.设置单条消息过时时间

点击Exchange表格中的test002_exchange,在下面找到Publish message,设置消息的过时时间并进行发送,此时观察test002队列,发现消息5s后就过时被清除了,即便队列设置的过时时间是10s。

 
TTL代码设置过时时间

AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .expiration("10000") //10s过时
                .build();
        //发送消息
        channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());

 

队列过时时间设置

//设置队列的过时时间10s
        Map<String,Object> param = new HashMap<>();
        param.put("x-message-ttl", 10000);
        //声明队列
        channel.queueDeclare(queueName, true, false, false, null);

 
注意事项

  1. 二者的区别是设置队列的过时时间是对该队列的全部消息生效的。
  2. 为消息设置TTL有一个问题:RabbitMQ只对处于队头的消息判断是否过时(即不会扫描队列),因此,极可能队列中已存在死消息,可是队列并不知情。这会影响队列统计数据的正确性,妨碍队列及时释放资源。

 

5.死信队列

死信队列介绍

  • 死信队列:DLX,dead-letter-exchange
  • 利用DLX,当消息在一个队列中变成死信 (dead message) 以后,它能被从新publish到另外一个Exchange,这个Exchange就是DLX

 

消息变成死信有如下几种状况

  • 消息被拒绝(basic.reject / basic.nack),而且requeue = false
  • 消息TTL过时
  • 队列达到最大长度

 

死信处理过程

  • DLX也是一个正常的Exchange,和通常的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。
  • 当这个队列中有死信时,RabbitMQ就会自动的将这个消息从新发布到设置的Exchange上去,进而被路由到另外一个队列。
  • 能够监听这个队列中的消息作相应的处理。

 

死信队列设置

  1. 首先须要设置死信队列的exchange和queue,而后进行绑定:

  1. 而后须要有一个监听,去监听这个队列进行处理
  2. 而后咱们进行正常声明交换机、队列、绑定,只不过咱们须要在队列加上一个参数便可:arguments.put(" x-dead-letter-exchange","dlx.exchange");,这样消息在过时、requeue、 队列在达到最大长度时,消息就能够直接路由到死信队列!

 

死信队列演示

生产端

public class Producer {
    public static void main(String[] args) throws Exception {
        //1 建立ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.244.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHandshakeTimeout(20000);
        //2 获取Connection
        Connection connection = connectionFactory.newConnection();
        //3 经过Connection建立一个新的Channel
        Channel channel = connection.createChannel();
        
        String exchange = "test_dlx_exchange";
        String routingKey = "dlx.save";
        
        String msg = "Hello RabbitMQ DLX Message";
        
        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .deliveryMode(2)
                .contentEncoding("UTF-8")
                .expiration("10000")
                .build();
        //发送消息
        channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
    }
}

 

自定义消费者

public class MyConsumer extends DefaultConsumer {

    public MyConsumer(Channel channel) {
        super(channel);
    }

    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.err.println("-----------consume message----------");
        System.err.println("consumerTag: " + consumerTag);
        System.err.println("envelope: " + envelope);
        System.err.println("properties: " + properties);
        System.err.println("body: " + new String(body));
    }
}

 

消费端

  • 声明正常处理消息的交换机、队列及绑定规则
  • 在正常交换机上指定死信发送的Exchange
  • 声明死信交换机、队列及绑定规则
  • 监听死信队列,进行后续处理,这里省略
public class Consumer {
    public static void main(String[] args) throws Exception {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.244.11");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHandshakeTimeout(20000);

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        // 声明一个普通的交换机 和 队列 以及路由
        String exchangeName = "test_dlx_exchange";
        String routingKey = "dlx.#";
        String queueName = "test_dlx_queue";
        String deadQueueName = "dlx.queue";
        
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        // 指定死信发送的Exchange
        Map<String, Object> agruments = new HashMap<String, Object>();
        agruments.put("x-dead-letter-exchange", "dlx.exchange");
        // 这个agruments属性,要设置到声明队列上
        channel.queueDeclare(queueName, true, false, false, agruments);
        channel.queueBind(queueName, exchangeName, routingKey);

        // 要进行死信队列的声明
        channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
        channel.queueDeclare(deadQueueName, true, false, false, null);
        channel.queueBind(deadQueueName, "dlx.exchange", "#");

        channel.basicConsume(queueName, true, new MyConsumer(channel));
        //channel.basicConsume(deadQueueName, true, new MyConsumer(channel));

    }
}

 

运行说明

启动消费端,此时查看管控台,新增了两个Exchange,两个Queue。在test_dlx_queue上咱们设置了DLX,也就表明死信消息会发送到指定的Exchange上,最终其实会路由到dlx.queue上。

此时关闭消费端,而后启动生产端,查看管控台队列的消息状况,test_dlx_queue的值为1,而dlx_queue的值为0。
10s后的队列结果如图,因为生产端发送消息时指定了消息的过时时间为10s,而此时没有消费端进行消费,消息便被路由到死信队列中。

实际环境咱们还须要对死信队列进行一个监听和处理,固然具体的处理逻辑和业务相关,这里只是简单演示死信队列是否生效。

相关文章
相关标签/搜索