RabbitMQ为了解决生成者不知道消息是否真正到达broker这个问题,采用经过AMQP协议层面为咱们提供了事务机制方案,可是采用事务机制实现会下降RabbitMQ的消息吞吐量,那么有没有更加高效的解决方式呢?答案是采用Confirm模式。编程
编程优化逻辑服务器
对于固定消息体大小和线程数,若是消息持久化,生产者confirm(或者采用事务机制),消费者ack,那么对性能有很大的影响。消息持久化的优化没有太好方法,用更好的物理存储(SAS, SSD, RAID卡)总会带来改善。生产者confirm这一环节的优化则主要在于客户端程序的优化之上。概括起来,客户端实现生产者confirm有三种编程方式:异步
第1种
普通confirm模式最简单,publish一条消息后,等待服务器端confirm,若是服务端返回false或者超时时间内未返回,客户端进行消息重传。
关键代码以下:ide
1 channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes()); 2 if(!channel.waitForConfirms()){ 3 System.out.println("send message failed."); 4 }
第二种
批量confirm模式稍微复杂一点,客户端程序须要按期(每隔多少秒)或者定量(达到多少条)或者两则结合起来publish消息,而后等待服务器端confirm, 相比普通confirm模式,批量极大提高confirm效率,可是问题在于一旦出现confirm返回false或者超时的状况时,客户端须要将这一批次的消息所有重发,这会带来明显的重复消息数量,而且,当消息常常丢失时,批量confirm性能应该是不升反降的。
关键代码:性能
1 channel.confirmSelect(); 2 for(int i=0;i<batchCount;i++){ 3 channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes()); 4 } 5 if(!channel.waitForConfirms()){ 6 System.out.println("send message failed."); 7 }
第三种
异步confirm模式的编程实现最复杂,Channel对象提供的ConfirmListener()回调方法只包含deliveryTag(当前Chanel发出的消息序号),咱们须要本身为每个Channel维护一个unconfirm的消息序号集合,每publish一条数据,集合中元素加1,每回调一次handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或多条(multiple=true)记录。从程序运行效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构。实际上,SDK中的waitForConfirms()方法也是经过SortedSet维护消息序号的。
关键代码:优化
1 SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>()); 2 channel.confirmSelect(); 3 channel.addConfirmListener(new ConfirmListener() { 4 public void handleAck(long deliveryTag, boolean multiple) throws IOException { 5 if (multiple) { 6 confirmSet.headSet(deliveryTag + 1).clear(); 7 } else { 8 confirmSet.remove(deliveryTag); 9 } 10 } 11 public void handleNack(long deliveryTag, boolean multiple) throws IOException { 12 System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple); 13 if (multiple) { 14 confirmSet.headSet(deliveryTag + 1).clear(); 15 } else { 16 confirmSet.remove(deliveryTag); 17 } 18 } 19 }); 20 21 while (true) { 22 long nextSeqNo = channel.getNextPublishSeqNo(); 23 channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes()); 24 confirmSet.add(nextSeqNo); 25 }
消息确认(Consumer端)
为了保证消息从队列可靠地到达消费者,RabbitMQ提供消息确认机制(message acknowledgment)。消费者在声明队列时,能够指定noAck参数,当noAck=false时,RabbitMQ会等待消费者显式发回ack信号后才从内存(和磁盘,若是是持久化消息的话)中移去消息。不然,RabbitMQ会在队列中消息被消费后当即删除它。spa
采用消息确认机制后,只要令noAck=false,消费者就有足够的时间处理消息(任务),不用担忧处理消息过程当中消费者进程挂掉后消息丢失的问题,由于RabbitMQ会一直持有消息直到消费者显式调用basicAck为止。命令行
当noAck=false时,对于RabbitMQ服务器端而言,队列中的消息分红了两部分:一部分是等待投递给消费者的消息;一部分是已经投递给消费者,可是尚未收到消费者ack信号的消息。若是服务器端一直没有收到消费者的ack信号,而且消费此消息的消费者已经断开链接,则服务器端会安排该消息从新进入队列,等待投递给下一个消费者(也可能仍是原来的那个消费者)。线程
RabbitMQ不会为未ack的消息设置超时时间,它判断此消息是否须要从新投递给消费者的惟一依据是消费该消息的消费者链接是否已经断开。这么设计的缘由是RabbitMQ容许消费者消费一条消息的时间能够好久好久。设计
RabbitMQ管理平台界面上能够看到当前队列中Ready状态和Unacknowledged状态的消息数,分别对应上文中的等待投递给消费者的消息数和已经投递给消费者可是未收到ack信号的消息数。也能够经过命令行来查看上述信息:rabbitmqctl list_queues name messages_ready messages_unacknowledged
代码示例:
1 public static void main(String[] args) throws IOException { 2 ConnectionFactory connectionFactory = RabbitConfig.getConnectionFactory(); 3 Connection connection = connectionFactory.createConnection(); 4 Channel channel = connection.createChannel(false); 5 /** 6 * 建立队列申明 7 */ 8 boolean durable = true; 9 channel.queueDeclare(RabbitConfig.QUEUE_TOPIC2, durable, false, false, null); 10 11 /** 12 * 绑定队列到交换机 13 */ 14 channel.queueBind(RabbitConfig.QUEUE_TOPIC2, EXCHANGE_TOPIC, "commodity.*"); 15 16 17 /** 18 * 改变分发规则 19 */ 20 channel.basicQos(1); 21 DefaultConsumer consumer = new DefaultConsumer(channel) { 22 @Override 23 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 24 super.handleDelivery(consumerTag, envelope, properties, body); 25 System.out.println("[2] 接口数据 : " + new String(body, "utf-8")); 26 try { 27 Thread.sleep(200); 28 } catch (InterruptedException e) { 29 e.printStackTrace(); 30 } finally { 31 System.out.println("[2] done! "); 32 //消息应答:手动回执,手动确认消息 33 channel.basicAck(envelope.getDeliveryTag(), false); 34 } 35 } 36 }; 37 //监听队列 38 /** 39 * autoAck 消息应答 40 * 默认轮询分发打开:true :这种模式一旦rabbitmq将消息发送给消费者,就会从内存中删除该消息,不关心客户端是否消费正常。 41 * 使用公平分发须要关闭autoAck:false 须要手动发送回执 42 */ 43 boolean autoAck = false; 44 channel.basicConsume(RabbitConfig.QUEUE_TOPIC2, autoAck, consumer); 45 }
broker将在下面的状况中对消息进行confirm:
basicRecover:是路由不成功的消息可使用recovery从新发送到队列中。 basicReject:是接收端告诉服务器这个消息我拒绝接收,不处理,能够设置是否放回到队列中仍是丢掉,并且只能一次拒绝一个消息,官网中有明确说明不能批量拒绝消息,为解决批量拒绝消息才有了basicNack。 basicNack:能够一次拒绝N条消息,客户端能够设置basicNack方法的multiple参数为true,服务器会拒绝指定了delivery_tag的全部未确认的消息(tag是一个64位的long值,最大值是9223372036854775807)。