@[toc] 微服务能够设计成消息驱动的微服务,响应式系统也能够基于消息中间件来作,从这个角度来讲,在互联网应用开发中,消息中间件真的是过重要了。java
今天,以 RabbitMQ 为例,松哥来和你们聊一聊消息中间消息发送可靠性的问题。git
注意,如下内容我主要和你们讨论如何确保消息生产者将消息发送成功,并不涉及消息消费的问题。github
你们知道,RabbitMQ 中的消息发送引入了 Exchange(交换机)的概念,消息的发送首先到达交换机上,而后再根据既定的路由规则,由交换机将消息路由到不一样的 Queue(队列)中,再由不一样的消费者去消费。spring
大体的流程就是这样,因此要确保消息发送的可靠性,主要从两方面去确认:数据库
若是能确认这两步,那么咱们就能够认为消息发送成功了。markdown
若是这两步中任一步骤出现问题,那么消息就没有成功送达,此时咱们可能要经过重试等方式去从新发送消息,屡次重试以后,若是消息仍是不能到达,则可能就须要人工介入了。并发
通过上面的分析,咱们能够确认,要确保消息成功发送,咱们只须要作好三件事就能够了:app
上面提出的三个步骤,第三步须要咱们本身实现,前两步 RabbitMQ 则有现成的解决方案。dom
如何确保消息成功到达 RabbitMQ?RabbitMQ 给出了两种方案:ide
这是两种不一样的方案,不能够同时开启,只能选择其中之一,若是二者同时开启,则会报以下错误:
咱们分别来看。如下全部案例都在 Spring Boot 中展开,文末能够下载相关源码。
Spring Boot 中开启 RabbitMQ 事务机制的方式以下:
首先须要先提供一个事务管理器,以下:
@Bean
RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
复制代码
接下来,在消息生产者上面作两件事:添加事务注解并设置通讯信道为事务模式:
@Service
public class MsgService {
@Autowired
RabbitTemplate rabbitTemplate;
@Transactional
public void send() {
rabbitTemplate.setChannelTransacted(true);
rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME,RabbitConfig.JAVABOY_QUEUE_NAME,"hello rabbitmq!".getBytes());
int i = 1 / 0;
}
}
复制代码
这里注意两点:
@Transactional
注解标记事务。这就 OK 了。
在上面的案例中,咱们在结尾来了个 1/0 ,这在运行时必然抛出异常,咱们能够尝试运行该方法,发现消息并未发送成功。
当咱们开启事务模式以后,RabbitMQ 生产者发送消息会多出四个步骤:
上面的步骤,除了第三步是原本就有的,其余几个步骤都是无缘无故多出来的。因此你们看到,事务模式其实效率有点低,这并不是一个最佳解决方案。咱们能够想一想,什么项目会用到消息中间件?通常来讲都是一些高并发的项目,这个时候并发性能尤其重要。
因此,RabbitMQ 还提供了发送方确认机制(publisher confirm)来确保消息发送成功,这种方式,性能要远远高于事务模式,一块儿来看下。
首先咱们移除刚刚关于事务的代码,而后在 application.properties 中配置开启消息发送方确认机制,以下:
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
复制代码
第一行是配置消息到达交换器的确认回调,第二行则是配置消息到达队列的回调。
第一行属性的配置有三个取值:
waitForConfirms()
和 waitForConfirmsOrDie()
方法的调用。接下来咱们要开启两个监听,具体配置以下:
@Configuration
public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
public static final String JAVABOY_EXCHANGE_NAME = "javaboy_exchange_name";
public static final String JAVABOY_QUEUE_NAME = "javaboy_queue_name";
private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class);
@Autowired
RabbitTemplate rabbitTemplate;
@Bean
Queue queue() {
return new Queue(JAVABOY_QUEUE_NAME);
}
@Bean
DirectExchange directExchange() {
return new DirectExchange(JAVABOY_EXCHANGE_NAME);
}
@Bean
Binding binding() {
return BindingBuilder.bind(queue())
.to(directExchange())
.with(JAVABOY_QUEUE_NAME);
}
@PostConstruct
public void initRabbitTemplate() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
logger.info("{}:消息成功到达交换器",correlationData.getId());
}else{
logger.error("{}:消息发送失败", correlationData.getId());
}
}
@Override
public void returnedMessage(ReturnedMessage returned) {
logger.error("{}:消息未成功路由到队列",returned.getMessage().getMessageProperties().getMessageId());
}
}
复制代码
关于这个配置类,我说以下几点:
RabbitTemplate.ConfirmCallback
和 RabbitTemplate.ReturnsCallback
两个接口,这两个接口,前者的回调用来肯定消息到达交换器,后者则会在消息路由到队列失败时被调用。这就能够了。
接下来咱们对消息发送进行测试。
首先咱们尝试将消息发送到一个不存在的交换机中,像下面这样:
rabbitTemplate.convertAndSend("RabbitConfig.JAVABOY_EXCHANGE_NAME",RabbitConfig.JAVABOY_QUEUE_NAME,"hello rabbitmq!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));
复制代码
注意第一个参数是一个字符串,不是变量,这个交换器并不存在,此时控制台会报以下错误:
接下来咱们给定一个真实存在的交换器,可是给一个不存在的队列,像下面这样:
rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME,"RabbitConfig.JAVABOY_QUEUE_NAME","hello rabbitmq!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));
复制代码
注意此时第二个参数是一个字符串,不是变量。
能够看到,消息虽然成功达到交换器了,可是没有成功路由到队列(由于队列不存在)。
这是一条消息的发送,咱们再来看看消息的批量发送。
若是是消息批量处理,那么发送成功的回调监听是同样的,这里再也不赘述。
这就是 publisher-confirm 模式。
相比于事务,这种模式下的消息吞吐量会获得极大的提高。
失败重试分两种状况,一种是压根没找到 MQ 致使的失败重试,另外一种是找到 MQ 了,可是消息发送失败了。
两种重试咱们分别来看。
前面所说的事务机制和发送方确认机制,都是发送方确认消息发送成功的办法。若是发送方一开始就连不上 MQ,那么 Spring Boot 中也有相应的重试机制,可是这个重试机制就和 MQ 自己没有关系了,这是利用 Spring 中的 retry 机制来完成的,具体配置以下:
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=1000ms
spring.rabbitmq.template.retry.max-attempts=10
spring.rabbitmq.template.retry.max-interval=10000ms
spring.rabbitmq.template.retry.multiplier=2
复制代码
从上往下配置含义依次是:
配置完成后,再次启动 Spring Boot 项目,而后关掉 MQ,此时尝试发送消息,就会发送失败,进而致使自动重试。
业务重试主要是针对消息没有到达交换器的状况。
若是消息没有成功到达交换器,根据咱们第二小节的讲解,此时就会触发消息发送失败回调,在这个回调中,咱们就能够作文章了!
总体思路是这样:
每次发送消息的时候,就往数据库中添加一条记录。这里的字段都很好理解,有三个我额外说下:
其余字段都很好理解,我就不一一啰嗦了。
大体的思路就是上面这样,松哥这里就不给出代码了,松哥的 vhr 里边邮件发送就是这样的思路来处理的,完整代码你们能够参考 vhr 项目(github.com/lenve/vhr)。
固然这种思路有两个弊端:
固然,你们也要注意,消息是否要确保 100% 发送成功,也要看具体状况。
好啦,这就是关于消息生产者的一些常见问题以及对应的解决方案,下篇文章松哥和你们探讨若是保证消息消费成功并解决幂等性问题。
本文涉及到的相关源代码你们能够在这里下载:github.com/lenve/javab…