【RabbitMQ】如何进行消息可靠投递【上篇】

说明

前几天,忽然发生线上报警,钉钉连发了好几条消息,一看是RabbitMQ相关的消息,心头一紧,难道翻车了?java

u=1091165172,1855706818&fm=26&gp=0.jpg

[橙色报警] 应用[xxx]在[08-15 16:36:04]发生[错误日志异常],alertId=[xxx]。由[org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:start:620]触发。
应用xxx 可能缘由以下
服务名为:
 异常为:org.springframework.amqp.rabbit.listener.BlockingQueueConsumer:start:620
 产生缘由以下:
1.org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
||Consumer received fatal=false exception on startup:
...
应用xxx 可能缘由以下
服务名为:
 异常为:org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer:run:1160
 产生缘由以下:
1.Stopping container from aborted consumer||Stopping container from aborted consumer:
复制代码

定睛一看,看样子像是消费者莫名其妙断开了链接,正逢公司搬家之际,难道是机房又双叒叕。。。。断电了?因而赶忙联系了运维,咨询RabbitMQ是否发生了调整。几分钟后,获得了运维的回复,因为一些不可描述的缘由,RabbitMQ进行了重启,emmmm,虽然重启只持续了10分钟,可是致使该集群下全部消费者都挂了,须要将项目重启后才能正常进行消费。redis

项目重启后,一切彷佛又正常运转起来,但好景不长,没过多久,工单就找上了门来,通过排查,发现是生产者在RabbitMQ重启期间消息投递失败,致使消息丢失,须要手动处理和恢复。spring

因而,我开始思考,如何才能进行RabbitMQ的消息可靠投递呢?特别是在这样比较极端的状况,RabbitMQ集群不可用的时候,没法投递的消息该如何处理呢?sql

可靠投递

先来讲明一个概念,什么是可靠投递呢?在RabbitMQ中,一个消息从生产者发送到RabbitMQ服务器,须要经历这么几个步骤:shell

  1. 生产者准备好须要投递的消息。
  2. 生产者与RabbitMQ服务器创建链接。
  3. 生产者发送消息。
  4. RabbitMQ服务器接收到消息,并将其路由到指定队列。
  5. RabbitMQ服务器发起回调,告知生产者消息发送成功。

所谓可靠投递,就是确保消息可以百分百从生产者发送到服务器。数据库

{6582FAF9-A46E-4239-810B-E1D6883ED070}.png.jpg

为了不争议,补充说明一下,若是没有设置Mandatory参数,是不须要先路由消息才发起回调的,服务器收到消息后就会进行回调确认。缓存

二、三、5步都是经过TCP链接进行交互,有网络调用的地方就会有事故,网络波动随时都有可能发生,不论是内部机房停电,仍是外部光缆被切,网络事故没法预测,虽然这些都是小几率事件,但对于订单等敏感数据处理来讲,这些状况下致使消息丢失都是不可接受的。服务器

20170716034945131.jpg

RabbitMQ中的消息可靠投递

默认状况下,发送消息的操做是不会返回任何信息给生产者的,也就是说,默认状况下生产者是不知道消息有没有正确地到达服务器。网络

那么如何解决这个问题呢?app

对此,RabbitMQ中有一些相关的解决方案:

  1. 使用事务机制来让生产者感知消息被成功投递到服务器。
  2. 经过生产者确认机制实现。

在RabbitMQ中,全部确保消息可靠投递的机制都会对性能产生必定影响,如使用不当,可能会对吞吐量形成重大影响,只有经过执行性能基准测试,才能在肯定性能与可靠投递之间的平衡。

在使用可靠投递前,须要先思考如下问题:

  1. 消息发布时,保证消息进入队列的重要性有多高?
  2. 若是消息没法进行路由,是否应该将该消息返回给发布者?
  3. 若是消息没法被路由,是否应该将其发送到其余地方稍后再从新进行路由?
  4. 若是RabbitMQ服务器崩溃了,是否能够接受消息丢失?
  5. RabbitMQ在处理新消息时是否应该确认它已经为发布者执行了全部请求的路由和持久化?
  6. 消息发布者是否能够批量投递消息?
  7. 在可靠投递上是否有能够接受的平衡性?是否能够接受一部分的不可靠性来提高性能?

只考虑平衡性不考虑性能是不行的,至于这个平衡的度具体如何把握,就要具体状况具体分析了,好比像订单数据这样敏感的信息,对可靠性的要求天然要比通常的业务消息对可靠性的要求高的多,由于订单数据是跟钱直接相关的,可能会致使直接的经济损失。

因此不只应该知道有哪些保证消息可靠性的解决方案,还应该知道每种方案对性能的影响程度,以此来进行方案的选择。

RabbitMQ的事务机制

RabbitMQ是支持AMQP事务机制的,在生产者确认机制以前,事务是确保消息被成功投递的惟一方法。

在SpringBoot项目中,使用RabbitMQ事务其实很简单,只须要声明一个事务管理的Bean,并将RabbitTemplate的事务设置为true便可。

配置文件以下:

spring:
 rabbitmq:
 host: localhost
 password: guest
 username: guest
 listener:
 type: simple
 simple:
 default-requeue-rejected: false
 acknowledge-mode: manual
复制代码

先来配置一下交换机和队列,以及事务管理器。

@Configuration
public class RabbitMQConfig {

    public static final String BUSINESS_EXCHANGE_NAME = "rabbitmq.tx.demo.simple.business.exchange";
    public static final String BUSINESS_QUEUEA_NAME = "rabbitmq.tx.demo.simple.business.queue";

    // 声明业务Exchange
    @Bean("businessExchange")
    public FanoutExchange businessExchange(){
        return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
    }

    // 声明业务队列
    @Bean("businessQueue")
    public Queue businessQueue(){
        return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).build();
    }

    // 声明业务队列绑定关系
    @Bean
    public Binding businessBinding(@Qualifier("businessQueue") Queue queue, @Qualifier("businessExchange") FanoutExchange exchange){
        return BindingBuilder.bind(queue).to(exchange);
    }


    /** * 配置启用rabbitmq事务 * @param connectionFactory * @return */
    @Bean
    public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }
}
复制代码

而后建立一个消费者,来监听消息,用以判断消息是否成功发送。

@Slf4j
@Component
public class BusinessMsgConsumer {


    @RabbitListener(queues = BUSINESS_QUEUEA_NAME)
    public void receiveMsg(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("收到业务消息:{}", msg);
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
    }
}
复制代码

而后是消息生产者:

@Slf4j
@Component
public class BusinessMsgProducer{

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    private void init() {
        rabbitTemplate.setChannelTransacted(true);
    }

    @Transactional
    public void sendMsg(String msg) {
        rabbitTemplate.convertAndSend(BUSINESS_EXCHANGE_NAME, "key", msg);
        log.info("msg:{}", msg);
        if (msg != null && msg.contains("exception"))
            throw new RuntimeException("surprise!");
        log.info("消息已发送 {}" ,msg);
    }
}
复制代码

这里有两个注意的地方:

  1. 在初始化方法里,经过使用rabbitTemplate.setChannelTransacted(true); 来开启事务。
  2. 在发送消息的方法上加上 @Transactional 注解,这样在该方法中发生异常时,消息将不会发送。

在controller中加一个接口来生产消息:

@RestController
public class BusinessController {

    @Autowired
    private BusinessMsgProducer producer;

    @RequestMapping("send")
    public void sendMsg(String msg){
        producer.sendMsg(msg);
    }
}
复制代码

来验证一下:

msg:1
消息已发送 1
收到业务消息:1
msg:2
消息已发送 2
收到业务消息:2
msg:3
消息已发送 3
收到业务消息:3
msg:exception

Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is java.lang.RuntimeException: surprise!] with root cause

java.lang.RuntimeException: surprise!
	at com.mfrank.rabbitmqdemo.producer.BusinessMsgProducer.sendMsg(BusinessMsgProducer.java:30)
    ...
复制代码

msg 的值为 exception 时, 在调用rabbitTemplate.convertAndSend 方法以后,程序抛出了异常,消息并无发送出去,而是被当前事务回滚了。

固然,你能够将事务管理器注释掉,或者将初始化方法的开启事务注释掉,这样事务就不会生效,即便在调用了发送消息方法以后,程序发生了异常,消息也会被正常发送和消费。

RabbitMQ中的事务使用起来虽然简单,可是对性能的影响是不可忽视的,由于每次事务的提交都是阻塞式的等待服务器处理返回结果,而默认模式下,客户端是不须要等待的,直接发送就完事了,除此以外,事务消息须要比普通消息多4次与服务器的交互,这就意味着会占用更多的处理时间,因此若是对消息处理速度有较高要求时,尽可能不要采用事务机制。

RabbitMQ的生产者确认机制

RabbitMQ中的生产者确认功能是AMQP规范的加强功能,当生产者发布给全部队列的已路由消息被消费者应用程序直接消费时,或者消息被放入队列并根据须要进行持久化时,一个Basic.Ack请求会被发送到生产者,若是消息没法路由,代理服务器将发送一个Basic.Nack RPC请求用于表示失败。而后由生产者决定该如何处理该消息。

也就是说,经过生产者确认机制,生产者能够在消息被服务器成功接收时获得反馈,并有机会处理未被成功接收的消息。

在Springboot中开启RabbitMQ的生产者确认模式也很简单,只多了一行配置:

spring:
 rabbitmq:
 host: localhost
 password: guest
 username: guest
 listener:
 type: simple
 simple:
 default-requeue-rejected: false
 acknowledge-mode: manual
 publisher-confirms: true
复制代码

publisher-confirms: true 即表示开启生产者确认模式。

而后将消息生产者的表明进行部分修改:

@Slf4j
@Component
public class BusinessMsgProducer implements RabbitTemplate.ConfirmCallback{

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    private void init() {
// rabbitTemplate.setChannelTransacted(true);
        rabbitTemplate.setConfirmCallback(this);
    }

    public void sendCustomMsg(String exchange, String msg) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        log.info("消息id:{}, msg:{}", correlationData.getId(), msg);

        rabbitTemplate.convertAndSend(exchange, "key", msg, correlationData);
    }
    
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (b) {
            log.info("消息确认成功, id:{}", id);
        } else {
            log.error("消息未成功投递, id:{}, cause:{}", id, s);
        }
    }
}
复制代码

让生产者继承自RabbitTemplate.ConfirmCallback 类,而后实现其confirm 方法,便可用其接收服务器回调。

须要注意的是,在发送消息时,代码也进行了调整:

CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

rabbitTemplate.convertAndSend(exchange, "key", msg, correlationData);
复制代码

这里咱们为消息设置了消息ID,以便在回调时经过该ID来判断是对哪一个消息的回调,由于在回调函数中,咱们是没法直接获取到消息内容的,因此须要将消息先暂存起来,根据消息的重要程度,能够考虑使用本地缓存,或者存入Redis中,或者Mysql中,而后在回调时更新其状态或者从缓存中移除,最后使用定时任务对一段时间内未发送的消息进行从新投递。

如下是我盗来的图,原谅我偷懒不想画了[手动狗头]:

5b65729e0001439305000294.jpg

另外,还须要注意的是,若是将消息发布到不存在的交换机上,那么发布用的信道将会被RabbitMQ关闭。

此外,生产者确认机制跟事务是不能一块儿工做的,是事务的轻量级替代方案。由于事务和发布者确认模式都是须要先跟服务器协商,对信道启用的一种模式,不能对同一个信道同时使用两种模式。

在生产者确认模式中,消息的确承认以是异步和批量的,因此相比使用事务,性能会更好。

使用事务机制和生产者确认机制都能确保消息被正确的发送至RabbitMQ,这里的“正确发送至RabbitMQ”说的是消息成功被交换机接收,但若是找不到能接收该消息的队列,这条消息也会丢失。至于如何处理那些没法被投递到队列的消息,将会在下篇进行说明。

结题

因此当公司机房“断电”时,如何处理那些须要发送的消息呢?相信看完上文以后,你的心中已经有了答案。

通常来讲,这种“断电”不会持续较长时间,通常几分钟到半小时之间,很快可以恢复,因此若是是重要消息,能够保存到数据库中,若是是非重要消息,可使用redis进行保存,固然,还要根据消息的数量级来进行判断。

若是消息量比较大,能够考虑将消息发送到另外一个集群的死信队列中,事实上,所在公司就有两个RabbitMQ集群,因此当一个集群不可用时,能够往另外一个集群发消息,emmm,若是两个机房都停电了的话,当我没说。

111.png.jpg
相关文章
相关标签/搜索