消费端进行消费的时候,若是因为业务异常致使失败了,返回 NACK 达到最大重试次数,此时咱们能够进行日志的记录,而后手动 ACK 回去,最后对这个记录进行补偿。git
或者因为服务器宕机等严重问题,致使 ACK 和 NACK 都没有,那咱们就须要手工进行 ACK 保障消费端消费成功,再经过补偿机制补偿。github
消费端的重回队列 消费端的重回队列是为了对没有处理成功的消息,把消息从新递给 broker。可是在咱们的实际生产,通常都会关闭重回队列,api
代码地址: https://github.com/hmilyos/rabbitmqdemo.git rabbitmq-api 项目下
复制代码
生产端的代码基本没什么变化bash
@Slf4j
public class Procuder {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQConfig.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQConfig.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQConfig.RABBITMQ_DEFAULT_VIRTUAL_HOST);
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchange = "test_ack_exchange";
String routingKey = "ack.save";
for(int i =0; i < 5; i++){
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("num", i);
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.headers(headers)
.build();
String msg = "Hello RabbitMQ ACK Message " + i;
log.info("生产端发送:{}", msg);
channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
}
}
}
复制代码
接着是消费端的代码服务器
注意看消费端的代码, autoack 必定要设置为 false,要否则不会生效的
复制代码
@Slf4j
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//1 建立ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(RabbitMQConfig.RABBITMQ_HOST);
connectionFactory.setPort(RabbitMQConfig.RABBITMQ_PORT);
connectionFactory.setVirtualHost(RabbitMQConfig.RABBITMQ_DEFAULT_VIRTUAL_HOST);
//2 获取C onnection
Connection connection = connectionFactory.newConnection();
//3 经过Connection建立一个新的Channel
Channel channel = connection.createChannel();
String exchangeName = "test_ack_exchange";
String queueName = "test_ack_queue";
String routingKey = "ack.#";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//使用自定义消费者
//1 手工签收 必需要关闭 autoAck = false
channel.basicConsume(queueName, false, new MyConsumer(channel));
log.info("消费端启动成功");
}
}
复制代码
消费端的具体消费代码:ide
/**
* 自定义消费者
*/
@Slf4j
public class MyConsumer extends DefaultConsumer {
private Channel channel;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, //消费者标签
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
log.info("------MyConsumer-----consume message----------");
log.info("body: " + new String(body));
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if((Integer)properties.getHeaders().get("num") == 0) {
//是否为批量的,是否重回队列
channel.basicNack(envelope.getDeliveryTag(), false, true);
} else {
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
}
复制代码
先启动消费端,再启动生产端 ui