这是我参与8月更文挑战的第11天,活动详情查看:8月更文挑战java
RabbitMQ系列汇总:RabbitMQ系列编程
生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,全部在该信道上面发布的消息都将会被指派一个惟一的 ID(从 1 开始),一旦消息被投递到全部匹配的队列以后。安全
broker 就会发送一个确认给生产者(包含消息的惟一 ID),这就使得生产者知道消息已经正确到达目的队列了,若是消息和队列是可持久化的,那么确认消息会在将消息写入磁盘以后发出。markdown
broker 回传 给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也能够设置 basic.ack 的 multiple 域,表示到这个序列号以前的全部消息都已经获得了处理。并发
confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就能够在等信 道返回确认的同时继续发送下一条消息,当消息最终获得确认以后。dom
生产者应用即可以经过回调方法来处理该确认消息,若是 RabbitMQ 由于自身内部错误致使消息丢失,就会发送一条 nack 消息,生产者应用程序一样能够在回调方法中处理该 nack 消息。异步
开启发布确认方式函数
发布确认默认是没有开启的,若是要开启须要调用方法 confirmSelect,每当你要想使用发布 确认,都须要在 channel 上调用该方法高并发
//开启发布确认
channel.confirmSelect();
复制代码
这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息以后只有它 被确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long)这个方法只有在消息被确认 的时候才返回,若是在指定时间范围内这个消息没有被确认那么它将抛出异常。post
生产者
/** * 这是一个测试的生产者 *@author DingYongJun *@date 2021/8/1 */
public class DyProducerTest_dingyuefabu {
//设置执行次数
public static final int MESSAGE_COUNT = 888;
/** * 这里为了方便,咱们使用main函数来测试 * 纯属看你我的选择 * @param args */
public static void main(String[] args) throws Exception {
//单个发布确认执行
publishMessageIndividually();
}
/** * 单个发布确认 */
public static void publishMessageIndividually() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//开启发布确认
channel.confirmSelect();
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
//服务端返回 false 或超时时间内未返回,生产者能够消息重发
boolean flag = channel.waitForConfirms();
if(flag){
System.out.println("消息发送成功");
}
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) +
"ms");
}
}
复制代码
执行结果
这种确认方式有一个最大的缺点就是:发布速度特别的慢,由于若是没有确认发布的消息就会 阻塞全部后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。固然对于某 些应用程序来讲这可能已经足够了。
固然,如今跟你说慢,你莫得感知,下面几种综合起来对比你就会发现他的效率有多低了!
与单个等待确认消息相比,先发布一批消息而后一块儿确承认以极大地 提升吞吐量。
生产者
/** * 批量发布确认 */
public static void publishMessageBatch() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
//队列名使用uuid来获取不重复的值,不须要本身再进行命名了。
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//开启发布确认
channel.confirmSelect();
//批量确认消息大小
int batchSize = 88;
//未确认消息个数
int outstandingMessageCount = 0;
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", queueName, null, message.getBytes());
outstandingMessageCount++;
if (outstandingMessageCount == batchSize) {
channel.waitForConfirms();//确认代码
outstandingMessageCount = 0;
}
}
//为了确保还有剩余没有确认消息 再次确认
if (outstandingMessageCount > 0) {
channel.waitForConfirms();
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个批量确认消息,耗时" + (end - begin) +
"ms");
}
复制代码
执行结果
缺点:当发生故障致使发布出现问题时,不知道是哪一个消息出现问题了,咱们必须将整个批处理保存在内存中,以记录重要的信息然后从新发布消息。
固然这种方案仍然是同步的,也同样阻塞消息的发布。
异步确认虽然编程逻辑比上两个要复杂,可是性价比最高,不管是可靠性仍是效率都没得说, 他是利用回调函数来达到消息可靠性传递的,这个中间件也是经过函数回调来保证是否投递成功, 下面就让咱们来详细讲解异步确认是怎么实现的。
生产者
/** * 异步发布确认 */
public static void publishMessageAsync() throws Exception {
try (Channel channel = RabbitMqUtils.getChannel()) {
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
//开启发布确认
channel.confirmSelect();
/** * 线程安全有序的一个哈希表,适用于高并发的状况 * 1.轻松的将序号与消息进行关联 * 2.轻松批量删除条目 只要给到序列号 * 3.支持并发访问 */
ConcurrentSkipListMap<Long, String> outstandingConfirms = new
ConcurrentSkipListMap<>();
/** * 确认收到消息的一个回调 * 1.消息序列号 * 2.true 能够确认小于等于当前序列号的消息 * false 确认当前序列号消息 */
ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
if (multiple) {
//返回的是小于等于当前序列号的未确认消息 是一个 map
ConcurrentNavigableMap<Long, String> confirmed =
outstandingConfirms.headMap(sequenceNumber, true);
//清除该部分未确认消息
confirmed.clear();
}else{
//只清除当前序列号的消息
outstandingConfirms.remove(sequenceNumber);
}
};
ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
String message = outstandingConfirms.get(sequenceNumber);
System.out.println("发布的消息"+message+"未被确认,序列号"+sequenceNumber);
};
/** * 添加一个异步确认的监听器 * 1.确认收到消息的回调 * 2.未收到消息的回调 */
channel.addConfirmListener(ackCallback, null);
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "消息" + i;
/** * channel.getNextPublishSeqNo()获取下一个消息的序列号 * 经过序列号与消息体进行一个关联 * 所有都是未确认的消息体 */
outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
channel.basicPublish("", queueName, null, message.getBytes());
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "个异步确认消息,耗时" + (end - begin) +
"ms");
}
}
复制代码
执行结果
很容易看出,这种方式速度快的飞起呀!
如何处理未确认的消息?
单独发布消息
批量发布消息
异步处理
路漫漫其修远兮,吾必将上下求索~
若是你认为i博主写的不错!写做不易,请点赞、关注、评论给博主一个鼓励吧~hahah