第四十一章: 基于SpringBoot & RabbitMQ完成DirectExchange分布式消息消费

消息队列目前流行的有KafKa、RabbitMQ、ActiveMQ等,它们的诞生无非不是为了解决消息的分布式消费,完成项目、服务之间的解耦动做。消息队列提供者与消费者之间彻底采用异步通讯方式,极力的提升了系统的响应能力,从而提升系统的网络请求吞吐量。 每一种的消息队列都有它在设计上的独一无二的优点,在实际的项目技术选型时根据项目的需求来肯定。html

本章目标

基于SpringBoot项目整合RabbitMQ消息队列,完成DirectExchange(路由键)分布式消息消费。java

Exchange

RabbitMQ中有三种转发方式,分别是:mysql

DirectExchange:路由键方式转发消息。 FanoutExchange:广播方式转发消息。 TopicExchange:主题匹配方式转发消息。git

咱们本章先来说解DirectExchange路由键方式,根据设置的路由键的值进行彻底匹配时转发,下面咱们来看一张图,形象的介绍了转发消息匹配流程,以下图所示: web

DirectExchange

咱们能够看到上图,当消息被提供者发送到RabbitMQ后,会根据配置队列的交换以及绑定实例进行转发消息,上图只会将消息转发路由键为KEY的队列消费者对应的实现方法逻辑中,从而完成消息的消费过程。spring

安装RabbitMQ

由于RabbitMQ是跨平台的分布式消息队列服务,能够部署在任意的操做系统上,下面咱们分别介绍在不一样的系统下该怎么去安装RabbitMQ服务。sql

咱们本章采用的环境版本以下:数据库

  • RabbitMQ Server 3.6.14
  • Erlang/OTP_X64 20.1

Windows下安装

咱们先去RabbitMQ官方网站下载最新版的安装包,下载地址:https://www.rabbitmq.com/download.html,能够根据不一样的操做系统选择下载。 咱们在安装RabbitMQ服务端时须要Erlang环境的支持,因此咱们须要先安装Erlangjson

  1. 咱们经过Erlang官方网站http://www.erlang.org/downloads下载最新的安装包,由于是国外的网站因此下载比较慢,不过没有关系,我再本章源码的resource目录下存放了安装包,本章源码在文章底部。bash

  2. 咱们访问RabiitmQ官方下载地址https://www.rabbitmq.com/download.html下载最新安装包,该安装包一样存放在resource目录下。

  3. 运行安装Erlang

  4. 运行安装RabbitMQ

5.检查服务是否安装完成,RabbitMQ安装完成后会以服务的形式建立,而且随着开机启动,以下所示:

Rabbit服务

Mac OS X 安装

在Mac OS X中咱们使用brew工具能够很简单的安装RabbitMQ服务端,步骤以下:

  1. brew更新到最新版本,执行:brew update
  2. 接下来咱们安装Erlang,执行:brew install erlang
  3. 最后安装RabbitMQ,执行:brew install rabbitmq

咱们经过上面的步骤安装后,RabbitMQ会被自动安装到/usr/local/sbin目录下,下面咱们须要手动设置环境变量,来支持服务运行,修改.profile配置文件并添加以下配置:

PATH=$PATH:/usr/local/sbin
复制代码

配置完成后,能够直接经过rabbitmq-server命令来操做RabbitMQ服务。

Ubuntu 安装

Ubuntu操做系统中,咱们能够直接使用APT仓库进行安装,我使用的系统版本是16.04,系统版本并不影响安装。

  1. 安装Erlang,执行命令:sudo apt-get install erlang
  2. 下面咱们须要将RabbitMQ的安装源配置信息写入到系统的/etc/apt/sources.list.d配置文件内,执行以下命令:
echo 'deb http://www.rabbitmq.com/debian/ testing main' | sudo tee /etc/apt/sources.list.d/rabbitmq.list
复制代码
  1. 下面咱们更新APT本地仓库的安装包列表,执行命令:sudo apt-get update
  2. 最后安装RabbitMQ服务,执行命令:sudo apt-get install rabbitmq-server

启用界面管理插件

RabbitMQ提供了界面管理的web插件,咱们只须要启用指定的插件就能够了,下面咱们来看看Windows操做系统下该怎么启动界面管理插件。 咱们使用CMD进入RabbitMQ安装目录C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.14,而后咱们进入sbin目录,能够看到目录内存在不少个bat脚本程序,咱们找到rabbitmq-plugins.bat,这个脚本程序能够控制RabbitMQ插件启用禁用,咱们执行以下脚本命令来启用界面管理插件:

rabbitmq-plugins.bat enable rabbitmq_management
复制代码

命令行输出内容以下所示:

The following plugins have been enabled:
  amqp_client
  cowlib
  cowboy
  rabbitmq_web_dispatch
  rabbitmq_management_agent
  rabbitmq_management

Applying plugin configuration to rabbit@yuqiyu... started 6 plugins.
复制代码

能够看到输出的内容RabbitMQ自动启动了6个插件,咱们如今访问http://127.0.0.1:15672地址能够直接打开RabbitMQ的界面管理平台,而默认的用户名/密码分别为:guest/guest,经过该用户能够直接登陆管理平台。

禁用界面管理插件

咱们一样能够禁用RabbitMQ指定插件,执行以下命令:

rabbitmq-plugins.bat disable rabbitmq_management
复制代码

命令建立输出内容则是相关中止插件的日志,以下:

The following plugins have been disabled:
  amqp_client
  cowlib
  cowboy
  rabbitmq_web_dispatch
  rabbitmq_management_agent
  rabbitmq_management

Applying plugin configuration to rabbit@yuqiyu... stopped 6 plugins.
复制代码

这样咱们再访问http://127.0.0.1:15672就会发现咱们没法访问到界面。

构建项目

咱们使用idea开发工具建立一个SpringBoot项目,添加依赖,pom.xml配置文件以下所示:

<dependencies>
		<!--rabbitmq依赖-->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<!--web依赖-->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<!--lombok依赖-->
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>
		<!--fastjson依赖-->
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>1.2.40</version>
		</dependency>
		<!--测试依赖-->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>
复制代码

咱们本章来模拟用户注册完成后,将注册用户的编号经过Provider模块发送到RabbitMQ,而后RabbitMQ根据配置的DirectExchange的路由键进行异步转发。

初始化用户表

下面咱们先来建立所须要的用户基本信息表,建表SQL以下所示:

CREATE TABLE `user_info` (
  `UI_ID` int(11) DEFAULT NULL COMMENT '用户编号',
  `UI_USER_NAME` varchar(20) DEFAULT NULL COMMENT '用户名称',
  `UI_NAME` varchar(20) DEFAULT NULL COMMENT '真实姓名',
  `UI_AGE` int(11) DEFAULT NULL COMMENT '用户年龄',
  `UI_BALANCE` decimal(10,0) DEFAULT NULL COMMENT '用户余额'
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='用户基本信息表';
复制代码

构建 rabbitmq-provider 项目

基于咱们上述的项目建立一个Maven子模块,命名为:rabbitmq-provider,由于是直接建立的Module项目,IDEA并无给我建立SpringApplication启用类。

建立入口类

下面咱们自行建立一个Provider项目启动入口程序,以下所示:

/**
 * 消息队列消息提供者启动入口
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:14:14
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@SpringBootApplication
public class RabbitmqProviderApplication
{
    static Logger logger = LoggerFactory.getLogger(RabbitmqProviderApplication.class);

    /**
     * 消息队列提供者启动入口
     * @param args
     */
    public static void main(String[] args)
    {
        SpringApplication.run(RabbitmqProviderApplication.class,args);

        logger.info("【【【【【消息队列-消息提供者启动成功.】】】】】");
    }
}
复制代码
application.properties配置文件

下面咱们在src/main/resource目录下建立application.properties并将对应RabbitMQ以及Druid的配置加入,以下所示:

#用户名
spring.rabbitmq.username=guest
#密码
spring.rabbitmq.password=guest
#服务器ip
spring.rabbitmq.host=localhost
#虚拟空间地址
spring.rabbitmq.virtual-host=/
#端口号
spring.rabbitmq.port=5672
#配置发布消息确认回调
spring.rabbitmq.publisher-confirms=true

#数据源配置
spring.datasource.druid.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.druid.url=jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true
spring.datasource.druid.username=root
spring.datasource.druid.password=123456
复制代码

RabbitMQ内有个virtual-host即虚拟主机的概念,一个RabbitMQ服务能够配置多个虚拟主机,每个虚拟机主机之间是相互隔离,相互独立的,受权用户到指定的virtual-host就能够发送消息到指定队列。

用户实体

本章数据库操做采用spring-data-jpa,相关文章请访问:第十三章:SpringBoot实战SpringDataJPA,咱们基于user_info数据表对应建立实体,以下所示:

@Data
@Table(name = "user_info")
@Entity
public class UserEntity
    implements Serializable
{
    /**
     * 用户编号
     */
    @Id
    @GeneratedValue
    @Column(name = "UI_ID")
    private Long id;
    /**
     * 用户名称
     */
    @Column(name = "UI_USER_NAME")
    private String userName;
    /**
     * 姓名
     */
    @Column(name = "UI_NAME")
    private String name;
    /**
     * 年龄
     */
    @Column(name = "UI_AGE")
    private int age;
    /**
     * 余额
     */
    @Column(name = "UI_BALANCE")
    private BigDecimal balance;
}
复制代码
用户数据接口

建立UserRepository用户数据操做接口,并继承JpaRepository得到spring-data-jpa相关的接口定义方法。以下所示:

/**
 * 用户数据接口定义
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:14:35
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
public interface UserRepository
    extends JpaRepository<UserEntity,Long>
{
}
复制代码
用户业务逻辑实现

本章只是简单完成了数据的添加,代码以下所示:

/**
 * 用户业务逻辑实现类
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:14:37
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@Service
@Transactional(rollbackFor = Exception.class)
public class UserService
{
    @Autowired
    private UserRepository userRepository;
    /**
     * 消息队列业务逻辑实现
     */
    @Autowired
    private QueueMessageService queueMessageService;

    /**
     * 保存用户
     * 并写入消息队列
     * @param userEntity
     * @return
     */
    public Long save(UserEntity userEntity) throws Exception
    {
        /**
         * 保存用户
         */
        userRepository.save(userEntity);
        /**
         * 将消息写入消息队列
         */
        queueMessageService.send(userEntity.getId(), ExchangeEnum.USER_REGISTER, QueueEnum.USER_REGISTER);

        return userEntity.getId();
    }
复制代码

在上面业务逻辑实现类内出现了一个名为QueueMessageService消息队列实现类,该类是咱们定义的用于发送消息到消息队列的统一入口,在下面咱们会详细讲解。

用户控制器

建立一个名为UserController的控制器类,对应编写一个添加用户的请求方法,以下所示:

/**
 * 用户控制器
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:14:41
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@RestController
@RequestMapping(value = "/user")
public class UserController
{
    /**
     * 用户业务逻辑
     */
    @Autowired
    private UserService userService;

    /**
     * 保存用户基本信息
     * @param userEntity
     * @return
     */
    @RequestMapping(value = "/save")
    public UserEntity save(UserEntity userEntity) throws Exception
    {
        userService.save(userEntity);
        return userEntity;
    }
}
复制代码

到这咱们添加用户的流程已经编写完成了,那么咱们就来看下消息队列QueueMessageService接口的定义以及实现类的定义。

消息队列方法定义接口

建立一个名为QueueMessageService的接口而且继承了RabbitTemplate.ConfirmCallback接口,而RabbitTemplate.ConfirmCallback接口是用来回调消息发送成功后的方法,当一个消息被成功写入到RabbitMQ服务端时,就会自动的回调RabbitTemplate.ConfirmCallback接口内的confirm方法完成通知,QueueMessageService接口以下所示:

/**
 * 消息队列业务
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:14:50
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
public interface QueueMessageService
    extends RabbitTemplate.ConfirmCallback
{
    /**
     * 发送消息到rabbitmq消息队列
     * @param message 消息内容
     * @param exchangeEnum 交换配置枚举
     * @param queueEnum 队列配置枚举
     * @throws Exception
     */
    public void send(Object message, ExchangeEnum exchangeEnum, QueueEnum queueEnum) throws Exception;
}
复制代码

接下来咱们须要实现该接口内的全部方法,并作出一些业务逻辑的处理。

消息队列业务实现

建立名为QueueMessageServiceSupport实体类实现QueueMessageService接口,并实现接口内的全部方法,以下所示:

/**
 * 消息队列业务逻辑实现
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:14:52
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@Component
public class QueueMessageServiceSupport
    implements QueueMessageService
{
    /**
     * 消息队列模板
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Override
    public void send(Object message, ExchangeEnum exchangeEnum, QueueEnum queueEnum) throws Exception {
        //设置回调为当前类对象
        rabbitTemplate.setConfirmCallback(this);
        //构建回调id为uuid
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        //发送消息到消息队列
        rabbitTemplate.convertAndSend(exchangeEnum.getValue(),queueEnum.getRoutingKey(),message,correlationId);
    }

    /**
     * 消息回调确认方法
     * @param correlationData 请求数据对象
     * @param ack 是否发送成功
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println(" 回调id:" + correlationData.getId());
        if (ack) {
            System.out.println("消息发送成功");
        } else {
            System.out.println("消息发送失败:" + cause);
        }
    }
}
复制代码

convertAndSend方法用于将Object类型的消息转换后发送到RabbitMQ服务端,发送是的消息类型要与消息消费者方法参数保持一致。

confirm方法内,咱们仅仅打印了消息发送时的id,根据ack参数输出消息发送状态。

在上面代码中咱们注入了RabbitTemplate消息队列模板实例,而经过该实例咱们能够将消息发送到RabbitMQ服务端。那么这个实例具体在什么地方定义的呢?咱们带着这个疑问来建立下面的模块,咱们须要将RabbitMQ相关的配置抽取出来做为一个单独的Module存在。

构建 rabbitmq-common 项目

该模块项目很简单,只是添加RabbitMQ相关的配置信息,因为Module是一个子模块因此继承了parent全部的依赖,固然咱们用到的RabbitMQ相关依赖也不例外。

配置rabbitmq

在建立配置类以前,咱们先来定义两个枚举,分别存放了队列的交换信息、队列路由信息,

  • ExchangeEnum (存放了队列交换配置信息)
/**
 * rabbitmq交换配置枚举
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:13:56
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@Getter
public enum ExchangeEnum
{
    /**
     * 用户注册交换配置枚举
     */
    USER_REGISTER("user.register.topic.exchange")
    ;
    private String value;

    ExchangeEnum(String value) {
        this.value = value;
    }
}
复制代码
  • QueueEnum (存放了队列信息以及队列的路由配置信息)
/**
 * 队列配置枚举
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:14:05
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@Getter
public enum QueueEnum
{
    /**
     * 用户注册枚举
     */
    USER_REGISTER("user.register.queue","user.register")
    ;
    /**
     * 队列名称
     */
    private String name;
    /**
     * 队列路由键
     */
    private String routingKey;

    QueueEnum(String name, String routingKey) {
        this.name = name;
        this.routingKey = routingKey;
    }
}
复制代码

建立名为UserRegisterQueueConfiguration的实体类用于配置本章用到的用户注册队列信息,若是你得项目中使用多个队列,建议每个业务逻辑建立一个配置类,分开维护,这样不容易出错。配置信息以下:

/**
 * 用户注册消息队列配置
 * ========================
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:16:58
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@Configuration
public class UserRegisterQueueConfiguration {
    /**
     * 配置路由交换对象实例
     * @return
     */
    @Bean
    public DirectExchange userRegisterDirectExchange()
    {
        return new DirectExchange(ExchangeEnum.USER_REGISTER.getValue());
    }

    /**
     * 配置用户注册队列对象实例
     * 并设置持久化队列
     * @return
     */
    @Bean
    public Queue userRegisterQueue()
    {
        return new Queue(QueueEnum.USER_REGISTER.getName(),true);
    }

    /**
     * 将用户注册队列绑定到路由交换配置上并设置指定路由键进行转发
     * @return
     */
    @Bean
    public Binding userRegisterBinding()
    {
        return BindingBuilder.bind(userRegisterQueue()).to(userRegisterDirectExchange()).with(QueueEnum.USER_REGISTER.getRoutingKey());
    }
}
复制代码

该配置类大体分为以下三部分:

  • 配置交换实例 配置DirectExchange实例对象,为交换设置一个名称,引用ExchangeEnum枚举配置的交换名称,消息提供者与消息消费者的交换名称必须一致才具有的第一步的通信基础。

  • 配置队列实例 配置Queue实例对象,为消息队列设置一个名称,引用QueueEnum枚举配置的队列名称,固然队列的名称一样也是提供者与消费者之间的通信基础。

  • 绑定队列实例到交换实例 配置Binding实例对象,消息绑定的目的就是将Queue实例绑定到Exchange上,而且经过设置的路由Key进行消息转发,配置了路由Key后,只有符合该路由配置的消息才会被转发到绑定交换上的消息队列。

咱们的rabbitmq-common模块已经编写完成。

添加 rabbitmq-provider 依赖 rabbitmq-common

下面咱们回到rabbitmq-provider模块,修改pom.xml配置文件,以下所示:

<dependencies>
        <!--添加common模块依赖-->
        <dependency>
            <groupId>com.hengyu</groupId>
            <artifactId>rabbitmq-common</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>
        <!--mysql依赖-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <!--druid数据源依赖-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.5</version>
        </dependency>
        <!--data jpa依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
    </dependencies>
复制代码

能够看到咱们将rabbitmq-common模块添加到了rabbitmq-provider模块的pom配置文件内,完成了模块之间的相互依赖,这样咱们rabbitmq-provider就自动添加了对应的消息队列配置。

构建rabbitmq-consumer

咱们再来建立一个rabbitmq-consumer队列消息消费者模块,用于接受消费用户注册消息。

建立入口类

一样咱们先来建立一个SpringApplication入口启动类,以下所示:

/**
 * 消息队列消息消费者入口
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:15:15
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@SpringBootApplication
public class RabbitmqConsumerApplication
{
    static Logger logger = LoggerFactory.getLogger(RabbitmqConsumerApplication.class);

    /**
     * rabbitmq消费者启动入口
     * @param args
     */
    public static void main(String[] args)
    {
        SpringApplication.run(RabbitmqConsumerApplication.class,args);

        logger.info("【【【【【消息队列-消息消费者启动成功.】】】】】");
    }
}
复制代码
application.properties配置文件

配置文件的消息队列配置信息要与rabbitmq-provider配置文件一致,以下所示:

spring.application.name=rabbitmq-consumer
#启动端口
server.port=1111
#用户名
spring.rabbitmq.username=guest
#密码
spring.rabbitmq.password=guest
#服务器ip
spring.rabbitmq.host=localhost
#虚拟空间地址
spring.rabbitmq.virtual-host=/
#端口号
spring.rabbitmq.port=5672
#配置发布消息确认回调
spring.rabbitmq.publisher-confirms=true
复制代码

咱们修改了程序启动的端口号,为了咱们下面进行测试的时候不出现端口占用的状况。

若是RabbitMQ配置信息与rabbitmq-provider不一致,就不会收到消费消息。

用户注册消息消费者

建立名为UserConsumer类,用于完成消息监听,而且实现消息消费,以下所示:

/**
 * 用户注册消息消费者
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:15:20
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@Component
@RabbitListener(queues = "user.register.queue")
public class UserConsumer {

    @RabbitHandler
    public void execute(Long userId)
    {
        System.out.println("用户:" + userId+",完成了注册");

        //...//自行业务逻辑处理
    }
}
复制代码

在消息消费者类内,有两个陌生的注解:

  • @RabbitListener RabbitMQ队列消息监听注解,该注解配置监听queues内的队列名称列表,能够配置多个。队列名称对应本章rabbitmq-common模块内QueueEnum枚举name属性。
  • @RabbitHandler RabbitMQ消息处理方法,该方法的参数要与rabbitmq-provider发送消息时的类型保持一致,不然没法自动调用消费方法,也就没法完成消息的消费。

#运行测试 咱们接下来在rabbitmq-provider模块src/test/java下建立一个测试用例,访问用户注册控制器请求路径,以下所示:

@RunWith(SpringRunner.class)
@SpringBootTest(classes = RabbitmqProviderApplication.class)
public class UserTester
{
    /**
     * 模拟mvc测试对象
     */
    private MockMvc mockMvc;

    /**
     * web项目上下文
     */
    @Autowired
    private WebApplicationContext webApplicationContext;

    /**
     * 全部测试方法执行以前执行该方法
     */
    @Before
    public void before() {
        //获取mockmvc对象实例
        mockMvc = MockMvcBuilders.webAppContextSetup(webApplicationContext).build();
    }

    /**
     * 测试添加用户
     * @throws Exception
     */
    @Test
    public void testUserAdd() throws Exception
    {
        mockMvc.perform(MockMvcRequestBuilders.post("/user/save")
                .param("userName","yuqiyu")
                .param("name","恒宇少年")
                .param("age","23")
        )
                .andDo(MockMvcResultHandlers.log())
                .andReturn();
    }
}
复制代码

调用测试用例时会自动将参数保存到数据库,而且将用户编号发送到RabbitMQ服务端,而RabbitMQ根据交换配置以及队列配置转发消息到消费者实例。

启动 rabbitmq-consumer

咱们先来把rabbitmq-consumer项目启动,控制台输出启动日志以下所示:

.....
51.194  INFO 2340 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Bean with name 'rabbitConnectionFactory' has been autodetected for JMX exposure
2017-12-03 16:58:51.196  INFO 2340 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Located managed bean 'rabbitConnectionFactory': registering with JMX server as MBean [org.springframework.amqp.rabbit.connection:name=rabbitConnectionFactory,type=CachingConnectionFactory]
2017-12-03 16:58:51.216  INFO 2340 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
2017-12-03 16:58:51.237  INFO 2340 --- [cTaskExecutor-1] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#443ff8ef:0/SimpleConnection@4369ac5c [delegate=amqp://guest@127.0.0.1:5672/, localPort= 62107]
2017-12-03 16:58:51.287  INFO 2340 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 1111 (http)
2017-12-03 16:58:51.290  INFO 2340 --- [           main] c.h.r.c.RabbitmqConsumerApplication      : Started RabbitmqConsumerApplication in 2.354 seconds (JVM running for 3.026)
2017-12-03 16:58:51.290  INFO 2340 --- [           main] c.h.r.c.RabbitmqConsumerApplication      : 【【【【【消息队列-消息消费者启动成功.】】】】】
复制代码

该部分启动日志就是咱们配置的RabbitMQ初始化信息,咱们能够看到项目启动时会自动与配置的RabbitMQ进行关联:

[delegate=amqp://guest@127.0.0.1:5672/, localPort= 62107]
复制代码
运行测试用例

接下来咱们执行rabbitmq-provider项目的测试用例,来查看控制台的输出内容以下所示:

......
 回调id:e08f6d82-57bc-4c3f-9899-31c4b990c5be
消息发送成功
......
复制代码

已经能够正常的将消息发送到RabbitMQ服务端,而且接收到了回调通知,那么咱们的rabbitmq-consumer项目是否是已经执行了消息的消费呢?咱们打开rabbitmq-consumer控制台查看输出内容以下所示:

用户:2,完成了注册
复制代码

看以看到已经能够成功的执行UserConsumer消息监听类内的监听方法逻辑,到这里消息队列路由一对一的方式已经讲解完了。

总结

本章主要讲解了RabbitMQ在不一样操做系统下的安装方式,以及经过三个子模块形象的展现了消息的分布式处理,总体流程:rabbitmq-provider -> RabbitMQ服务端 -> rabbitmq-consumer,消息的转发是很是快的,RabbitMQ在收到消息后就会检索当前服务端是否存在该消息的消费者,若是存在将会立刻将消息转发。

本章源码已经上传到码云: SpringBoot配套源码地址:gitee.com/hengboy/spr… SpringCloud配套源码地址:gitee.com/hengboy/spr… SpringBoot相关系列文章请访问:目录:SpringBoot学习目录 QueryDSL相关系列文章请访问:QueryDSL通用查询框架学习目录 SpringDataJPA相关系列文章请访问:目录:SpringDataJPA学习目录 SpringBoot相关文章请访问:目录:SpringBoot学习目录,感谢阅读! 欢迎加入QQ技术交流群,共同进步。

QQ技术交流群
相关文章
相关标签/搜索