假设有个场景,RabbitMQ服务器上堆积上万条未处理的消息,咱们随便打开一个消费者客户端会出现下面状况:巨量的消息同时推送过来,可是咱们单个消费者客户端没法同时处理这么多数据,服务器可能卡死java
RabbitMQ提供了一种qos(服务质量保证)功能,即在非自动确认消息的状况下,若是必定数量的消息(经过基于consumer或者channel设置qos值)未被确认前,不消费新的消息服务器
在消费端:ide
// 单条消息的大小限制,通常设为0或不设置,不限制大小 int prefecthSize = 0; // 告诉RabbitMQ不要同时给消费端推送n条消息,一旦有n个消息还没ack,则该consumer将block掉,直到有ack;注意在自动应答下不生效 int prefecthCount = 1; // 表示是否应用于channel上,便是channel级别仍是consumer级别 boolean global = false; channel.basicQos(prefecthSize, prefecthCount, global);
producerspa
package com.wyg.rabbitmq.javaclient.consumer_limit; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * 消费端限流 * * @author wyg0405@gmail.com * @date 2019-11-22 13:25 * @since JDK1.8 * @version V1.0 */ public class Producer { private static final String HOST = "localhost"; private static final int PORT = 5672; private static final String USERNAME = "guset"; private static final String PASSWORD = "guset"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); factory.setVirtualHost("/"); factory.setPort(PORT); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_qos_exchange"; String routingKey = "qos.abc"; // 申明exchange channel.exchangeDeclare(exchangeName, "topic"); // 发送十条消息 for (int i = 0; i < 10; i++) { String msg = "这是一条 消费端限流消息," + i; channel.basicPublish(exchangeName, routingKey, false, null, msg.getBytes("UTF-8")); } channel.close(); connection.close(); } }
consumercode
package com.wyg.rabbitmq.javaclient.consumer_limit; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.*; /** * 消费端限流 * * @author wyg0405@gmail.com * @date 2019-11-22 14:07 * @since JDK1.8 * @version V1.0 */ public class Consumer { private static final String HOST = "localhost"; private static final int PORT = 5672; private static final String USERNAME = "guset"; private static final String PASSWORD = "guset"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); factory.setVirtualHost("/"); factory.setPort(PORT); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String queueName = "test_qos_queue"; String exchangeName = "test_qos_exchange"; String routingKey = "qos.#"; // 申明exchange channel.exchangeDeclare(exchangeName, "topic"); // 申明队列 channel.queueDeclare(queueName, true, false, false, null); // 队列绑定到exchange channel.queueBind(queueName, exchangeName, routingKey, null); // 单条消息的大小限制,通常设为0或不设置,不限制大小 int prefecthSize = 0; // 告诉RabbitMQ不要同时给消费端推送n条消息,一旦有n个消息还没ack,则该consumer将block掉,直到有ack;注意在自动应答下不生效 int prefecthCount = 1; // 表示是否应用于channel上,便是channel级别仍是consumer级别 boolean global = false; channel.basicQos(prefecthSize, prefecthCount, global); DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { try { System.out.println("---消费者---"); System.out.println(new String(message.getBody(), "UTF-8")); } finally { // consumer手动 ack 给broker channel.basicAck(message.getEnvelope().getDeliveryTag(), false); } } }; CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { System.out.println("---消费者:cancelCallback"); } }; // 消费消息,autoAck必定要设置为false channel.basicConsume(queueName, false, deliverCallback, cancelCallback); } }
注意
blog
第一次咱们注释掉 手动 ack给RabbitMQ应答rabbitmq
运行结果:队列
发现一直卡在第一条消息,由于未给RabbitMQ手动应答,因此RabbitMQ认为消费端还未消费完,不推送新的消息
第二次开启手动应答rem
运行结果:get
全部消息依次消费