第四十六章:SpringBoot & RabbitMQ完成消息延迟消费

2018-3-1SpringBoot官方发版了2.0.0.RELEASE最新版本,新版本彻底基于Spring5.0来构建,JDK最低支持也从原来的1.6也改为了1.8,再也不兼容1.8如下的版本,更多新特性请查看官方文档html

本章目标

基于SpringBoot整合RabbitMQ完成消息延迟消费。git

构建项目

注意前言

因为SpringBoot的内置扫描机制,咱们若是不自动配置扫描路径,请保持下面rabbitmq-common模块内的配置能够被SpringBoot扫描到,不然不会自动建立队列,控制台会输出404的错误信息。web

SpringBoot 企业级核心技术学习专题


专题 专题名称 专题描述
001 Spring Boot 核心技术 讲解SpringBoot一些企业级层面的核心组件
002 Spring Boot 核心技术章节源码 Spring Boot 核心技术简书每一篇文章码云对应源码
003 Spring Cloud 核心技术 对Spring Cloud核心技术全面讲解
004 Spring Cloud 核心技术章节源码 Spring Cloud 核心技术简书每一篇文章对应源码
005 QueryDSL 核心技术 全面讲解QueryDSL核心技术以及基于SpringBoot整合SpringDataJPA
006 SpringDataJPA 核心技术 全面讲解SpringDataJPA核心技术
007 SpringBoot核心技术学习目录 SpringBoot系统的学习目录,敬请关注点赞!!!

咱们本章采用2.0.0.RELEASE版本的SpringBoot,添加相关的依赖以下所示:spring

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
</parent>
......
<dependencies>
        <!--rabbbitMQ相关依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!--web相关依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--lombok依赖-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <!--spring boot tester-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!--fast json依赖-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.40</version>
        </dependency>
    </dependencies>
......
复制代码

咱们仍然采用多模块的方式来测试队列的Provider以及Consumerjson

队列公共模块

咱们先来建立一个名为rabbitmq-common公共依赖模块(Create New Maven Module) 在公共模块内添加一个QueueEnum队列枚举配置,该枚举内配置队列的ExchangeQueueNameRouteKey等相关内容,以下所示:bash

package com.hengyu.rabbitmq.lazy.enums;

import lombok.Getter;

/**
 * 消息队列枚举配置
 *
 * @author:于起宇 <br/>
 * ===============================
 * Created with IDEA.
 * Date:2018/3/3
 * Time:下午4:33
 * 简书:http://www.jianshu.com/u/092df3f77bca
 * ================================
 */
@Getter
public enum QueueEnum {
    /**
     * 消息通知队列
     */
    MESSAGE_QUEUE("message.center.direct", "message.center.create", "message.center.create"),
    /**
     * 消息通知ttl队列
     */
    MESSAGE_TTL_QUEUE("message.center.topic.ttl", "message.center.create.ttl", "message.center.create.ttl");
    /**
     * 交换名称
     */
    private String exchange;
    /**
     * 队列名称
     */
    private String name;
    /**
     * 路由键
     */
    private String routeKey;

    QueueEnum(String exchange, String name, String routeKey) {
        this.exchange = exchange;
        this.name = name;
        this.routeKey = routeKey;
    }
}
复制代码

能够看到MESSAGE_QUEUE队列配置跟咱们以前章节的配置同样,而咱们另外新建立了一个后缀为ttl的消息队列配置。咱们采用的这种方式是RabbitMQ消息队列其中一种的延迟消费模块,经过配置队列消息过时后转发的形式。微信

这种模式比较简单,咱们须要将消息先发送到ttl延迟队列内,当消息到达过时时间后会自动转发到ttl队列内配置的转发Exchange以及RouteKey绑定的队列内完成消息消费。app

下面咱们来模拟消息通知的延迟消费场景,先来建立一个名为MessageRabbitMqConfiguration的队列配置类,该配置类内添加消息通知队列配置以及消息经过延迟队列配置,以下所示:框架

/**
 * 消息通知 - 消息队列配置信息
 *
 * @author:恒宇少年 <br/>
 * ===============================
 * Created with IDEA.
 * Date:2018/3/3
 * Time:下午4:32
 * 简书:http://www.jianshu.com/u/092df3f77bca
 * ================================
 */
@Configuration
public class MessageRabbitMqConfiguration {
    /**
     * 消息中心实际消费队列交换配置
     *
     * @return
     */
    @Bean
    DirectExchange messageDirect() {
        return (DirectExchange) ExchangeBuilder
                .directExchange(QueueEnum.MESSAGE_QUEUE.getExchange())
                .durable(true)
                .build();
    }

    /**
     * 消息中心延迟消费交换配置
     *
     * @return
     */
    @Bean
    DirectExchange messageTtlDirect() {
        return (DirectExchange) ExchangeBuilder
                .directExchange(QueueEnum.MESSAGE_TTL_QUEUE.getExchange())
                .durable(true)
                .build();
    }

    /**
     * 消息中心实际消费队列配置
     *
     * @return
     */
    @Bean
    public Queue messageQueue() {
        return new Queue(QueueEnum.MESSAGE_QUEUE.getName());
    }


    /**
     * 消息中心TTL队列
     *
     * @return
     */
    @Bean
    Queue messageTtlQueue() {
        return QueueBuilder
                .durable(QueueEnum.MESSAGE_TTL_QUEUE.getName())
                // 配置到期后转发的交换
                .withArgument("x-dead-letter-exchange", QueueEnum.MESSAGE_QUEUE.getExchange())
                // 配置到期后转发的路由键
                .withArgument("x-dead-letter-routing-key", QueueEnum.MESSAGE_QUEUE.getRouteKey())
                .build();
    }

    /**
     * 消息中心实际消息交换与队列绑定
     *
     * @param messageDirect 消息中心交换配置
     * @param messageQueue  消息中心队列
     * @return
     */
    @Bean
    Binding messageBinding(DirectExchange messageDirect, Queue messageQueue) {
        return BindingBuilder
                .bind(messageQueue)
                .to(messageDirect)
                .with(QueueEnum.MESSAGE_QUEUE.getRouteKey());
    }

    /**
     * 消息中心TTL绑定实际消息中心实际消费交换机
     *
     * @param messageTtlQueue
     * @param messageTtlDirect
     * @return
     */
    @Bean
    public Binding messageTtlBinding(Queue messageTtlQueue, DirectExchange messageTtlDirect) {
        return BindingBuilder
                .bind(messageTtlQueue)
                .to(messageTtlDirect)
                .with(QueueEnum.MESSAGE_TTL_QUEUE.getRouteKey());
    }
}
复制代码

咱们声明了消息通知队列的相关ExchangeQueueBinding等配置,将message.center.create队列经过路由键message.center.create绑定到了message.center.direct交换上。ide

除此以外,咱们还添加了消息通知延迟队列ExchangeQueueBinding等配置,将message.center.create.ttl队列经过message.center.create.ttl路由键绑定到了message.center.topic.ttl交换上。

咱们仔细来看看messageTtlQueue延迟队列的配置,跟messageQueue队列配置不一样的地方这里多出了x-dead-letter-exchangex-dead-letter-routing-key两个参数,而这两个参数就是配置延迟队列过时后转发的ExchangeRouteKey,只要在建立队列时对应添加了这两个参数,在RabbitMQ管理平台看到的队列配置就不只是单纯的Direct类型的队列类型,以下图所示:

队列类型差别

在上图内咱们能够看到message.center.create.ttl队列多出了DLXDLK的配置,这就是RabbitMQ死信交换的标志。 知足死信交换的条件,在官方文档中表示:

Messages from a queue can be 'dead-lettered'; that is, republished to another exchange when any of the following events occur:

The message is rejected (basic.reject or basic.nack) with requeue=false, The TTL for the message expires; or The queue length limit is exceeded.

  • 该消息被拒绝(basic.reject或 basic.nack),requeue = false
  • 消息的TTL过时
  • 队列长度限制已超出 官方文档地址

咱们须要知足上面的其中一种方式就能够了,咱们采用知足第二个条件,采用过时的方式。

队列消息提供者

咱们再来建立一个名为rabbitmq-lazy-provider的模块(Create New Maven Module),而且在pom.xml配置文件内添加rabbitmq-common模块的依赖,以下所示:

<!--添加公共模块依赖-->
<dependency>
      <groupId>com.hengyu</groupId>
      <artifactId>rabbitmq-common</artifactId>
      <version>0.0.1-SNAPSHOT</version>
</dependency>
复制代码

配置队列

resource下建立一个名为application.yml的配置文件,在该配置文件内添加以下配置信息:

spring:
  #rabbitmq消息队列配置信息
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /hengboy
    publisher-confirms: true
复制代码

消息提供者类

接下来咱们来建立名为MessageProvider消息提供者类,用来发送消息内容到消息通知延迟队列,代码以下所示:

/**
 * 消息通知 - 提供者
 *
 * @author:于起宇 <br/>
 * ===============================
 * Created with IDEA.
 * Date:2018/3/3
 * Time:下午4:40
 * 简书:http://www.jianshu.com/u/092df3f77bca
 * ================================
 */
@Component
public class MessageProvider {
    /**
     * logger instance
     */
    static Logger logger = LoggerFactory.getLogger(MessageProvider.class);
    /**
     * RabbitMQ 模版消息实现类
     */
    @Autowired
    private AmqpTemplate rabbitMqTemplate;

    /**
     * 发送延迟消息
     *
     * @param messageContent 消息内容
     * @param exchange       队列交换
     * @param routerKey      队列交换绑定的路由键
     * @param delayTimes     延迟时长,单位:毫秒
     */
    public void sendMessage(Object messageContent, String exchange, String routerKey, final long delayTimes) {
        if (!StringUtils.isEmpty(exchange)) {
            logger.info("延迟:{}毫秒写入消息队列:{},消息内容:{}", delayTimes, routerKey, JSON.toJSONString(messageContent));
            // 执行发送消息到指定队列
            rabbitMqTemplate.convertAndSend(exchange, routerKey, messageContent, message -> {
                // 设置延迟毫秒值
                message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
                return message;
            });
        } else {
            logger.error("未找到队列消息:{},所属的交换机", exchange);
        }
    }
}
复制代码

因为咱们在 pom.xml配置文件内添加了RabbitMQ相关的依赖而且在上面application.yml文件内添加了对应的配置,SpringBoot为咱们自动实例化了AmqpTemplate,该实例能够发送任何类型的消息到指定队列。 咱们采用convertAndSend方法,将消息内容发送到指定ExchangeRouterKey队列,而且经过setExpiration方法设置过时时间,单位:毫秒。

编写发送测试

咱们在test目录下建立一个测试类,以下所示:

@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitMqLazyProviderApplication.class)
public class RabbitMqLazyProviderApplicationTests {
    /**
     * 消息队列提供者
     */
    @Autowired
    private MessageProvider messageProvider;

    /**
     * 测试延迟消息消费
     */
    @Test
    public void testLazy() {
        // 测试延迟10秒
        messageProvider.sendMessage("测试延迟消费,写入时间:" + new Date(),
                QueueEnum.MESSAGE_TTL_QUEUE.getExchange(),
                QueueEnum.MESSAGE_TTL_QUEUE.getRouteKey(),
                10000);
    }
}
复制代码

注意:@SpringBootTest注解内添加了classes入口类的配置,由于咱们是模块建立的项目并非默认建立的SpringBoot项目,这里须要配置入口程序类才能够运行测试。

在测试类咱们注入了MessageProvider消息提供者,调用sendMessage方法发送消息到消息通知延迟队列,而且设置延迟的时间为10秒,这里衡量发送到指定队列的标准是要看MessageRabbitMqConfiguration配置类内的相关Binding配置,经过ExchangeRouterKey值进行发送到指定的队列。

到目前为止咱们的rabbitmq-lazy-provider消息提供模块已经编写完成了,下面咱们来看看消息消费者模块。

队列消息消费者

咱们再来建立一个名为rabbitmq-lazy-consumer的模块(Create New Maven Module),一样须要在pom.xml配置文件内添加rabbitmq-common模块的依赖,以下所示:

<!--添加公共模块依赖-->
<dependency>
      <groupId>com.hengyu</groupId>
      <artifactId>rabbitmq-common</artifactId>
      <version>0.0.1-SNAPSHOT</version>
</dependency>
复制代码

固然一样须要在resource下建立application.yml并添加消息队列的相关配置,代码就不贴出来了,能够直接从rabbitmq-lazy-provider模块中复制application.yml文件到当前模块内。

消息消费者类

接下来建立一个名为MessageConsumer的消费者类,该类须要监听消息通知队列,代码以下所示:

/**
 * 消息通知 - 消费者
 *
 * @author:于起宇 <br/>
 * ===============================
 * Created with IDEA.
 * Date:2018/3/3
 * Time:下午5:00
 * 简书:http://www.jianshu.com/u/092df3f77bca
 * ================================
 */
@Component
@RabbitListener(queues = "message.center.create")
public class MessageConsumer {
    /**
     * logger instance
     */
    static Logger logger = LoggerFactory.getLogger(MessageConsumer.class);

    @RabbitHandler
    public void handler(String content) {
        logger.info("消费内容:{}", content);
    }
}
复制代码

@RabbitListener注解内配置了监听的队列,这里配置内容是QueueEnum枚举内的queueName属性值,固然若是你采用常量的方式在注解属性上是直接可使用的,枚举不支持这种配置,这里只能把QueueName字符串配置到queues属性上了。 因为咱们在消息发送时采用字符串的形式发送消息内容,这里在@RabbitHandler处理方法的参数内要保持数据类型一致!

消费者入口类

咱们为消费者模块添加一个入口程序类,用于启动消费者,代码以下所示:

/**
 * 【第四十六章:SpringBoot & RabbitMQ完成消息延迟消费】
 * 队列消费者模块 - 入口程序类
 *
 * @author:于起宇 <br/>
 * ===============================
 * Created with IDEA.
 * Date:2018/3/3
 * Time:下午4:55
 * 简书:http://www.jianshu.com/u/092df3f77bca
 * ================================
 */
@SpringBootApplication
public class RabbitMqLazyConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(RabbitMqLazyConsumerApplication.class, args);
    }
}
复制代码

测试

咱们的代码已经编写完毕,下面来测试下是否完成了咱们预想的效果,步骤以下所示:

1. 启动消费者模块
2. 执行RabbitMqLazyProviderApplicationTests.testLazy()方法进行发送消息到通知延迟队列
3. 查看消费者模块控制台输出内容
复制代码

咱们能够在消费者模块控制台看到输出内容:

2018-03-04 10:10:34.765  INFO 70486 --- [cTaskExecutor-1] c.h.r.lazy.consumer.MessageConsumer      : 消费内容:测试延迟消费,写入时间:Sun Mar 04 10:10:24 CST 2018
复制代码

咱们在提供者测试方法发送消息的时间为10:10:24,而真正消费的时间则为10:10:34,与咱们预计的同样,消息延迟了10秒后去执行消费。

总结

终上所述咱们完成了消息队列的延迟消费,采用死信方式,经过消息过时方式触发,在实际项目研发过程当中,延迟消费仍是颇有必要的,能够省去一些定时任务的配置。

本章源码已经上传到码云: SpringBoot配套源码地址:gitee.com/hengboy/spr… SpringCloud配套源码地址:gitee.com/hengboy/spr… SpringBoot相关系列文章请访问:目录:SpringBoot学习目录 QueryDSL相关系列文章请访问:QueryDSL通用查询框架学习目录 SpringDataJPA相关系列文章请访问:目录:SpringDataJPA学习目录,感谢阅读!

微信扫码关注 - 专一分享
相关文章
相关标签/搜索