6、发布消息到RabbitMQ与性能权衡(读书笔记与我的实践)

摘要

本文主要学习的目标有两个:java

  • RabbitMQ中的消息可靠性投递的方式;
  • 发布的性能权衡;

虽然不是全部的系统都要求像银行同样对消息可靠投递有很是严格的要求,但确保消息被接收和投递是很是重要的。RabbitMQ基于AMQP规范,后者提供消息发布中的事务以及消息持久化选项,以提供比自身普通消息发布更高级的可靠消息通讯机制。git

发布性能的权衡

在RabbitMQ中,建立可靠性投递的每一个机制都会对性能产生必定的影响。单独使用时可能不太会注意到吞吐量的差别,可是当它们组合使用时,吞吐量就会由明显不一样,只有经过执行本身的性能基准测试,才能肯定性能与可靠性投递之间能够接受的平衡。github

下面从左到右依次说明这些机制会产生哪些性能影响。spring

另外,会使用Spring提供的RabbitTemplate客户端工具(使用过RabbitTemplate,后续可能不会介绍RabbitTemplate),对每种机制进行配置,并发送消息到RabbitMQ。bash

代码在Github:https://github.com/XuePeng87/rabbitmq-example服务器

没有保障

在完美世界里,无须任何额外的配置或操做,RabbitMQ就能够可靠的投递消息。并发

不幸的是,当墨菲定律肆虐咱们的程序时,完美世界并不存在。dom

在非核心应用中,发布的消息没必要处理每一个可能的故障点,例如发一些容许丢弃的消息,那么咱们能够不使用任何保障机制,直接使用Basic.Publish发送消息。异步

使用RabbitTemplate时,能够在配置文件中设置:函数

spring:
  #消息队列配置
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    publisher-returns: false
    publisher-confirms: false
    connection-timeout: 5000ms

将publisher-returns和publisher-confirms设置为false。

失败通知

设置mandatory后,RabbitMQ将不接受不可路由的消息。

mandatory标志是一个与Basic.Publish命令一块儿传递的参数,该参数会告诉RabbitMQ,若是消息不可路由,它应该经过Basic.Return命令将消息返回给发布者。设置mandatory标志能够被认为是开启故障检测模式,它只会让RabbitMQ向你通知失败,而不会通知成功。若是消息路由正确,你的发布者将不会收到通知。

/**
     * 定制AmqpTemplate对象。
     * 可根据须要定制多个。
     *
     * @return AmqpTemplate对象。
     */
    @Bean
    public AmqpTemplate amqpTemplate() {
        rabbitTemplate.setEncoding("UTF-8");
        // 设置不接受不可路由的消息,须要在yml中配置:publisher-returns: true
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            String correlationId = message.getMessageProperties().getCorrelationId();
            log.warn("ReturnCallback -> 消息 {} 发送失败,应答码:{},缘由:{},交换器: {},路由键:{}",
                    correlationId,
                    replyCode,
                    replyText,
                    exchange,
                    routingKey);
        });
        return rabbitTemplate;
    }

如上面的配置,咱们设置了mandatory等于true,同时将配置文件中的publisher-returns也设置为true,这样就打开了失败通知。下面作个测试:

/**
     * 发送direct消息。
     * 交换器存在,但队列不存在,为了测试Mandatory与ReturnCallback。
     *
     * @param message 消息内容。
     */
    public void directNotExistQueue(String message) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("DIRECT_EXCHANGE", "DIRECT_ROUTING_KEY_NOT_EXIST", message, correlationData);
    }

咱们建立了交换器DIRECT_EXCHANGE,可是使用一个不存在的RoutingKey,这就等于发送消息到交换器成功,可是没法路由到某一个队列,执行测试用例,观察结果:

/**
     * 发送direct消息,但消息路由不存在。
     * 交换器存在,但队列不存在,为了测试Mandatory与ReturnCallback。
     */
    @Test
    public void testDirectNotExistQueue() {
        messageProducer.directNotExistQueue("{}");
    }

结果以下: 

ReturnCallback -> 消息 null 发送失败,应答码:312,缘由:NO_ROUTE,交换器: DIRECT_EXCHANGE,路由键:DIRECT_ROUTING_KEY_NOT_EXIST

Basic.Return调用是一个RabbitMQ的异步调用,而且在消息发布后的任什么时候候均可能发生。

若是代码中没有设置setReturnCallback,那么该调用将被忽略。

其实setReturnCallback就是处理Basic.Return的回调方法,RabbitTemplate接收到Basic.Return命令后,调用该方法。

发布者确认

发布者确认模式是AMQP规范的扩展功能,只能用在支持这个特定扩展的客户端,RabbitTemplate支持这个模式。

在协议层,发布任何消息以前,消息发布者必须向RabbitMQ发送Confirm.Select请求,并等待Confirm.SelectOk响应以获知投递确认已经被启动。在这一点上,对于发布者发送给RabbitMQ的每条消息,服务器会发送一个确认响应(Basic.Ack)或否认确认响应(Basic.Nack)。

在RabbitTemplate中,要使用发布者确认,须要在配置文件中配置:

publisher-confirms: true

而后在设置回调函数:

/**
     * 定制AmqpTemplate对象。
     * 可根据须要定制多个。
     *
     * @return AmqpTemplate对象。
     */
    @Bean
    public AmqpTemplate amqpTemplate() {
        // 设置消息转换器为Jackson
        rabbitTemplate.setEncoding("UTF-8");
        // 设置不接受不可路由的消息,须要在yml中配置:publisher-returns: true
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            String correlationId = message.getMessageProperties().getCorrelationId();
            log.warn("ReturnCallback -> 消息 {} 发送失败,应答码:{},缘由:{},交换器: {},路由键:{}",
                    correlationId,
                    replyCode,
                    replyText,
                    exchange,
                    routingKey);
        });
        // 设置消息发布确认功能,须要在yml中配置:publisher-confirms: true
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.info("ConfirmCallback -> 消息发布到交换器成功,id:{}", correlationData);
            } else {
                log.warn("ConfirmCallback -> 消息发布到交换器失败,错误缘由为:{}", cause);
            }
        });
        // 开启事务模式,须要在yml中配置:publisher-confirms: false
        // rabbitTemplate.setChannelTransacted(true);
        return rabbitTemplate;
    }

调用setConfirmCallback方法,设置回调函数,每次发送消息到RabbitMQ,服务器都会返回响应,能够经过判断ack来肯定是否发送成功。

当成功发送到交换器后,ConfirmCallback会接收到ack为true的响应,若是没有成功发送到交换器,则会接收到ack为false的响应。

具体测试代码以下:

/**
     * 发送direct消息。
     * 交换器不存在,队列也不存在,为了测试ConfirmCallback。
     *
     * @param message 消息内容。
     */
    public void directNotExistExchangeAndQueue(String message) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("DIRECT_EXCHANGE_NOT_EXIST", "DIRECT_ROUTING_KEY_NOT_EXIST", message, correlationData);
    }

首先向不存在的交换器发送消息,结果为:

/**
     * 发送direct消息,交换器和路由都不存在。
     * 交换器不存在,队列也不存在,为了测试ConfirmCallback。
     */
    @Test
    public void testDirectNotExistExchangeAndQueue() {
        messageProducer.directNotExistExchangeAndQueue("{}");
    }
ConfirmCallback -> 消息发布到交换器失败,错误缘由为:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'DIRECT_EXCHANGE_NOT_EXIST' in vhost '/', class-id=60, method-id=40)

而后在使用失败通知模式的测试用例测试一下,即能发送到交换器,可是没法路由到队列:

ReturnCallback -> 消息 null 发送失败,应答码:312,缘由:NO_ROUTE,交换器: DIRECT_EXCHANGE,路由键:DIRECT_ROUTING_KEY_NOT_EXIST

ConfirmCallback -> 消息发布到交换器成功,id:CorrelationData [id=9282dbe9-4fe9-4b85-af06-79305f4c99e1]
不管是否使用发布者确认模式,若是你发布消息到不存在的交换器,那么发布用的信道将会被RabbitMQ关闭。

发布者确认模式不能与事务模式一块儿工做,此外,做为对Basic.Publish请求的异步响应,它并不能保证什么时候会收到确认。

备用交换器

备用交换器是RabbitMQ对AMQP的另外一种扩展,用于处理没法路由的消息。备用交换器在第一次声明交换器时被指定,用来提供一种预先存在的交换器,即若是交换器没法路由消息,那么消息就会被路由到这个新的备用交换器。

若是将消息发送到具备备用交换器的交换器(设置了mandatory=true)上, 那么一旦预期的交换器没法正常路由消息,Basic.Return就不会发给发布者。由于消息成功的发布到了备用交换器。

RabbitTemplate声明备用交换器的代码以下:

/**
     * 声明Direct交换器。
     * 同时指定备用交换器。
     *
     * @return Exchange对象。
     */
    @Bean("directExchange")
    public Exchange directExchange() {
        return ExchangeBuilder.directExchange("DIRECT_EXCHANGE")
                .durable(false)
                .withArgument("alternate-exchange", "UN_ROUTE_EXCHANGE")
                .build();
    }

在声明交换器时,调用withArgument函数,key为alternate-exchange,value为备用交换器的名称,这里是UN_ROUTE_EXCHANGE(备用服务器也须要建立)。

下面进行测试,发送一个没法路由的消息到DIRECT_EXCHANGE,这个消息将不能被路由,但不会回调ReturnCallback,而是会进入到UN_ROUTE_EXCHANGE交换器中:

事务提交

AMQP事务提供了一种机制,经过这种机制,消息能够批量发布到RabbitMQ,而后提交到队列或者回滚。

在RabbitTemplate中,使用事务就不能使用ReturnConfime模式,因此要把publisher-confimes设置为false,具体代码以下:

/**
     * 定制AmqpTemplate对象。
     * 可根据须要定制多个。
     *
     * @return AmqpTemplate对象。
     */
    @Bean
    public AmqpTemplate amqpTemplate() {
        // 设置消息转换器为Jackson
        rabbitTemplate.setEncoding("UTF-8");
        // 设置不接受不可路由的消息,须要在yml中配置:publisher-returns: true
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            String correlationId = message.getMessageProperties().getCorrelationId();
            log.warn("ReturnCallback -> 消息 {} 发送失败,应答码:{},缘由:{},交换器: {},路由键:{}",
                    correlationId,
                    replyCode,
                    replyText,
                    exchange,
                    routingKey);
        });
        // 开启事务模式,须要在yml中配置:publisher-confirms: false
        rabbitTemplate.setChannelTransacted(true);
        return rabbitTemplate;
    }

代码中,要设置setChannelTransacted为true,而后声明RabbitMQ的事务管理器:

/**
     * 声明RabbitMQ事务管理器。
     *
     * @param connectionFactory 链接工厂。
     * @return PlatformTransactionManager对象。
     */
    @Bean
    public PlatformTransactionManager transactionManager(ConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }

到这里,事务的配置准备工做就作好了,接下来,基于事务模式发送消息:

/**
     * 在事务模式下,发送direct消息。
     * <p>
     * 第一次发送,消息能够正常路由到队列。
     * 第二次发送,消息不能路由到队列。
     */
    @Transactional(rollbackFor = Exception.class)
    public void directOnTransaction(String message) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("DIRECT_TRANSACTION_EXCHANGE", "DIRECT_TRANSACTION_ROUTING_KEY", message, correlationData);
        rabbitTemplate.convertAndSend("DIRECT_TRANSACTION_EXCHANGE_NOT_EXIST", "DIRECT_TRANSACTION_ROUTING_KEY_NOT_EXIST", message, correlationData);
    }

代码中,加入了@Transactional修饰方法,前后发送两条消息到交换器,第一次发送的消息会正常路由到队列,第二次发送的消息则不会发送到队列,下面是测试代码和结果:

/**
     * 在事务模式下,发送direct消息。
     * 第一次发送,消息能够正常路由到队列。
     * 第二次发送,消息不能路由到队列。
     */
    @Test
    public void testDirectOnTransaction() {
        messageProducer.directOnTransaction("{}");
    }
org.springframework.amqp.AmqpException: failed to commit RabbitMQ transaction

因为发生了异常,执行了回滚,因此第一条消息也没有被发送到队列:

若是两条数据都会成功发送到RabbitMQ,则会成功提交两条消息。

若是不用@Transactional修饰方法,那么会有一条消息进入RabbitMQ,另外一条消息丢失,具体测试以下,首先是两条消息都能发送到RabbitMQ:

/**
     * 在事务模式下,发送direct消息。
     * <p>
     * 第一次发送,消息能够正常路由到队列。
     * 第二次发送,消息不能路由到队列。
     */
    @Transactional(rollbackFor = Exception.class)
    public void directOnTransaction(String message) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("DIRECT_TRANSACTION_EXCHANGE", "DIRECT_TRANSACTION_ROUTING_KEY", message, correlationData);
        rabbitTemplate.convertAndSend("DIRECT_TRANSACTION_EXCHANGE", "DIRECT_TRANSACTION_ROUTING_KEY", message, correlationData);
    }

下面把@Transactional修饰去掉,而后一条能够发送到RabbitMQ,另外一条不能够:

/**
     * 在事务模式下,发送direct消息。
     * <p>
     * 第一次发送,消息能够正常路由到队列。
     * 第二次发送,消息不能路由到队列。
     */
    // @Transactional(rollbackFor = Exception.class)
    public void directOnTransaction(String message) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("DIRECT_TRANSACTION_EXCHANGE", "DIRECT_TRANSACTION_ROUTING_KEY", message, correlationData);
        rabbitTemplate.convertAndSend("DIRECT_TRANSACTION_EXCHANGE_NOT_EXIST", "DIRECT_TRANSACTION_ROUTING_KEY_NOT_EXIST", message, correlationData);
    }

执行后结果以下:

org.springframework.amqp.AmqpIOException: java.io.IOException

能够看到程序依旧抛出了异常,但第一条消息发送到了RabbitMQ中:

在协议层,当RabbitMQ因为错误而没法路由时,它将发送一个Basic.Return响应,但愿终止事务的发布者应该发送TX.Rollback请求,并等待TX.RollbackOk响应,而后继续工做。

RabbitMQ只在每一个发出的命令做用于单个队列时才执行原子事务。若是不仅一个队列受到事务中任何命令的影响,则提交就不具有原子性。

推荐使用发布确认模式用做轻量级替代方案,由于它的速度快,能够同时提供确定或否认的确认。

高可用队列以及高可用队列事务

高可用队列(HA队列)时RabbitMQ的一项加强功能,它容许队列在多个服务器上拥有冗余副本。

当消息发送到高可用队列是,消息会发送到集群中的每台服务器,一旦消息在集群中的任何节点都完成消费,那么消息全部副本将当即从其余节点删除。

HA队列中有一个节点是主节点,其余全部节点都是辅助节点。当主节点发生故障,会在辅助节点中选择一个接管主节点的角色。若是HA节点中的一个辅助节点故障了,其余节点将照常工做。

当一个故障节点恢复了,或者新添加进来一个辅助节点,它将不包含任何已经存在于现有节点中的消息,当现有节点的消息被消费后,故障节点或新节点则开始接收消息,并执行同步操做。

若是使用事务或消息确认机制,则消息须要在HA队列中全部活动节点肯定后,RabbitMQ才会发送成功响应。

高可用队列的配置在后面会单独写一篇。

消息持久化

若是将一个消息的delivery-mode设置为1,RabbitMQ会被告知不须要将消息存储到磁盘,而消息会一直保存在内存中。

为了使消息在RabbitMQ重启后仍然存在,除了将delivery-mode设置为2,还须要在建立队列时设置durable,使队列变为持久化队列。

在发布消息时,RabbitTemplate默认采用持久化策略,若是但愿持久化存储消息,须要在发送消息时作以下设置:

/**
     * 发送direct非持久化消息。
     * RabbitTemplate默认采用消息持久化存储。
     *
     * @param message 消息内容。
     */
    public void directNonPersistent(String message) {
        rabbitTemplate.convertAndSend("DIRECT_EXCHANGE",
                "DIRECT_ROUTING_KEY",
                message, msg -> {
                    msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
                    msg.getMessageProperties().setCorrelationId(UUID.randomUUID().toString());
                    return msg;
                }
        );
    }

setDeliveryMode为非持久化模式后,发送的消息将只保存在RabbitMQ的内存中。

在I/O密集型服务器中,经过操做系统在存储设备之间传输数据时,操做系统将阻塞I/O操做的进程。当RabbitMQ服务器正在尝试执行I/O操做,并等待存储设备响应时,操做系统内核发生阻塞,那么RabbitMQ能作的就只有等待。

尽管消息持久化时保障消息最终被投递的最重要的方式之一,但实现它的代价也时最大的。

相关文章
相关标签/搜索