RabbitMQ服务器崩了致使的消息数据丢失,已经持久化的消息数据咱们能够经过消息持久化来预防。可是,若是消息从生产者发送到vhosts过程当中出现了问题,持久化消息数据的方案就无效了。 RabbitMQ为咱们提供了两种解决方案:java
实现方法以及测试结果以下: 生产者一:服务器
package com.example.demo.queue.confirm.amqp; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.example.demo.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Producer01 { private static final String QUEUE_NAME = "message_confirm_ampq_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, true, false, false, null); String msg = "msg from producer:"; try { channel.txSelect(); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); System.out.println("send msg : "+msg); channel.txCommit(); } catch (Exception e1) { channel.txRollback(); } channel.close(); connection.close(); } }
生产者二:异步
package com.example.demo.queue.confirm.amqp; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.example.demo.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Producer02 { private static final String QUEUE_NAME = "message_confirm_ampq_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, true, false, false, null); String msg = "msg from producer:"; try { channel.txSelect(); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); System.out.println("send msg : "+msg); int k = 2/0;// 触发rollback事件 channel.txCommit(); } catch (Exception e1) { channel.txRollback(); } channel.close(); connection.close(); } }
消费者:ide
package com.example.demo.queue.confirm.amqp; import java.io.IOException; import com.example.demo.utils.ConnectionUtil; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class Consumer01 { // 队列名称 private static final String QUEUE_NAME = "message_confirm_ampq_queue"; public static void main(String[] args) { try { // 获取链接 Connection connection = ConnectionUtil.getConnection(); // 建立通道 final Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 定义消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"UTF-8"); System.out.println("[1]:receive msg:"+msg); System.out.println("[1]:deal msg successful."); } }; // 接收信息 channel.basicConsume(QUEUE_NAME, true, consumer); } catch (Exception e) { e.printStackTrace(); } } }
下面咱们开始测试: 先运行测试类的main方法,接下来运行生产者1的main方法,结果以下: 再来运行运行生产者2的main方法,结果以下:
能够看到消费者只接收到了生产者1发送的消息,消息2的没有收到。至于消息有没发送到message broker,或者说 缘由就是:在发送者发送消息过程当中,执行性能
channel.txCommit();
以前,若是出现了什么问题,就行执行测试
channel.txRollback();
回滚事务。 可是事务影响性能比较严重,因此仍是建议使用方案二。spa
package com.example.demo.queue.confirm.confirm.single; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.example.demo.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Producer { private static final String QUEUE_NAME = "message_confirm_single_queue"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, true, false, false, null); String msg = "Producer发出的信息:"; channel.confirmSelect(); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); System.out.println("send msg : "+msg); if (channel.waitForConfirms()) { System.out.println("信息发送成功."); } else { System.out.println("信息发送失败."); } channel.close(); connection.close(); } }
confirm:批量发送消息后确认3d
package com.example.demo.queue.confirm.confirm.batch; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.example.demo.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Producer { private static final String QUEUE_NAME = "message_confirm_batch_queue"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, true, false, false, null); String msg = "msg from producer:"; channel.confirmSelect(); for(int i=0;i<10;i++) { channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); System.out.println("send msg["+i+"] : "+msg); } if (channel.waitForConfirms()) { System.out.println("msg send successfully"); } else { System.out.println("msg send fail"); } channel.close(); connection.close(); } }
confirm:发送信息,异步确认code
package com.example.demo.queue.confirm.confirm.synch; import java.io.IOException; import java.util.Collections; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.TimeoutException; import com.example.demo.utils.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmListener; import com.rabbitmq.client.Connection; public class Producer { private static final String QUEUE_NAME = "message_confirm_synch_queue"; public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, true, false, false, null); channel.confirmSelect(); // 存放信息的序列化 SortedSet<Long> longTreeSet = Collections.synchronizedSortedSet(new TreeSet<Long>()); // 添加监听器 channel.addConfirmListener(new ConfirmListener() { /** * 收到消费者已经处理完消息以后发出的反馈,触发该方法 */ @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("handleNack() deliveryTag="+deliveryTag+",multiple="+multiple); if(multiple) { longTreeSet.headSet(deliveryTag+1).clear(); } else { longTreeSet.remove(deliveryTag); } } /** * 长时间没收到消费者已经处理完消息以后发出的反馈,触发该方法 */ @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println("handleAck() deliveryTag="+deliveryTag+",multiple="+multiple); if(multiple) { longTreeSet.headSet(deliveryTag+1).clear(); } else { longTreeSet.remove(deliveryTag); } } }); while(true) { long seqNo = channel.getNextPublishSeqNo(); channel.basicPublish("", QUEUE_NAME, null, ("seqNo:"+seqNo).getBytes()); longTreeSet.add(seqNo); } } }