rabbitmq学习记录(八)消息发布确认机制

RabbitMQ服务器崩溃导致的消息数据丢失,我们可以通过消息持久化来预防。但是,如果消息从生产者发送到vhosts的过程中出现了问题,持久化消息数据的方案就无效了。RabbitMQ为我们提供了两种解决方案:

方案一:通过AMQP事务机制实现,这也是AMQP协议层面提供的解决方案;

实现方法以及测试结果如下: 生产者一:

package com.example.demo.queue.confirm.amqp;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * Demo producer that publishes a single message inside an AMQP transaction
 * and commits it.
 *
 * <p>Flow: declare the durable queue, switch the channel into transactional
 * mode with {@code txSelect()}, publish, then {@code txCommit()}. If the
 * publish or commit throws, the failure is reported on stderr and the
 * transaction is rolled back.
 */
public class Producer01 {

	/** Queue the message is routed to through the default ("") exchange. */
	private static final String QUEUE_NAME = "message_confirm_ampq_queue";
	
	/**
	 * Publishes one transactional message and releases the channel and
	 * connection afterwards, even when an exception is thrown.
	 *
	 * @param args unused
	 * @throws IOException if declaring the queue, starting the transaction,
	 *         rolling back or closing fails
	 * @throws TimeoutException if closing the channel times out
	 */
	public static void main(String[] args) throws IOException, TimeoutException {
		Connection connection = ConnectionUtil.getConnection();
		try {
			Channel channel = connection.createChannel();
			try {
				// durable=true, exclusive=false, autoDelete=false, no extra arguments
				channel.queueDeclare(QUEUE_NAME, true, false, false, null);
				String msg = "msg from producer:";
				// Started outside the try below: if the channel never became
				// transactional there is nothing to roll back.
				channel.txSelect();
				try {
					// Explicit charset so the bytes do not depend on the JVM default.
					channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
					System.out.println("send msg : "+msg);
					channel.txCommit();
				} catch (Exception e1) {
					// Report the cause instead of swallowing it, then undo the publish.
					System.err.println("publish failed, rolling back: " + e1);
					channel.txRollback();
				}
			} finally {
				// Guarded so a channel already closed by an error does not
				// mask the original exception.
				if (channel.isOpen()) {
					channel.close();
				}
			}
		} finally {
			if (connection.isOpen()) {
				connection.close();
			}
		}
	}
	
}

生产者二:

package com.example.demo.queue.confirm.amqp;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * Demo producer that publishes a message inside an AMQP transaction and then
 * deliberately fails before the commit, so the transaction is rolled back.
 *
 * <p>The "send msg" line is still printed because the failure is injected
 * after the publish call; the rollback in the catch block is what undoes the
 * publish.
 */
public class Producer02 {

	/** Queue the message is routed to through the default ("") exchange. */
	private static final String QUEUE_NAME = "message_confirm_ampq_queue";
	
	/**
	 * Publishes one transactional message, triggers an artificial failure and
	 * rolls back; the channel and connection are released in all cases.
	 *
	 * @param args unused
	 * @throws IOException if declaring the queue, starting the transaction,
	 *         rolling back or closing fails
	 * @throws TimeoutException if closing the channel times out
	 */
	public static void main(String[] args) throws IOException, TimeoutException {
		Connection connection = ConnectionUtil.getConnection();
		try {
			Channel channel = connection.createChannel();
			try {
				// durable=true, exclusive=false, autoDelete=false, no extra arguments
				channel.queueDeclare(QUEUE_NAME, true, false, false, null);
				String msg = "msg from producer:";
				// Started outside the try below: if the channel never became
				// transactional there is nothing to roll back.
				channel.txSelect();
				try {
					// Explicit charset so the bytes do not depend on the JVM default.
					channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
					System.out.println("send msg : "+msg);
					// Fault injection: throws ArithmeticException so txCommit() is never reached.
					int k = 2/0;
					channel.txCommit();
				} catch (Exception e1) {
					// Report the cause instead of swallowing it, then undo the publish.
					System.err.println("publish failed, rolling back: " + e1);
					channel.txRollback();
				}
			} finally {
				// Guarded so a channel already closed by an error does not
				// mask the original exception.
				if (channel.isOpen()) {
					channel.close();
				}
			}
		} finally {
			if (connection.isOpen()) {
				connection.close();
			}
		}
	}
	
}

消费者:

package com.example.demo.queue.confirm.amqp;

import java.io.IOException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * Demo consumer for the transactional-publish example: it subscribes to the
 * queue with automatic acknowledgement and prints every message it receives.
 */
public class Consumer01 {

	/** Name of the queue this consumer subscribes to. */
	private static final String QUEUE_NAME = "message_confirm_ampq_queue";
	
	/**
	 * Opens a connection and channel, declares the queue and registers the
	 * printing consumer. Any failure during setup is dumped via
	 * {@code printStackTrace()}.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		try {
			Connection conn = ConnectionUtil.getConnection();
			final Channel ch = conn.createChannel();
			// durable=true, exclusive=false, autoDelete=false, no extra arguments
			ch.queueDeclare(QUEUE_NAME, true, false, false, null);
			// Second argument true = autoAck: deliveries are acknowledged automatically.
			ch.basicConsume(QUEUE_NAME, true, new PrintingConsumer(ch));
		} catch (Exception failure) {
			failure.printStackTrace();
		}
	}
	
	/** Consumer callback that decodes each delivery as UTF-8 and prints it. */
	private static final class PrintingConsumer extends DefaultConsumer {

		PrintingConsumer(Channel channel) {
			super(channel);
		}

		@Override
		public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
			String text = new String(body, "UTF-8");
			System.out.println("[1]:receive msg:" + text);
			System.out.println("[1]:deal msg successful.");
		}
	}
	
}

下面我们开始测试: 先运行消费者的main方法,接下来运行生产者1的main方法,结果如下: 再来运行生产者2的main方法,结果如下: 可以看到消费者只接收到了生产者1发送的消息,生产者2的消息没有收到,也就是说生产者2的消息并没有真正提交到message broker。原因就是:在发送者发送消息过程中,执行

channel.txCommit();

之前,如果出现了什么问题,就会执行

channel.txRollback();

回滚事务。 但是事务对性能的影响比较严重,所以还是建议使用方案二。

方案二:通过将channel设置成confirm模式来实现; confirm:发送单条消息后确认

package com.example.demo.queue.confirm.confirm.single;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * Demo producer using publisher confirms for a single message: the channel is
 * put into confirm mode, one message is published, and the producer blocks on
 * {@code waitForConfirms()} to learn whether the broker accepted it.
 */
public class Producer {

	/** Queue the message is routed to through the default ("") exchange. */
	private static final String QUEUE_NAME = "message_confirm_single_queue";
	
	/**
	 * Publishes one message in confirm mode, prints the confirmation outcome
	 * and releases the channel and connection, even when an exception is thrown.
	 *
	 * @param args unused
	 * @throws IOException if a broker operation or closing fails
	 * @throws TimeoutException if closing the channel times out
	 * @throws InterruptedException if interrupted while waiting for the confirm
	 */
	public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
		Connection connection = ConnectionUtil.getConnection();
		try {
			Channel channel = connection.createChannel();
			try {
				// durable=true, exclusive=false, autoDelete=false, no extra arguments
				channel.queueDeclare(QUEUE_NAME, true, false, false, null);
				String msg = "Producer发出的信息:";
				channel.confirmSelect();
				// The payload contains non-ASCII text, so the charset must be
				// explicit; the platform default may differ from the UTF-8 a
				// consumer decodes with.
				channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
				System.out.println("send msg : "+msg);
				// Blocks until the broker has acked or nacked the message.
				if (channel.waitForConfirms()) {
					System.out.println("信息发送成功.");
				} else {
					System.out.println("信息发送失败.");
				}
			} finally {
				// Guarded so a channel already closed by an error does not
				// mask the original exception.
				if (channel.isOpen()) {
					channel.close();
				}
			}
		} finally {
			if (connection.isOpen()) {
				connection.close();
			}
		}
	}
	
}

confirm:批量发送消息后确认

package com.example.demo.queue.confirm.confirm.batch;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * Demo producer using publisher confirms for a batch: ten messages are
 * published in confirm mode and a single {@code waitForConfirms()} call then
 * reports whether all of them were acknowledged by the broker.
 */
public class Producer {

	/** Queue the messages are routed to through the default ("") exchange. */
	private static final String QUEUE_NAME = "message_confirm_batch_queue";
	
	/**
	 * Publishes the batch, prints the aggregate confirmation outcome and
	 * releases the channel and connection, even when an exception is thrown.
	 *
	 * @param args unused
	 * @throws IOException if a broker operation or closing fails
	 * @throws TimeoutException if closing the channel times out
	 * @throws InterruptedException if interrupted while waiting for confirms
	 */
	public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
		Connection connection = ConnectionUtil.getConnection();
		try {
			Channel channel = connection.createChannel();
			try {
				// durable=true, exclusive=false, autoDelete=false, no extra arguments
				channel.queueDeclare(QUEUE_NAME, true, false, false, null);
				String msg = "msg from producer:";
				// Every message has the same payload, so encode it once,
				// with an explicit charset.
				byte[] payload = msg.getBytes(java.nio.charset.StandardCharsets.UTF_8);
				channel.confirmSelect();
				for(int i=0;i<10;i++) {
					channel.basicPublish("", QUEUE_NAME, null, payload);
					System.out.println("send msg["+i+"] : "+msg);
				}
				// One wait covers every message published since the last wait;
				// false means at least one of them was nacked.
				if (channel.waitForConfirms()) {
					System.out.println("msg send successfully");
				} else {
					System.out.println("msg send fail");
				}
			} finally {
				// Guarded so a channel already closed by an error does not
				// mask the original exception.
				if (channel.isOpen()) {
					channel.close();
				}
			}
		} finally {
			if (connection.isOpen()) {
				connection.close();
			}
		}
	}
	
}

confirm:发送信息,异步确认

package com.example.demo.queue.confirm.confirm.synch;

import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;

import com.example.demo.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;

/**
 * Demo producer using asynchronous publisher confirms: messages are published
 * continuously while a {@link ConfirmListener} receives the broker's ack/nack
 * callbacks and removes the confirmed sequence numbers from the set of
 * outstanding publishes.
 */
public class Producer {

	/** Queue the messages are routed to through the default ("") exchange. */
	private static final String QUEUE_NAME = "message_confirm_synch_queue";
	
	/**
	 * Publishes messages in an endless loop, tracking each publish sequence
	 * number until the broker confirms it. The loop only ends when a broker
	 * operation throws; the connection is closed in that case.
	 *
	 * @param args unused
	 * @throws IOException if a broker operation fails
	 * @throws TimeoutException declared for the channel API; not thrown by the loop itself
	 * @throws InterruptedException declared for interface compatibility
	 */
	public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
		Connection connection = ConnectionUtil.getConnection();
		try {
			Channel channel = connection.createChannel();
			// durable=true, exclusive=false, autoDelete=false, no extra arguments
			channel.queueDeclare(QUEUE_NAME, true, false, false, null);
			channel.confirmSelect();
			// Sequence numbers of published messages that the broker has not
			// confirmed yet. Synchronized because the listener callbacks run
			// on a different thread than the publishing loop.
			final SortedSet<Long> unconfirmed = Collections.synchronizedSortedSet(new TreeSet<Long>());
			channel.addConfirmListener(new ConfirmListener() {
				
				/**
				 * Called when the broker negatively acknowledges (could not
				 * take responsibility for) one or more published messages.
				 * NOTE(review): the entries are simply dropped here; a real
				 * producer would republish or report the nacked messages.
				 */
				@Override
				public void handleNack(long deliveryTag, boolean multiple) throws IOException {
					System.out.println("handleNack() deliveryTag="+deliveryTag+",multiple="+multiple);
					settle(unconfirmed, deliveryTag, multiple);
				}
				
				/**
				 * Called when the broker acknowledges that it has accepted one
				 * or more published messages.
				 */
				@Override
				public void handleAck(long deliveryTag, boolean multiple) throws IOException {
					System.out.println("handleAck() deliveryTag="+deliveryTag+",multiple="+multiple);
					settle(unconfirmed, deliveryTag, multiple);
				}
			});
			while(true) {
				long seqNo = channel.getNextPublishSeqNo();
				// Record the sequence number BEFORE publishing: the confirm can
				// arrive on the listener thread before basicPublish returns, and
				// adding afterwards would leave a stale entry that is never removed.
				unconfirmed.add(seqNo);
				channel.basicPublish("", QUEUE_NAME, null, ("seqNo:"+seqNo).getBytes(java.nio.charset.StandardCharsets.UTF_8));
			}
		} finally {
			// Only reached when the loop is aborted by an exception.
			if (connection.isOpen()) {
				connection.close();
			}
		}
	}
	
	/**
	 * Removes settled sequence numbers from the outstanding set.
	 *
	 * @param pending outstanding publish sequence numbers
	 * @param deliveryTag sequence number carried by the confirm
	 * @param multiple when true the confirm covers every sequence number up to
	 *        and including {@code deliveryTag}; otherwise only that one
	 */
	private static void settle(SortedSet<Long> pending, long deliveryTag, boolean multiple) {
		if (multiple) {
			// headSet is exclusive of its bound, hence +1 to include deliveryTag.
			pending.headSet(deliveryTag + 1).clear();
		} else {
			pending.remove(deliveryTag);
		}
	}
	
}
相关文章
相关标签/搜索