在使用消息机制时,咱们一般须要考虑如下几个问题:javascript
- 消息不能丢失
- 保证消息必定能投递到目的地
- 保证业务处理和消息发送/消费的一致性
本文以RabbitMQ为例,讨论如何解决以上问题。html
消息持久化
若是但愿RabbitMQ重启以后消息不丢失,那么须要对如下3种实体均配置持久化:java
- exchange
- queue
- message
声明exchange时设置持久化(durable = true
)而且不自动删除(autoDelete = false):数据库
boolean durable = true;
boolean autoDelete = false;
channel.exchangeDeclare("dlx", TOPIC, durable, autoDelete, null)
声明queue时设置持久化(durable = true
)而且不自动删除(autoDelete = false):json
boolean durable = true;
boolean autoDelete = false;
channel.queueDeclare("order-summary-queue", durable, false, autoDelete, queueArguments);
发送消息时经过设置deliveryMode=2
持久化消息:markdown
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.contentType("application/json")
.deliveryMode(2)
.priority(0)
.build();
channel.basicPublish("order", "order.created", false, properties, "sample-data".getBytes())
发送确认
有时,业务处理成功,消息也发了,可是咱们并不知道消息是否成功到达了rabbitmq,若是因为网络等缘由致使业务成功而消息发送失败,那么发送方将出现不一致的问题,此时可使用rabbitmq的发送确认功能,即要求rabbitmq显式告知咱们消息是否已成功发送。网络
首先须要在channel上设置ConfirmListener
:架构
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long seqNo, boolean multiple) {
if (multiple) {
logger.info(seqNo + "号及其之前的全部消息发送成功,当消息发送成功后执行相应逻辑,好比标记事件为已发送或者删除原来事件");
} else {
logger.info(seqNo + "号发送成功,当消息发送成功后执行相应逻辑,好比标记事件为已发送或者删除原来事件");
}
}
public void handleNack(long seqNo, boolean multiple) {
if (multiple) {
logger.info(seqNo + "号及其之前的全部消息发送失败,当消息发送失败后执行相应逻辑,好比重试或者标记事件发送失败");
} else {
logger.info(seqNo + "号发送失败,当消息发送失败后执行相应逻辑,好比重试或者标记事件发送失败");
}
}
});
而后在发送消息直线须要开启发送确认模式:app
//开启发送者确认
channel.confirmSelect();
而后发送消息:less
channel.basicPublish("order", "order.created", false, properties, "sample-data".getBytes());
当消息正常投递时,rabbitmq客户端将异步调用handleAck()
表示消息已经成功投递,此时程序能够自行处理投递成功以后的逻辑,好比在数据库中将消息设置为已发送
。当消息投递出现异常时,handleNack()
将被调用。
一般来说,发送端只须要保证消息可以发送到exchange便可,而无需关注消息是否被正确地投递到了某个queue,这个是rabbitmq和消息的接收方须要考虑的事情。基于此,若是rabbitmq找不到任何须要投递的queue,那么rabbitmq依然会ack给发送方,此时发送方能够认为消息已经正确投递,而很差用关系消息没有queue接收的问题。可是,对于rabbitmq而言,这种消息是须要记录下来的,不然rabbitmq将直接丢弃该消息。此时能够为exchange设置alternate-exchange
,即表示rabbitmq将把没法投递到任何queue的消息发送到alternate-exchange
指定的exchange中,一般来讲能够设置一个死信交换(DLX)。
事实上,对于exchange存在可是却找不到任何接收queue时,若是发送是设置了mandatory=true
,那么在消息被ack前将return给客户端,此时客户端能够建立一个ReturnListener
用于接收返回的消息:
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
logger.warn("消息没法正确投递,已返回。");
}
});
可是须要注意的是,在return以后,消息依然会被ack而不是nack,还不如不设置madatory呢,所以return有时并不见得有用。
须要注意的是,在发送消息时若是exchange不存在,rabbitmq直接丢弃该消息,而且不会ack或者nack操做,可是在Spring中,会nack。
综合起来,要完成发送方确认,须要作如下几个点:
- 设置ConfirmListener
- 经过
confirmSelect()
开启 - 为exchange设置
alternate-exchange
到DLX - 发送时没有必要设置mandotory
- 发送方将消息记录在数据库中,收到ack时在数据库中标记消息为
已发送
状态 - 若是收到reject或者因为网络缘由没有收到ack,那么消息状态不会改变,下次发送时再次发送,此时可能致使消息重复,解决重复问题请参考“保证至少一次投递,而且消费端幂”小节。
手动消费确认
有时,消息被正确投递到消费方,可是消费方处理失败,那么便会出现消费方的不一致问题。好比订单已建立
的消息发送到用户积分子系统中用于增长用户积分,可是积分消费法处理却都失败了,用户就会问:我购买了东西为何积分并无增长呢?
要解决这个问题,须要引入消费方确认,即只有消息被成功处理以后才告知rabbitmq以ack,不然告知rabbitmq以nack,此时的处理流程以下:
- 接收消息,不做ack,处理消息成功则ack,不成功nack
- 对于nack的消息,能够配置rabbitmq要么从新投递,要么直接扔掉,要么传到死信交换(DLX)
- 若是处理成功,可是因为网络等问题致使确认(不管是ack仍是nack)不成功,那么rabbitmq会从新投递消息,可是此时因为消息已经成功,从新投递便致使了消费重复的消息,此时请参考“保证至少一次投递,而且消费端幂”小节。
在rabbitmq中,消息默认是字段ack的,即消息到达消费方当即ack,而无论消费方业务处理是否成功,为此能够开启手动确认模式,即有消费方自行决定什么时候应该ack,经过设置autoAck=false
开启手动确认模式:
boolean autoAck = false;
channel.basicConsume("order-summary-queue", autoAck,
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
long deliveryTag = envelope.getDeliveryTag();
if (success()) {
logger.info("成功消费消息" + deliveryTag);
channel.basicAck(deliveryTag, false);
} else {
if (!envelope.isRedeliver()) {
logger.warn("首次消费消息" + deliveryTag + "不成功,尝试重试");
boolean requeue = true;
channel.basicNack(deliveryTag, false, requeue);
} else {
logger.warn("第二次消费消息" + deliveryTag + "不成功,扔到DLX");
boolean requeue = false;
channel.basicNack(deliveryTag, false, requeue);
}
}
}
});
能够看到,在autoAck=false
状况下,经过业务处理的是否成功(success()
)来判断应该ack仍是nack。
另外,为了不消息反复requeue的状况,若是消息第一次消费不成功,则在nack时设置requeue=true
,表示告知rabbitmq将reject的消息从新投递,若是第二次消费依然不成功,那么nack时设置requeue=false
,告知rabbitmq不要从新投递了,此时rabbitmq将根据本身的配置要么直接扔掉消息,要么将消息发送到DLX中,具体配置请参考“设置死信交换(DLX)和死信队列(DLQ)”。
保证至少一次投递,而且消费端幂等
一般来讲,程序中会先完成写数据库的操做,而后发送消息,此时一个重要的点是保证这二者的一致性,即一旦数据库保存成功消息必须也可以发送成功。要保证发送发一致性,一种作法是使用全局事务,即将数据库操做和消息发送放到一个事务中,好比JTA,可是全局事务是很重的,而且rabbitmq目前并不支持全局事务。
要解决发送发的一致性问题,能够实现将消息保存到数据库的事件表中,此时业务处理的数据库操做和保存消息到数据库属于同一个本地数据库事务,那么到此能够保证业务处理和消息产生的原子性,而后有一个异步的后台任务从数据库的事件表中一次读取未发送的消息发送至rabbitmq,发送成功后更新消息的状态为已发布
。
然而,此时咱们依然没法保证发送消息和更新消息状态之间的原子性,由于可能发生消息发送成功可是数据库状态更新不成功的状况,为了解决这种极端状况,能够屡次重试消息发送,步骤以下:
- 读取时间表中未发送消息,发送到rabbitmq
- 若是发送成功,事件表中消息状态也更新成功,皆大欢喜
- 若是消息发送不成功,那么消息状态也不做改变,下次重试
- 若是消息发送成功而状态更新不成功,下次重试
不断重试,总有一个可以达到发送消息和状态更新的原子性。
那么问题也来了:rabbitmq中可能出现多条重复消息,此时消费端就懵了。为了解决这个问题,消费方应该设计为幂等的,即对相同消息的屡次消费与单次消费结果相同。有些消费方的业务逻辑自己即是幂等的,而对于自己不幂等的消费方,须要在数据库中记录已经被正确消费的消息,当重复消息来时,判断该消息是否已经被消费,若是没有则执行消费逻辑,若是已经消费则直接忽略。此时消费方的处理步骤以下:
- 接收到消息,判断消息是否已经消费,若是是,则直接忽略,此时已然须要作消费成功确认
- 若是消息还未被消费,则处理业务逻辑,记录消息,业务逻辑自己和记录消息在同一个数据库事务中,若是都成功,则皆大欢喜;若是失败,那么消费方业务回滚,消息也不记录,此时reject消息,等下次重发
设置消息的TTL和消息队列的max-length
为了保证消息的时效性,能够设置队列中消息的TTL(x-message-ttl
),而为了保证消息队列不至于太大而影响性能,能够设置队列的最大消息数(x-max-length
)。在建立队列时设置以下:
ImmutableMap<String, Object> orderSummaryQueueArguments = of(
"x-max-length",
300,
"x-message-ttl",
24 * 60 * 60 * 1000);
channel.queueDeclare("order-summary-queue", true, false, false, orderSummaryQueueArguments);
设置死信交换(DLX)和死信队列(DLQ)
对于没法投递的消息,咱们须要将其记录下来便于后续跟踪排查,此时能够将这样的消息放入DLX和DLQ中。默认状况下,queue中被抛弃的消息将被直接丢掉,可是能够经过设置queue的x-dead-letter-exchange
参数,将被抛弃的消息发送到x-dead-letter-exchange
作指定的exchange中,这样的exchange成为DLX。
设置了x-dead-letter-exchange
以后,在如下三种状况下消息将被扔到DLX中:
- 消费方nack时指定了
requeue=false
- 消息的TTL已到
- 消息队列的max-length已到
在声明queue时定义x-dead-letter-exchange
:
ImmutableMap<String, Object> orderNotificationQueueArguments = of("x-dead-letter-exchange", "dlx");
channel.queueDeclare("order-notification-queue", true, false, false, orderNotificationQueueArguments);
- 设置DLQ为lazy,而且没有TTL,而且没有max-length
在如下3种状况下,消息会被投递到DLX中:
须要注意的是,在发送消息时,当已经达到queue的上限,而当queue定义为x-overflow=reject-publish时,rabbitmq将nack。当有多个queue同时绑定到exchange时,若是有些queue设置了reject-publish,而有些却没有,那么依然会nack,这对发送方来讲很差处理。所以,仍是那句话,发送方只须要保证正确地投递到了exchange便可,而不用关系exchange后面有哪些queue。
设置Prefetch count
Prefetch count表示消费方一次性从rabbitmq读取的消息数量,若是设置过大,那么消费方可能始终处于高负荷运转状态,而若是过小又会增长网络开销,一般设置为20-50。另外,有时为了保证多个消费方均衡地分摊消息处理任务,一般设置prefetch count为1。
异常处理
在以上设置的状况下,咱们来看看当各类异常发生时,rabbitmq是如何运做的:
- broker不可达:直接抛出异常;
- 发送方本身始终发送不出去:消息状态始终处于“未发送”,不会破坏一致性,可是对于事件表中累计太多的事件须要关注;
- exchange不存在:消息被丢掉,rabbitmq不会ack,消息状态始终处于“未发送”,下次将从新发送,不会破坏一致性,可是当exchange持续不存在下去,那么事件表中事件也会累计太多;
- exchange存在可是没有接受queue:消息将被ack并标记为“已发送”,但因为设置了alternative exchange为dlx,那么消息将发送到dlx对应的dlq中保存以便后续处理;
- consumer不在线,而累积消息太多:消息一致性没有问题,可是当累计到了max-length上限,消息队列头部的消息将被放置dlq中以便后续处理;
- consumer临时性失败:经过redelivered判断是否为重复投递,若是是,则nack而且requeue=false,表示若是重复投递的一次的消息若是再失败,那么直接扔到dlx中,也即消息最多重复投递一次;
- consumer始终失败:全部消息均被投入dlq以便后续处理,此时可能须要关注dlq的长度是否太长。
路由策略
系统中每每会发布多种类型的消息,在发送时有几种路由策略:
- 全部类型的消息都发送到同一个exchange中
- 每种类型的消息都单独配置一个exchange
- 对消息类型进行归类,同一类型的消息对应一个exchange
笔者建议采用最后一种,而且结合DDD中的聚合划分,路由策略建议以下:
每个聚合根下发布的全部类型的事件对应一个exchange,exchange设置为topic,queue能够配置接收某一种类型的事件,也能够配置接收全部某种聚合相关的事件,还能够配置接收全部事件。
案例
假设有个订单(Order)系统,用户下单后须要向用户发送短信通知,而全部对订单的数据显示采用了CQRS架构,即将订单的读模型和写模型分离,即全部订单的更新都经过事件发到rabbitmq,而后专门有个consumer接收这些消息用于更新订单的读模型。
订单相关有两个事件:order.created和order.updated,全部与订单相关的事件都发布到同一个 topic exchange中,exchange名为“order",设置短信通知queue(order-notification-queue)只接收order.created消息,由于只有订单在新建时才会发出通知,即order-notification-queue的routing key为order.created
,设置读模型的queue(order-summary-queue)接收全部与Order相关的消息,即配置order-summary-queue的routing key为order.#
,示例代码以下:
package com.ecommerce.order.spike.rabbitmq;
import com.ecommerce.order.common.logging.AutoNamingLoggerFactory;
import com.google.common.collect.ImmutableMap;
import com.rabbitmq.client.*;
import org.slf4j.Logger;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import static com.google.common.collect.ImmutableMap.of;
import static com.rabbitmq.client.BuiltinExchangeType.TOPIC;
public class RabbitMQSender {
private static final Logger logger = AutoNamingLoggerFactory.getLogger();
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("rabbitmq-user");
factory.setPassword("rabbitmq-password");
factory.setVirtualHost("/");
factory.setPort(5672);
try (Connection conn = factory.newConnection(); Channel channel = conn.createChannel();) {
//设置死信交换,Topic类型,持久化
channel.exchangeDeclare("dlx", TOPIC, true, false, null);
//设置死信队列,持久化,lazy型
channel.queueDeclare("dlq", true, false, false, of("x-queue-mode", "lazy"));
//接收全部发给dlx的消息,另外能够定义其余queue接收指定类型的消息
channel.queueBind("dlq", "dlx", "#");
//定义与order相关的事件exchange,若是没法路由,则路由到死信交换dlx
channel.exchangeDeclare("order", TOPIC, true, false, of("alternate-exchange", "dlx"));
//定义用于异步更新order读模型的queue,设置死信交换为dlx,队列满(x-overflow)时将头部消息发到dlx
//定义queue的最大消息数(x-max-length)为300,满后发到dlx,另外定义消息的存活时间(x-message-ttl)为1天,1天后发送到dlx
ImmutableMap<String, Object> orderSummaryQueueArguments = of("x-dead-letter-exchange",
"dlx",
"x-overflow",
"drop-head",
"x-max-length",
300,
"x-message-ttl",
24 * 60 * 60 * 1000);
channel.queueDeclare("order-summary-queue", true, false, false, orderSummaryQueueArguments);
channel.queueBind("order-summary-queue", "order", "order.#");
//定义用于order建立时向用户发出通知的queue,设置死信交换为dlx
ImmutableMap<String, Object> orderNotificationQueueArguments = of("x-dead-letter-exchange",
"dlx",
"x-overflow",
"drop-head",
"x-max-length",
300,
"x-message-ttl",
24 * 60 * 60 * 1000);
channel.queueDeclare("order-notification-queue", true, false, false, orderNotificationQueueArguments);
channel.queueBind("order-notification-queue", "order", "order.created");
//设置发送端确认
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long seqNo, boolean multiple) {
if (multiple) {
logger.info(seqNo + "号及其之前的全部消息发送成功,当消息发送成功后执行相应逻辑,好比标记事件为已发送或者删除原来事件");
} else {
logger.info(seqNo + "号发送成功,当消息发送成功后执行相应逻辑,好比标记事件为已发送或者删除原来事件");
}
}
public void handleNack(long seqNo, boolean multiple) {
if (multiple) {
logger.info(seqNo + "号及其之前的全部消息发送失败,当消息发送失败后执行相应逻辑,好比重试或者标记事件发送失败");
} else {
logger.info(seqNo + "号发送失败,当消息发送失败后执行相应逻辑,好比重试或者标记事件发送失败");
}
}
});
//开启发送者确认
channel.confirmSelect();
//设置消息持久化
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.contentType("application/json")
.deliveryMode(2)
.priority(0)
.build();
//发送时没有必要设置mandatory,由于没法路由的消息会记录在dlq中
//达到queue的上限时,queue头部消息将被放入dlx中
try {
channel.basicPublish("order", "order.created", false, properties, "create order data".getBytes());
channel.basicPublish("order", "order.updated", false, properties, "update order data".getBytes());
} catch (IOException e) {
e.printStackTrace();
}
Thread.sleep(5000);
}
}
}
以上,咱们发送了一条order.created消息和一条order.updated消息,基于routing key设置,两条消息都会到达order-summary-queue,可是只有order.created消息到达了order-notification-queue:
在consumer端,开启手动ack,而且对于处理失败的场景,只容许从新投递一次,不然扔到DLX中:
package com.ecommerce.order.spike.rabbitmq;
import com.ecommerce.order.common.logging.AutoNamingLoggerFactory;
import com.rabbitmq.client.*;
import org.slf4j.Logger;
import java.io.IOException;
import java.util.Random;
import java.util.concurrent.TimeoutException;
public class RabbitMQReceiver {
private static final Logger logger = AutoNamingLoggerFactory.getLogger();
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("rabbitmq-user");
factory.setPassword("rabbitmq-password");
factory.setVirtualHost("/");
factory.setPort(5672);
Connection conn = factory.newConnection();
Channel channel = conn.createChannel();
channel.basicQos(1, true);
boolean autoAck = false;
channel.basicConsume("order-summary-queue", autoAck,
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
long deliveryTag = envelope.getDeliveryTag();
//用Random来模拟有时处理成功有时处理失败的场景
if (new Random().nextBoolean()) {
logger.info("成功消费消息" + deliveryTag);
channel.basicAck(deliveryTag, false);
} else {
if (!envelope.isRedeliver()) {
logger.warn("首次消费消息" + deliveryTag + "不成功,尝试重试");
channel.basicNack(deliveryTag, false, true);
} else {
logger.warn("第二次消费消息" + deliveryTag + "不成功,扔到DLX");
channel.basicNack(deliveryTag, false, false);
}
}
}
});
}
}
来自:https://www.cnblogs.com/davenkin/p/rabbitmq-best-practices.html