ack
表示告知RabbitMQ
已经成功消费消息java
nack
表示告知RabbitMQ
消费端处理消息失败服务器
RabbitMQ
既收不到ack
也收不到nack
,此时消费端采用手工ack
,等消费端服务重启好后,RabbitMQ
回重发此未能消费成功的消息,保障消息消费成功消费端重回队列是为了对没有处理成功的消息,把消息从新递给Brokeride
通常咱们在实际应用中,都会关闭重回队列ui
package com.wyg.rabbitmq.javaclient.consumer_ack; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * 消费者手工ack和nack * * @author wyg0405@gmail.com * @date 2019-11-22 13:25 * @since JDK1.8 * @version V1.0 */ public class Producer { private static final String HOST = "localhost"; private static final int PORT = 5672; private static final String USERNAME = "guset"; private static final String PASSWORD = "guset"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); factory.setVirtualHost("/"); factory.setPort(PORT); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_ack_exchange"; String routingKey = "ack.abc"; // 申明exchange channel.exchangeDeclare(exchangeName, "topic"); for (int i = 0; i < 6; i++) { Map<String, Object> map = new HashMap<>(); map.put("num", i); AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().deliveryMode(2).contentEncoding("UTF-8").headers(map).build(); String msg = "这是第" + i + "条ack消息"; channel.basicPublish(exchangeName, routingKey, false, props, msg.getBytes("UTF-8")); } channel.close(); connection.close(); } }
package com.wyg.rabbitmq.javaclient.consumer_ack; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.*; /** * 消费者手工ack和nack * * @author wyg0405@gmail.com * @date 2019-11-22 14:07 * @since JDK1.8 * @version V1.0 */ public class Consumer { private static final String HOST = "localhost"; private static final int PORT = 5672; private static final String USERNAME = "guset"; private static final String PASSWORD = "guset"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); factory.setVirtualHost("/"); factory.setPort(PORT); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String queueName = "test_ack_queue"; String exchangeName = "test_ack_exchange"; String routingKey = "ack.#"; // 申明exchange channel.exchangeDeclare(exchangeName, "topic"); // 申明队列 channel.queueDeclare(queueName, true, false, false, null); // 队列绑定到exchange channel.queueBind(queueName, exchangeName, routingKey, null); DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { // consumer手动 ack 给broker int num = (int)message.getProperties().getHeaders().get("num"); // 根据headers里的num作判断,num<3,发ack给broker,并将消息从新入队 if (num < 3) { System.out.println("---消费端nack---DeliveryTag:" + message.getEnvelope().getDeliveryTag() + "," + new String(message.getBody(), "UTF-8")); channel.basicAck(message.getEnvelope().getDeliveryTag(), false); } else { // 根据headers里的num作判断,num>=3,发nack给broker,并将消息从新入队 System.out.println("---消费端nack---DeliveryTag:" + message.getEnvelope().getDeliveryTag() + "," + new String(message.getBody(), "UTF-8")); channel.basicNack(message.getEnvelope().getDeliveryTag(), false, true); } } }; CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { System.out.println("---消费者--:cancelCallback"); } }; // 消费消息,autoAck必定要设为false,手工ack channel.basicConsume(queueName, false, deliverCallback, cancelCallback); } }
发现前3条消息成功消费,手工发ack
给Brokerspa
最后3条消息,发nack
给Broker
,并不断重回队列尾端,broker再将其推给消费端,一直循环消费失败3d