7.RabbitMQ--消息确认机制(confirm)

RabbitMQ--消息确认机制(confirm)

Confirm模式

RabbitMQ为了解决生成者不知道消息是否真正到达broker这个问题,采用经过AMQP协议层面为咱们提供了事务机制方案,可是采用事务机制实现会下降RabbitMQ的消息吞吐量,那么有没有更加高效的解决方式呢?答案是采用Confirm模式。编程

producer端confirm模式的实现原理

  • 生产者将信道设置成confirm模式,一旦信道进入confirm模式,全部在该信道上面发布的消息都会被指派一个惟一的ID(从1开始),一旦消息被投递到全部匹配的队列以后,broker就会发送一个确认给生产者(包含消息的惟一ID),这就使得生产者知道消息已经正确到达目的队列了,若是消息和队列是可持久化的,那么确认消息会将消息写入磁盘以后发出,broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外broker也能够设置basic.ack的multiple域,表示到这个序列号以前的全部消息都已经获得了处理。
  • confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就能够在等信道返回确认的同时继续发送下一条消息,当消息最终获得确认以后,生产者应用即可以经过回调方法来处理该确认消息,若是RabbitMQ由于自身内部错误致使消息丢失,就会发送一条nack消息,生产者应用程序一样能够在回调方法中处理该nack消息。
  • 在channel 被设置成 confirm 模式以后,全部被 publish 的后续消息都将被 confirm(即 ack) 或者被nack一次。可是没有对消息被 confirm 的快慢作任何保证,而且同一条消息不会既被 confirm又被nack 。

编程优化逻辑服务器

  对于固定消息体大小和线程数,若是消息持久化,生产者confirm(或者采用事务机制),消费者ack,那么对性能有很大的影响。消息持久化的优化没有太好方法,用更好的物理存储(SAS, SSD, RAID卡)总会带来改善。生产者confirm这一环节的优化则主要在于客户端程序的优化之上。概括起来,客户端实现生产者confirm有三种编程方式:异步

  • 普通confirm模式:每发送一条消息后,调用waitForConfirms()方法,等待服务器端confirm。其实是一种串行confirm了。
  • 批量confirm模式:每发送一批消息后,调用waitForConfirms()方法,等待服务器端confirm。
  • 异步confirm模式:提供一个回调方法,服务端confirm了一条或者多条消息后Client端会回调这个方法。

第1种 
普通confirm模式最简单,publish一条消息后,等待服务器端confirm,若是服务端返回false或者超时时间内未返回,客户端进行消息重传。 
关键代码以下:ide

1 channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
2 if(!channel.waitForConfirms()){
3     System.out.println("send message failed.");
4 }

 

第二种
批量confirm模式稍微复杂一点,客户端程序须要按期(每隔多少秒)或者定量(达到多少条)或者两则结合起来publish消息,而后等待服务器端confirm, 相比普通confirm模式,批量极大提高confirm效率,可是问题在于一旦出现confirm返回false或者超时的状况时,客户端须要将这一批次的消息所有重发,这会带来明显的重复消息数量,而且,当消息常常丢失时,批量confirm性能应该是不升反降的。
关键代码:性能

1 channel.confirmSelect();
2 for(int i=0;i<batchCount;i++){
3     channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
4 }
5 if(!channel.waitForConfirms()){
6     System.out.println("send message failed.");
7 }

 

第三种
异步confirm模式的编程实现最复杂,Channel对象提供的ConfirmListener()回调方法只包含deliveryTag(当前Chanel发出的消息序号),咱们须要本身为每个Channel维护一个unconfirm的消息序号集合,每publish一条数据,集合中元素加1,每回调一次handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或多条(multiple=true)记录。从程序运行效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构。实际上,SDK中的waitForConfirms()方法也是经过SortedSet维护消息序号的。
关键代码:优化

 1 SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
 2  channel.confirmSelect();
 3         channel.addConfirmListener(new ConfirmListener() {
 4             public void handleAck(long deliveryTag, boolean multiple) throws IOException {
 5                 if (multiple) {
 6                     confirmSet.headSet(deliveryTag + 1).clear();
 7                 } else {
 8                     confirmSet.remove(deliveryTag);
 9                 }
10             }
11             public void handleNack(long deliveryTag, boolean multiple) throws IOException {
12                 System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);
13                 if (multiple) {
14                     confirmSet.headSet(deliveryTag + 1).clear();
15                 } else {
16                     confirmSet.remove(deliveryTag);
17                 }
18             }
19         });
20 
21         while (true) {
22             long nextSeqNo = channel.getNextPublishSeqNo();
23             channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
24             confirmSet.add(nextSeqNo);
25 }

消息确认(Consumer端)
为了保证消息从队列可靠地到达消费者,RabbitMQ提供消息确认机制(message acknowledgment)。消费者在声明队列时,能够指定noAck参数,当noAck=false时,RabbitMQ会等待消费者显式发回ack信号后才从内存(和磁盘,若是是持久化消息的话)中移去消息。不然,RabbitMQ会在队列中消息被消费后当即删除它。spa

采用消息确认机制后,只要令noAck=false,消费者就有足够的时间处理消息(任务),不用担忧处理消息过程当中消费者进程挂掉后消息丢失的问题,由于RabbitMQ会一直持有消息直到消费者显式调用basicAck为止。命令行

当noAck=false时,对于RabbitMQ服务器端而言,队列中的消息分红了两部分:一部分是等待投递给消费者的消息;一部分是已经投递给消费者,可是尚未收到消费者ack信号的消息。若是服务器端一直没有收到消费者的ack信号,而且消费此消息的消费者已经断开链接,则服务器端会安排该消息从新进入队列,等待投递给下一个消费者(也可能仍是原来的那个消费者)。线程

RabbitMQ不会为未ack的消息设置超时时间,它判断此消息是否须要从新投递给消费者的惟一依据是消费该消息的消费者链接是否已经断开。这么设计的缘由是RabbitMQ容许消费者消费一条消息的时间能够好久好久。设计

RabbitMQ管理平台界面上能够看到当前队列中Ready状态和Unacknowledged状态的消息数,分别对应上文中的等待投递给消费者的消息数和已经投递给消费者可是未收到ack信号的消息数。也能够经过命令行来查看上述信息:rabbitmqctl list_queues name messages_ready messages_unacknowledged

代码示例:

 1  public static void main(String[] args) throws IOException {
 2         ConnectionFactory connectionFactory = RabbitConfig.getConnectionFactory();
 3         Connection connection = connectionFactory.createConnection();
 4         Channel channel = connection.createChannel(false);
 5         /**
 6          * 建立队列申明
 7          */
 8         boolean durable = true;
 9         channel.queueDeclare(RabbitConfig.QUEUE_TOPIC2, durable, false, false, null);
10 
11         /**
12          * 绑定队列到交换机
13          */
14         channel.queueBind(RabbitConfig.QUEUE_TOPIC2, EXCHANGE_TOPIC, "commodity.*");
15 
16 
17         /**
18          * 改变分发规则
19          */
20         channel.basicQos(1);
21         DefaultConsumer consumer = new DefaultConsumer(channel) {
22             @Override
23             public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
24                 super.handleDelivery(consumerTag, envelope, properties, body);
25                 System.out.println("[2] 接口数据 : " + new String(body, "utf-8"));
26                 try {
27                     Thread.sleep(200);
28                 } catch (InterruptedException e) {
29                     e.printStackTrace();
30                 } finally {
31                     System.out.println("[2] done! ");
32                     //消息应答:手动回执,手动确认消息
33                     channel.basicAck(envelope.getDeliveryTag(), false);
34                 }
35             }
36         };
37         //监听队列
38         /**
39          * autoAck 消息应答
40          *  默认轮询分发打开:true :这种模式一旦rabbitmq将消息发送给消费者,就会从内存中删除该消息,不关心客户端是否消费正常。
41          *  使用公平分发须要关闭autoAck:false  须要手动发送回执
42          */
43         boolean autoAck = false;
44         channel.basicConsume(RabbitConfig.QUEUE_TOPIC2, autoAck, consumer);
45     }

broker将在下面的状况中对消息进行confirm:

  • broker发现当前消息没法被路由到指定的queues中(若是设置了mandatory属性,则broker会发送basic.return)
  • 非持久属性的消息到达了其所应该到达的全部queue中(和镜像queue中)
  • 持久消息到达了其所应该到达的全部queue中(和镜像中),并被持久化到了磁盘(fsync)
  • 持久消息从其所在的全部queue中被consume了(若是必要则会被ack)

basicRecover:是路由不成功的消息可使用recovery从新发送到队列中。 basicReject:是接收端告诉服务器这个消息我拒绝接收,不处理,能够设置是否放回到队列中仍是丢掉,并且只能一次拒绝一个消息,官网中有明确说明不能批量拒绝消息,为解决批量拒绝消息才有了basicNack。 basicNack:能够一次拒绝N条消息,客户端能够设置basicNack方法的multiple参数为true,服务器会拒绝指定了delivery_tag的全部未确认的消息(tag是一个64位的long值,最大值是9223372036854775807)。

相关文章
相关标签/搜索