目录java
consumer.nextDelivery
方法进行获取下一条消息,而后进行消费处理!DefaultConsumer
类,重写 handleDelivery
方法便可
public class Producer { public static void main(String[] args) throws Exception { //1 建立ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); //2 获取Connection Connection connection = connectionFactory.newConnection(); //3 经过Connection建立一个新的Channel Channel channel = connection.createChannel(); String exchange = "test_consumer_exchange"; String routingKey = "consumer.save"; String msg = "Hello RabbitMQ Consumer Message"; //4 发送消息 for(int i =0; i<5; i ++){ channel.basicPublish(exchange, routingKey, true, null, msg.getBytes()); } } }
public class MyConsumer extends DefaultConsumer { public MyConsumer(Channel channel) { super(channel); } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //consumerTag: 内部生成的消费标签 properties: 消息属性 body: 消息内容 System.err.println("-----------consume message----------"); System.err.println("consumerTag: " + consumerTag); //envelope包含属性:deliveryTag(标签), redeliver, exchange, routingKey //redeliver是一个标记,若是设为true,表示消息以前可能已经投递过了,如今是从新投递消息到监听队列的消费者 System.err.println("envelope: " + envelope); System.err.println("properties: " + properties); System.err.println("body: " + new String(body)); } }
public class Consumer { public static void main(String[] args) throws Exception { //1 建立ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); //2 获取Connection Connection connection = connectionFactory.newConnection(); //3 经过Connection建立一个新的Channel Channel channel = connection.createChannel(); String exchangeName = "test_consumer_exchange"; String routingKey = "consumer.#"; String queueName = "test_consumer_queue"; //4 声明交换机和队列,而后进行绑定设置路由Key channel.exchangeDeclare(exchangeName, "topic", true, false, null); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); //5 设置channel,使用自定义消费者 channel.basicConsume(queueName, true, new MyConsumer(channel)); } }
运行说明api
先启动消费端,访问管控台:http://ip:15672,检查Exchange和Queue是否设置OK,而后启动生产端。消费端打印内容以下服务器
消费端限流机制ide
RabbitMQ提供了一种qos
(服务质量保证)功能,即在非自动确认消息的前提下,若是必定数目的消息 (经过基于consume或者channel设置Qos的值) 未被确认前,不进行消费新的消息。性能
须要注意:测试
1.不能设置自动签收功能(autoAck = false)fetch
2.若是消息没被确认,就不会到达消费端,目的就是给消费端减压ui
限流设置 - BasicQos()this
void BasicQos(uint prefetchSize, ushort prefetchCount, bool global);
prefetchSize:
单条消息的大小限制,消费端一般设置为0,表示不作限制
prefetchCount:
一次最多能处理多少条消息,一般设置为1
global:
是否将上面设置应用于channel,false表明consumer级别日志
注意事项
prefetchSize
和global
这两项,rabbitmq没有实现,暂且不研究
prefetchCount
在 autoAck=false
的状况下生效,即在自动应答的状况下这个值是不生效的
手工ACK - basicAck()
void basicAck(Integer deliveryTag,boolean multiple)
手工ACK,调用这个方法就会主动回送给Broker一个应答,表示这条消息我处理完了,你能够给我下一条了。参数multiple
表示是否批量签收,因为咱们是一次处理一条消息,因此设置为false
生产端
生产端就是正常的逻辑
public class Producer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchange = "test_qos_exchange"; String routingKey = "qos.save"; String msg = "Hello RabbitMQ QOS Message"; // 发送消息 for (int i = 0; i < 5; i++) { channel.basicPublish(exchange, routingKey, true, null, msg.getBytes()); } } }
自定义消费者
为了看到限流效果,这里不进行ACK
public class MyConsumer extends DefaultConsumer { //接收channel private Channel channel ; public MyConsumer(Channel channel) { super(channel); this.channel = channel; } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("-----------consume message----------"); System.err.println("consumerTag: " + consumerTag); System.err.println("envelope: " + envelope); //System.err.println("properties: " + properties); System.err.println("body: " + new String(body)); //手工ACK,参数multiple表示不批量签收 //channel.basicAck(envelope.getDeliveryTag(), false); } }
消费端
关闭autoACK,进行限流设置
public class Consumer { public static void main(String[] args) throws Exception { //1 建立ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); //2 获取Connection Connection connection = connectionFactory.newConnection(); //3 经过Connection建立一个新的Channel Channel channel = connection.createChannel(); String exchangeName = "test_qos_exchange"; String queueName = "test_qos_queue"; String routingKey = "qos.#"; //4 声明交换机和队列,而后进行绑定设置路由Key channel.exchangeDeclare(exchangeName, "topic", true, false, null); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); //进行参数设置:单条消息的大小限制,一次最多能处理多少条消息,是否将上面设置应用于channel channel.basicQos(0, 1, false); //限流: autoAck设置为 false channel.basicConsume(queueName, false, new MyConsumer(channel)); } }
运行说明
咱们先注释掉手工ACK方法,而后启动消费端和生产端,此时消费端只打印了一条消息
这是由于咱们设置了手工签收,而且设置了一次只处理一条消息,当咱们没有回送ack应答时,Broker端就认为消费端尚未处理完这条消息,基于这种限流机制就不会给消费端发送新的消息了,因此消费端只打印了一条消息。
经过管控台也能够看到队列总共收到了5条消息,有一条消息没有ack。
将手工签收代码取消注释,再次运行消费端,此时就会打印5条消息的内容。
当咱们设置 autoACK=false
时,就可使用手工ACK方式了,那么其实手工方式包括了手工ACK与NACK。
当咱们手工 ACK
时,会发送给Broker一个应答,表明消息成功处理了,Broker就能够回送响应给生产端了。NACK
则表示消息处理失败了,若是设置重回队列,Broker端就会将没有成功处理的消息从新发送。
使用方式
NACK
并进行日志的记录,而后进行补偿!void basicNack(long deliveryTag, boolean multiple, boolean requeue)
ACK
保障消费端消费成功!void basicAck(long deliveryTag, boolean multiple)
生产端
对消息设置自定义属性以便进行区分
public class Producer { public static void main(String[] args) throws Exception { //1 建立ConnectionFactorys ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); //2 获取Connection Connection connection = connectionFactory.newConnection(); //3 经过Connection建立一个新的Channel Channel channel = connection.createChannel(); String exchange = "test_ack_exchange"; String routingKey = "ack.save"; for(int i =0; i<5; i ++){ //设置消息属性 Map<String, Object> headers = new HashMap<String, Object>(); headers.put("num", i); AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .deliveryMode(2) .contentEncoding("UTF-8") .headers(headers) .build(); //发送消息 String msg = "Hello RabbitMQ ACK Message " + i; channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes()); } } }
自定义消费
对第一条消息进行NACK,并设置重回队列
public class MyConsumer extends DefaultConsumer { private Channel channel ; public MyConsumer(Channel channel) { super(channel); this.channel = channel; } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("-----------consume message----------"); System.err.println("body: " + new String(body)); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } if((Integer)properties.getHeaders().get("num") == 0) { //NACK,参数三requeue:是否重回队列 channel.basicNack(envelope.getDeliveryTag(), false, true); } else { channel.basicAck(envelope.getDeliveryTag(), false); } } }
消费端
关闭自动签收功能
public class Consumer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_ack_exchange"; String queueName = "test_ack_queue"; String routingKey = "ack.#"; //声明交换机和队列,而后进行绑定设置路由Key channel.exchangeDeclare(exchangeName, "topic", true, false, null); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); //手工签收 必需要设置 autoAck = false channel.basicConsume(queueName, false, new MyConsumer(channel)); } }
运行说明
先启动消费端,而后启动生产端,消费端打印以下,显然第一条消息因为咱们调用了NACK,而且设置了重回队列,因此会致使该条消息一直重复发送,消费端就会一直循环消费。
通常工做中不会设置重回队列这个属性,都是本身去作补偿或者投递到延迟队列里的,而后指定时间去处理便可。
TTL说明
Time To Live
的缩写,也就是生存时间
此次演示咱们不写代码,只经过管控台进行操做,实际测试也会更为方便一些。
选择Exchange菜单,找到下面的Add a new exchange
选择Queue菜单,找到下面的Add a new queue
点击Exchange表格中的test002_exchange
,在下面添加绑定规则
点击Exchange表格中的test002_exchange
,在下面找到Publish message
,设置消息进行发送
点击Queue菜单,查看表格中test002已经有了一条消息,10秒后表格显示0条,说明过时时间到了消息被自动清除了。
点击Exchange表格中的test002_exchange
,在下面找到Publish message
,设置消息的过时时间并进行发送,此时观察test002队列,发现消息5s后就过时被清除了,即便队列设置的过时时间是10s。
TTL代码设置过时时间
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .expiration("10000") //10s过时 .build(); //发送消息 channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
队列过时时间设置
//设置队列的过时时间10s Map<String,Object> param = new HashMap<>(); param.put("x-message-ttl", 10000); //声明队列 channel.queueDeclare(queueName, true, false, false, null);
注意事项
死信队列介绍
dead-letter-exchange
(dead message)
以后,它能被从新publish到另外一个Exchange,这个Exchange就是DLX
消息变成死信有如下几种状况
死信处理过程
死信队列设置
arguments.put(" x-dead-letter-exchange","dlx.exchange");
,这样消息在过时、requeue、 队列在达到最大长度时,消息就能够直接路由到死信队列!
生产端
public class Producer { public static void main(String[] args) throws Exception { //1 建立ConnectionFactory ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); //2 获取Connection Connection connection = connectionFactory.newConnection(); //3 经过Connection建立一个新的Channel Channel channel = connection.createChannel(); String exchange = "test_dlx_exchange"; String routingKey = "dlx.save"; String msg = "Hello RabbitMQ DLX Message"; AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() .deliveryMode(2) .contentEncoding("UTF-8") .expiration("10000") .build(); //发送消息 channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes()); } }
自定义消费者
public class MyConsumer extends DefaultConsumer { public MyConsumer(Channel channel) { super(channel); } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("-----------consume message----------"); System.err.println("consumerTag: " + consumerTag); System.err.println("envelope: " + envelope); System.err.println("properties: " + properties); System.err.println("body: " + new String(body)); } }
消费端
public class Consumer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.244.11"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setHandshakeTimeout(20000); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); // 声明一个普通的交换机 和 队列 以及路由 String exchangeName = "test_dlx_exchange"; String routingKey = "dlx.#"; String queueName = "test_dlx_queue"; String deadQueueName = "dlx.queue"; channel.exchangeDeclare(exchangeName, "topic", true, false, null); // 指定死信发送的Exchange Map<String, Object> agruments = new HashMap<String, Object>(); agruments.put("x-dead-letter-exchange", "dlx.exchange"); // 这个agruments属性,要设置到声明队列上 channel.queueDeclare(queueName, true, false, false, agruments); channel.queueBind(queueName, exchangeName, routingKey); // 要进行死信队列的声明 channel.exchangeDeclare("dlx.exchange", "topic", true, false, null); channel.queueDeclare(deadQueueName, true, false, false, null); channel.queueBind(deadQueueName, "dlx.exchange", "#"); channel.basicConsume(queueName, true, new MyConsumer(channel)); //channel.basicConsume(deadQueueName, true, new MyConsumer(channel)); } }
运行说明
启动消费端,此时查看管控台,新增了两个Exchange,两个Queue。在test_dlx_queue
上咱们设置了DLX,也就表明死信消息会发送到指定的Exchange上,最终其实会路由到dlx.queue
上。
此时关闭消费端,而后启动生产端,查看管控台队列的消息状况,test_dlx_queue
的值为1,而dlx_queue
的值为0。
10s后的队列结果如图,因为生产端发送消息时指定了消息的过时时间为10s,而此时没有消费端进行消费,消息便被路由到死信队列中。
实际环境咱们还须要对死信队列进行一个监听和处理,固然具体的处理逻辑和业务相关,这里只是简单演示死信队列是否生效。