第四十三章: 基于SpringBoot & RabbitMQ完成TopicExchange分布式消息消费

咱们在以前的两个章节第四十一章: 基于SpringBoot & RabbitMQ完成DirectExchange分布式消息消费第四十二章: 基于SpringBoot & RabbitMQ完成DirectExchange分布式消息多消费者消费提升了RabbitMQ消息队列的DirectExchange交换类型的消息消费,咱们以前的章节提到了RabbitMQ比较经常使用的交换类型有三种,咱们今天来看看TopicExchange主题交换类型。java

本章目标

基于SpringBoot平台完成RabbitMQTopicExchange消息类型交换。git

SpringBoot 企业级核心技术学习专题

专题 专题名称 专题描述
001 Spring Boot 核心技术 讲解SpringBoot一些企业级层面的核心组件
002 Spring Boot 核心技术章节源码 Spring Boot 核心技术简书每一篇文章码云对应源码
003 Spring Cloud 核心技术 对Spring Cloud核心技术全面讲解
004 Spring Cloud 核心技术章节源码 Spring Cloud 核心技术简书每一篇文章对应源码
005 QueryDSL 核心技术 全面讲解QueryDSL核心技术以及基于SpringBoot整合SpringDataJPA
006 SpringDataJPA 核心技术 全面讲解SpringDataJPA核心技术

解决问题

以前少年也遇到了一个问题,分类了多模块后消息队列没法自动建立,说来也可笑,以前没有时间去看这个问题,今天在编写本章文章时发现缘由居然是SpringBoot没有扫描到common模块内的配置类。让我一阵的头大~~~,咱们在XxxApplication启动类上添加@ComponentScan(value = "com.hengyu.rabbitmq")就能够自动建立队列了!!!web

构建项目

本章构建项目时一样采用多模块的方式进行设计,能够很好的看到消息处理的效果,由于是多模块项目,咱们先来建立一个SpringBoot项目,pom.xml配置文件依赖配置以下所示:spring

<dependencies>
		<!--rabbbitMQ相关依赖-->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<!--web相关依赖-->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<!--lombok依赖-->
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>
		<!--spring boot tester-->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<!--fast json依赖-->
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>1.2.40</version>
		</dependency>
	</dependencies>
复制代码

下面咱们先来构建公共RabbitMQ模块,由于咱们的消费者以及生产者都是须要RabbitMQ相关的配置信息,这里咱们能够提取出来,使用时进行模块之间的引用。数据库

rabbitmq-topic-common

建立子模块rabbitmq-topic-common,在resources下添加application.yml配置文件并配置RabbitMQ相关的依赖配置,以下所示:json

spring:
  #rabbitmq消息队列配置信息
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    publisher-confirms: true
复制代码
定义交换配置信息

咱们跟以前的章节一张,独立编写一个枚举类型来配置消息队列的交换信息,以下所示:bash

/**
 * rabbitmq交换配置枚举
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:13:56
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@Getter
public enum ExchangeEnum
{
    /**
     * 用户注册交换配置枚举
     */
    USER_REGISTER_TOPIC_EXCHANGE("register.topic.exchange")
    ;
    private String name;

    ExchangeEnum(String name) {
        this.name = name;
    }
}
复制代码
定义队列配置信息

一样消息队列的基本信息配置也一样采用枚举的形式配置,以下所示:app

/**
 * 队列配置枚举
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:14:05
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@Getter
public enum QueueEnum
{
    /**
     * 用户注册
     * 建立帐户消息队列
     */
    USER_REGISTER_CREATE_ACCOUNT("register.account","register.#"),
    /**
     * 用户注册
     * 发送注册成功邮件消息队列
     */
    USER_REGISTER_SEND_MAIL("register.mail","register.#")
    ;
    /**
     * 队列名称
     */
    private String name;
    /**
     * 队列路由键
     */
    private String routingKey;

    QueueEnum(String name, String routingKey) {
        this.name = name;
        this.routingKey = routingKey;
    }
}
复制代码

消息队列枚举内添加了两个属性,分别对应了队列名称队列路由,咱们本章所讲解的TopicExchange类型消息队列能够根据路径信息配置多个消息消费者,而转发的匹配规则信息则是咱们定义的队列的路由信息。框架

定义发送消息路由信息

咱们在发送消息到队列时,须要咱们传递一个路由相关的配置信息,RabbitMQ会根据发送时的消息路由规则信息与定义消息队列时的路由信息进行匹配,若是能够匹配则调用该队列的消费者完成消息的消费,发送消息路由信息配置以下所示:dom

/**
 * 消息队列topic交换路由key配置枚举
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/12/11
 * Time:21:58
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@Getter
public enum TopicEnum {
    /**
     * 用户注册topic路由key配置
     */
    USER_REGISTER("register.user")
    ;

    private String topicRouteKey;

    TopicEnum(String topicRouteKey) {
        this.topicRouteKey = topicRouteKey;
    }
}
复制代码
路由特殊字符 #

咱们在QueueEnum内配置的路由键时有个特殊的符号:#,在RabbitMQ消息队列内路由配置#时表示能够匹配零个或多个字符,咱们TopicEnum枚举内定义的register.user,则是能够匹配QueueEnum枚举定义register.#队列的路由规则。 固然发送消息时若是路由传递:register.user.account也是能够一样匹配register.#的路由规则。

路由特殊字符 *

除此以外比较经常使用到的特殊字符还有一个*,在RabbitMQ消息队列内路由配置*时表示能够匹配一个字符,咱们QueueEnum定义路由键若是修改为register.*时,发送消息时路由为register.user则是能够接受到消息的。但若是发送时的路由为register.user.account时,则是没法匹配该消息。

消息队列配置

配置准备工做已经作好,接下来咱们开始配置队列相关的内容,跟以前同样咱们须要配置QueueExchangeBinding将消息队列与交换绑定。下面咱们来看看配置跟以前的章节有什么差别的地方,代码以下所示:

/**
 * 用户注册消息队列配置
 * ========================
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:16:58
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@Configuration
public class UserRegisterQueueConfiguration {

    private Logger logger = LoggerFactory.getLogger(UserRegisterQueueConfiguration.class);
    /**
     * 配置用户注册主题交换
     * @return
     */
    @Bean
    public TopicExchange userTopicExchange()
    {
        TopicExchange topicExchange = new TopicExchange(ExchangeEnum.USER_REGISTER_TOPIC_EXCHANGE.getName());
        logger.info("用户注册交换实例化成功。");
        return topicExchange;
    }

    /**
     * 配置用户注册
     * 发送激活邮件消息队列
     * 并设置持久化队列
     * @return
     */
    @Bean
    public Queue sendRegisterMailQueue()
    {
        Queue queue = new Queue(QueueEnum.USER_REGISTER_SEND_MAIL.getName());
        logger.info("建立用户注册消息队列成功");
        return queue;
    }

    /**
     * 配置用户注册
     * 建立帐户消息队列
     * 并设置持久化队列
     * @return
     */
    @Bean
    public Queue createAccountQueue()
    {
        Queue queue = new Queue(QueueEnum.USER_REGISTER_CREATE_ACCOUNT.getName());
        logger.info("建立用户注册帐号队列成功.");
        return queue;
    }

    /**
     * 绑定用户发送注册激活邮件队列到用户注册主题交换配置
     * @return
     */
    @Bean
    public Binding sendMailBinding(TopicExchange userTopicExchange,Queue sendRegisterMailQueue)
    {
        Binding binding = BindingBuilder.bind(sendRegisterMailQueue).to(userTopicExchange).with(QueueEnum.USER_REGISTER_SEND_MAIL.getRoutingKey());
        logger.info("绑定发送邮件到注册交换成功");
        return binding;
    }

    /**
     * 绑定用户建立帐户到用户注册主题交换配置
     * @return
     */
    @Bean
    public Binding createAccountBinding(TopicExchange userTopicExchange,Queue createAccountQueue)
    {
        Binding binding = BindingBuilder.bind(createAccountQueue).to(userTopicExchange).with(QueueEnum.USER_REGISTER_CREATE_ACCOUNT.getRoutingKey());
        logger.info("绑定建立帐号到注册交换成功。");
        return binding;
    }
}
复制代码

咱们从上面开始分析。 第一步: 首先咱们建立了TopicExchange消息队列对象,使用ExchangeEnum枚举内的USER_REGISTER_TOPIC_EXCHANGE类型做为交换名称。

第二步:咱们建立了发送注册邮件的队列sendRegisterMailQueue,使用QueueEnum枚举内的类型USER_REGISTER_SEND_MAIL做为队列名称。

第三步:与发送邮件队列一致,用户建立完成后须要初始化帐户信息,而createAccountQueue消息队列后续逻辑就是来完成该工做,使用QueueEnum枚举内的USER_REGISTER_CREATE_ACCOUNT枚举做为建立帐户队列名称。

第四步:在上面步骤中已经将交换、队列建立完成,下面就开始将队列绑定到用户注册交换,从而实现注册用户消息队列消息消费,sendMailBinding绑定了QueueEnum.USER_REGISTER_SEND_MAILRoutingKey配置信息。

createAccountBinding绑定了QueueEnum.USER_REGISTER_CREATE_ACCOUNTRoutingKey配置信息。

到目前为止咱们完成了rabbitmq-topic-common模块的全部配置信息,下面咱们开始编写用户注册消息消费者模块。

rabbitmq-topic-consumer

咱们首先来建立一个子模块命名为rabbitmq-topic-consumer,在pom.xml配置文件内添加rabbitmq-topic-common模块的引用,以下所示:

....//
<dependencies>
        <!--公共模块依赖-->
        <dependency>
            <groupId>com.hengyu</groupId>
            <artifactId>rabbitmq-topic-common</artifactId>
            <version>${parent.version}</version>
        </dependency>
    </dependencies>
....//
复制代码
消费者程序入口

下面咱们来建立程序启动类RabbitMqTopicConsumerApplication,在这里须要注意,手动配置下扫描路径@ComponentScan,启动类代码以下所示:

/**
 * 消息消费者程序启动入口
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/12/11
 * Time:21:48
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@SpringBootApplication
@ComponentScan(value = "com.hengyu.rabbitmq")
public class RabbitMqTopicConsumerApplication {

    /**
     * logback
     */
    private static Logger logger = LoggerFactory.getLogger(RabbitMqTopicConsumerApplication.class);

    /**
     * 程序入口
     * @param args
     */
    public static void main(String[] args)
    {
        SpringApplication.run(RabbitMqTopicConsumerApplication.class,args);

        logger.info("【【【【【Topic队列消息Consumer启动成功】】】】】");
    }
}
复制代码

手动配置扫描路径在文章的开始解释过了,主要目的是为了扫描到RabbitMQConfiguration配置类内的信息,让RabbitAdmin自动建立配置信息到server端。

发送邮件消费者

发送邮件消息费监听register.mail消息队列信息,以下所示:

/**
 * 发送用户注册成功邮件消费者
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/12/11
 * Time:22:07
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@Component
@RabbitListener(queues = "register.mail")
public class SendMailConsumer
{

    /**
     * logback
     */
    Logger logger = LoggerFactory.getLogger(SendMailConsumer.class);

    /**
     * 处理消息
     * 发送用户注册成功邮件
     * @param userId 用户编号
     */
    @RabbitHandler
    public void handler(String userId)
    {

        logger.info("用户:{},注册成功,自动发送注册成功邮件.",userId);

        //... 发送注册成功邮件逻辑
    }
}
复制代码

在这里我只是完成了消息的监听,具体的业务逻辑能够根据需求进行处理。

建立帐户消费者

建立用户帐户信息消费者监听队列register.account,代码以下所示:

/**
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/12/11
 * Time:22:04
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@Component
@RabbitListener(queues = "register.account")
public class CreateAccountConsumer {

    /**
     * logback
     */
    Logger logger = LoggerFactory.getLogger(CreateAccountConsumer.class);

    /**
     * 处理消息
     * 建立用户帐户
     * @param userId 用户编号
     */
    @RabbitHandler
    public void handler(String userId)
    {
        logger.info("用户:{},注册成功,自动建立帐户信息.",userId);

        //... 建立帐户逻辑
    }
}
复制代码

建立帐户,帐户初始化逻辑均可以在handler方法进行处理,本章没有作数据库复杂的处理,因此没有过多的逻辑处理在消费者业务内。

rabbitmq-topic-provider

接下来是咱们的消息提供者的模块编写,咱们依然先来建立程序入口类,并添加扫描配置@ComponentScan路径,代码以下所示:

/**
 * 消息生产者程序启动入口
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/12/11
 * Time:21:48
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@SpringBootApplication
@ComponentScan(value = "com.hengyu.rabbitmq")
public class RabbitMqTopicProviderApplication {

    /**
     * logback
     */
    private static Logger logger = LoggerFactory.getLogger(RabbitMqTopicProviderApplication.class);

    /**
     * 程序入口
     * @param args
     */
    public static void main(String[] args)
    {
        SpringApplication.run(RabbitMqTopicProviderApplication.class,args);

        logger.info("【【【【【Topic队列消息Provider启动成功】】】】】");
    }
}
复制代码
定义消息发送接口

建立QueueMessageService队列消息发送接口并添加send方法,以下所示:

/**
 * 消息队列业务
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:14:50
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
public interface QueueMessageService
{
    /**
     * 发送消息到rabbitmq消息队列
     * @param message 消息内容
     * @param exchangeEnum 交换配置枚举
     * @param routingKey 路由key
     * @throws Exception
     */
    public void send(Object message, ExchangeEnum exchangeEnum, String routingKey) throws Exception;
}
复制代码

send方法内有三个参数,解析以下:

  • message:发送消息内容,能够为任意类型,固然本章内仅仅是java.lang.String。
  • exchangeEnum:咱们自定义的交换枚举类型,方便发送消息到指定交换。
  • routingKey:发送消息时的路由键内容,该值采用TopicEnum枚举内的topicRouteKey做为参数值。

下面咱们来看看该接口的实现类QueueMessageServiceSupportsend方法实现,以下所示:

/**
 * 消息队列业务逻辑实现
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:14:52
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@Component
public class QueueMessageServiceSupport
    implements QueueMessageService
{
    /**
     * 消息队列模板
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void send(Object message, ExchangeEnum exchangeEnum, String routingKey) throws Exception {
        //发送消息到消息队列
        rabbitTemplate.convertAndSend(exchangeEnum.getName(),routingKey,message);
    }
}
复制代码

咱们经过RabbitTemplate实例的convertAndSend方法将对象类型转换成JSON字符串后发送到消息队列服务端,RabbitMQ接受到消息后根据注册的消费者而且路由规则筛选后进行消息转发,并实现消息的消费。

运行测试

为了方便测试咱们建立一个名为UserService的实现类,以下所示:

/**
 * 用户业务逻辑
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/12/11
 * Time:22:10
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@Service
public class UserService
{
    /**
     * 消息队列发送业务逻辑
     */
    @Autowired
    private QueueMessageService queueMessageService;

    /**
     * 随机建立用户
     * 随机生成用户uuid编号,发送到消息队列服务端
     * @return
     * @throws Exception
     */
    public String randomCreateUser() throws Exception
    {
        //用户编号
        String userId = UUID.randomUUID().toString();
        //发送消息到rabbitmq服务端
        queueMessageService.send(userId, ExchangeEnum.USER_REGISTER_TOPIC_EXCHANGE, TopicEnum.USER_REGISTER.getTopicRouteKey());
        return userId;
    }
}
复制代码

该类内添加了一个名为randomCreateUser随机建立用户的方法,经过UUID随机生成字符串做为用户的编号进行传递给用户注册消息队列,完成用户的模拟建立。

编写测试用例

接下来咱们建立RabbitMqTester测试类来完成随机用户建立消息发送,测试用例完成简单的UserService注入,并调用randomCreateUser方法,以下所示:

/**
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/12/11
 * Time:22:10
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitMqTopicProviderApplication.class)
public class RabbitMqTester
{
    /**
     * 用户业务逻辑
     */
    @Autowired
   private UserService userService;

    /**
     * 模拟随机建立用户 & 发送消息到注册用户消息队列
     * @throws Exception
     */
    @Test
    public void testTopicMessage() throws Exception
    {
        userService.randomCreateUser();
    }
}
复制代码

到目前为止,咱们的编码已经完成,下面咱们按照下面的步骤启动测试:

  1. 启动rabbitmq-topic-consumer消息消费者模块,并查看控制台输出内容是否正常
  2. 运行rabbitmq-topic-provider模块测试用例方法testTopicMessage
  3. 查看rabbitmq-topic-consumer控制台输出内容

最终效果:

2017-12-30 18:39:16.819  INFO 2781 --- [           main] c.h.r.c.RabbitMqTopicConsumerApplication : 【【【【【Topic队列消息Consumer启动成功】】】】】
2017-12-30 18:39:29.376  INFO 2781 --- [cTaskExecutor-1] c.h.r.consumer.CreateAccountConsumer     : 用户:c6ef682d-da2e-4cac-a004-c244ff4c4503,注册成功,自动建立帐户信息.
2017-12-30 18:39:29.376  INFO 2781 --- [cTaskExecutor-1] c.h.rabbitmq.consumer.SendMailConsumer   : 用户:c6ef682d-da2e-4cac-a004-c244ff4c4503,注册成功,自动发送注册成功邮件.
复制代码

总结

本章主要讲解了TopicExchange交换类型如何消费队列消息,讲解了经常使用到了的特殊字符#*如何匹配,解决了多模块下的队列配置信息没法自动建立问题。还有一点须要注意TopicExchange交换类型在消息消费时不存在固定的前后顺序!!!

本章源码已经上传到码云: SpringBoot配套源码地址:gitee.com/hengboy/spr… SpringCloud配套源码地址:gitee.com/hengboy/spr… SpringBoot相关系列文章请访问:目录:SpringBoot学习目录 QueryDSL相关系列文章请访问:QueryDSL通用查询框架学习目录 SpringDataJPA相关系列文章请访问:目录:SpringDataJPA学习目录,感谢阅读! 欢迎加入QQ技术交流群,共同进步。

QQ技术交流群
相关文章
相关标签/搜索