第四十二章: 基于SpringBoot & RabbitMQ完成DirectExchange分布式消息多消费者消费

在上一章第四十一章: 基于SpringBoot & RabbitMQ完成DirectExchange分布式消息消费咱们讲解到了RabbitMQ消息队列的DirectExchange路由键消息单个消费者消费,源码请访问SpringBoot对应章节源码下载查看,消息队列目的是完成消息的分布式消费,那么咱们是否能够为一个Provider建立并绑定多个Consumer呢?node

本章目标

基于SpringBoot平台整合RabbitMQ消息队列,完成一个Provider绑定多个Consumer进行消息消费。git

SpringBoot 企业级核心技术学习专题

专题 专题名称 专题描述
001 Spring Boot 核心技术 讲解SpringBoot一些企业级层面的核心组件
002 Spring Boot 核心技术章节源码 Spring Boot 核心技术简书每一篇文章码云对应源码
003 Spring Cloud 核心技术 对Spring Cloud核心技术全面讲解
004 Spring Cloud 核心技术章节源码 Spring Cloud 核心技术简书每一篇文章对应源码
005 QueryDSL 核心技术 全面讲解QueryDSL核心技术以及基于SpringBoot整合SpringDataJPA
006 SpringDataJPA 核心技术 全面讲解SpringDataJPA核心技术

构建项目

咱们基于上一章的项目进行升级,咱们先来将Chapter41项目Copy一份命名为Chapter42spring

构建 rabbitmq-consumer-node2

基于咱们复制的Chapter42项目,建立一个Module子项目命名为rabbitmq-consumer-node2,用于消费者的第二个节点,接下来咱们为rabbitmq-consumer-node2项目建立一个入口启动类RabbitmqConsumerNode2Application,代码以下所示:数据库

/**
 * 消息队列消息消费者节点2入口
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:15:15
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@SpringBootApplication
public class RabbitmqConsumerNode2Application
{
    static Logger logger = LoggerFactory.getLogger(RabbitmqConsumerNode2Application.class);

    /**
     * rabbitmq消费者启动入口
     * @param args
     */
    public static void main(String[] args)
    {
        SpringApplication.run(RabbitmqConsumerNode2Application.class,args);

        logger.info("【【【【【消息队列-消息消费者节点2启动成功.】】】】】");
    }
}
复制代码

为了区分具体的消费者节点,咱们在项目启动成功后打印了相关的日志信息,下面咱们来编写application.properties配置文件信息,能够直接从rabbitmq-consumer子项目内复制内容,复制后须要修改server.port以及spring.application.name,以下所示:bash

#端口号
server.port=1112
#项目名称
spring.application.name=rabbitmq-consumer-node2


#rabbitmq相关配置
#用户名
spring.rabbitmq.username=guest
#密码
spring.rabbitmq.password=guest
#服务器ip
spring.rabbitmq.host=localhost
#虚拟空间地址
spring.rabbitmq.virtual-host=/
#端口号
spring.rabbitmq.port=5672
#配置发布消息确认回调
spring.rabbitmq.publisher-confirms=true
复制代码

由于咱们是本地测试项目,因此须要修改对应的端口号,防止端口被占用。服务器

建立用户注册消费者

复制rabbitmq-consumer子项目内的UserConsumer类到rabbitmq-consumer-node2子项目对应的package内,以下所示:app

/**
 * 用户注册消息消费者
 * 分布式节点2
 * ========================
 *
 * @author 恒宇少年
 * Created with IntelliJ IDEA.
 * Date:2017/11/26
 * Time:15:20
 * 码云:http://git.oschina.net/jnyqy
 * ========================
 */
@Component
@RabbitListener(queues = "user.register.queue")
public class UserConsumer {

    /**
     * logback
     */
    private Logger logger = LoggerFactory.getLogger(UserConsumer.class);

    @RabbitHandler
    public void execute(Long userId)
    {
        logger.info("用户注册消费者【节点2】获取消息,用户编号:{}",userId);

        //...//自行业务逻辑处理
    }
}
复制代码

为了区分具体的消费者输出内容,咱们在上面UserConsumer消费者消费方法内打印了相关日志输出,下面咱们一样把rabbitmq-consumer子项目内UserConsumer的消费方法写入相关日志,以下所示:框架

@RabbitHandler
    public void execute(Long userId)
    {
        logger.info("用户注册消费者【节点1】获取消息,用户编号:{}",userId);

        //...//自行业务逻辑处理
    }
复制代码

到目前为止咱们的多节点RabbitMQ消费者已经编写完成,下面咱们来模拟多个用户注册的场景,来查看用户注册消息是否被转发并惟一性的分配给不一样的消费者节点。分布式

运行测试

咱们打开上一章编写的UserTester测试类,为了模拟多用户注册请求,咱们对应的建立一个内部线程类BatchRabbitTester,在线程类内编写注册请求代码,以下所示:ide

/**
     * 批量添加用户线程测试类
     * run方法发送用户注册请求
     */
    class BatchRabbitTester implements Runnable
    {
        private int index;
        public BatchRabbitTester() { }

        public BatchRabbitTester(int index) {
            this.index = index;
        }


        @Override
        public void run() {
            try {
                mockMvc.perform(MockMvcRequestBuilders.post("/user/save")
                        .param("userName","yuqiyu" + index)
                        .param("name","恒宇少年" + index)
                        .param("age","23")
                )
                        .andDo(MockMvcResultHandlers.log())
                        .andReturn();
            }catch (Exception e){
                e.printStackTrace();
            }

        }
    }
复制代码

为了区分每个注册信息是否都已经写入到数据库,咱们为BatchRabbitTester添加了一个有参的构造方法,将for循环的i值对应的传递为index的值。下面咱们来编写对应的批量注册的测试方法,以下所示:

/**
     * 测试用户批量添加
     * @throws Exception
     */
    @Test
    public void testBatchUserAdd() throws Exception
    {
        for (int i = 0 ; i < 10 ; i++) {
            //建立用户注册线程
            Thread thread = new Thread(new BatchRabbitTester(i));
            //启动线程
            thread.start();
        }
        //等待线程执行完成
        Thread.sleep(2000);
    }
复制代码

咱们循环10次来测试用户注册请求,每一次都会建立一个线程去完成发送注册请求逻辑,在方法底部添加了sleep方法,目的是为了阻塞测试用例的结束,由于咱们测试用户完成方法后会自动中止,不会去等待其余线程执行完成,因此这里咱们阻塞测试主线程来完成发送注册线程请求逻辑。

执行批量注册测试方法

咱们在执行测试批量注册用户消息以前,先把rabbitmq-consumerrabbitmq-consumer-node2两个消费者子项目启动,项目启动完成后能够看到控制台输出启动成功日志,以下所示:

rabbitmq-consumer:
2017-12-10 17:10:36.961  INFO 15644 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 1111 (http)
2017-12-10 17:10:36.964  INFO 15644 --- [           main] c.h.r.c.RabbitmqConsumerApplication      : Started RabbitmqConsumerApplication in 2.405 seconds (JVM running for 3.39)
2017-12-10 17:10:36.964  INFO 15644 --- [           main] c.h.r.c.RabbitmqConsumerApplication      : 【【【【【消息队列-消息消费者启动成功.】】】】】

rabbitmq-consumer-node2:
2017-12-10 17:11:31.679  INFO 13812 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 1112 (http)
2017-12-10 17:11:31.682  INFO 13812 --- [           main] c.h.c.RabbitmqConsumerNode2Application   : Started RabbitmqConsumerNode2Application in 2.419 seconds (JVM running for 3.129)
2017-12-10 17:11:31.682  INFO 13812 --- [           main] c.h.c.RabbitmqConsumerNode2Application   : 【【【【【消息队列-消息消费者节点2启动成功.】】】】】

复制代码

接下来咱们来运行testBatchUserAdd方法,查看测试控制台输出内容以下所示:

2017-12-10 17:15:02.619  INFO 14456 --- [       Thread-3] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#528df369:0/SimpleConnection@39b6ba57 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 60936]
 回调id:194b5e67-6913-474a-b2ac-6e938e1e85e8
消息发送成功
 回调id:e88ce59c-3eb9-433c-9e25-9429e7076fbe
消息发送成功
 回调id:3e5b8382-6f63-450f-a641-e3d8eee255b2
消息发送成功
 回调id:39103357-6c80-4561-acb7-79b32d6171c9
消息发送成功
 回调id:9795d227-b54e-4cde-9993-a5b880fcfe39
消息发送成功
 回调id:e9b8b828-f069-455f-a366-380bf10a5909
消息发送成功
 回调id:6b5b4a9c-5e7f-4c53-9eef-98e06f8be867
消息发送成功
 回调id:619a42f3-cb94-4434-9c75-1e28a04ce350
消息发送成功
 回调id:6b720465-b64a-4ed9-9d8c-3e4dafa4faed
消息发送成功
 回调id:b4296f7f-98cc-423b-a4ef-0fc31d22cb08
消息发送成功
复制代码

能够看到确实已经成功的发送了10条用户注册消息到RabbitMQ服务端,那么是否已经正确的成功的将消息转发到消费者监听方法了呢?咱们来打开rabbitmq-consumer子项目的启动控制台查看日志输出内容以下所示:

2017-12-10 17:10:36.964  INFO 15644 --- [           main] c.h.r.c.RabbitmqConsumerApplication      : 【【【【【消息队列-消息消费者启动成功.】】】】】
2017-12-10 17:15:02.695  INFO 15644 --- [cTaskExecutor-1] c.h.rabbitmq.consumer.user.UserConsumer  : 用户注册消费者【节点1】获取消息,用户编号:20
2017-12-10 17:15:02.718  INFO 15644 --- [cTaskExecutor-1] c.h.rabbitmq.consumer.user.UserConsumer  : 用户注册消费者【节点1】获取消息,用户编号:22
2017-12-10 17:15:02.726  INFO 15644 --- [cTaskExecutor-1] c.h.rabbitmq.consumer.user.UserConsumer  : 用户注册消费者【节点1】获取消息,用户编号:26
2017-12-10 17:15:02.729  INFO 15644 --- [cTaskExecutor-1] c.h.rabbitmq.consumer.user.UserConsumer  : 用户注册消费者【节点1】获取消息,用户编号:21
2017-12-10 17:15:02.789  INFO 15644 --- [cTaskExecutor-1] c.h.rabbitmq.consumer.user.UserConsumer  : 用户注册消费者【节点1】获取消息,用户编号:28
复制代码

能够看到成功的接受了5条对应用户注册消息内容,不过这里具体接受的条数并非固定的,这也是RabbitMQ消息转发权重内部问题。 下面咱们打开rabbitmq-consumer-node2子项目控制台查看日志输出内容以下所示:

2017-12-10 17:11:31.682  INFO 13812 --- [           main] c.h.c.RabbitmqConsumerNode2Application   : 【【【【【消息队列-消息消费者节点2启动成功.】】】】】
2017-12-10 17:15:02.708  INFO 13812 --- [cTaskExecutor-1] com.hengyu.consumer.user.UserConsumer    : 用户注册消费者【节点2】获取消息,用户编号:25
2017-12-10 17:15:02.717  INFO 13812 --- [cTaskExecutor-1] com.hengyu.consumer.user.UserConsumer    : 用户注册消费者【节点2】获取消息,用户编号:23
2017-12-10 17:15:02.719  INFO 13812 --- [cTaskExecutor-1] com.hengyu.consumer.user.UserConsumer    : 用户注册消费者【节点2】获取消息,用户编号:24
2017-12-10 17:15:02.727  INFO 13812 --- [cTaskExecutor-1] com.hengyu.consumer.user.UserConsumer    : 用户注册消费者【节点2】获取消息,用户编号:27
2017-12-10 17:15:02.790  INFO 13812 --- [cTaskExecutor-1] com.hengyu.consumer.user.UserConsumer    : 用户注册消费者【节点2】获取消息,用户编号:29
复制代码

一样得到了5条用户注册消息,不过并无任何规律可言,编号也不是顺序的。

因此多节点时消息具体分发到哪一个节点并非固定的,彻底是RabbitMQ分发机制来控制。

总结

本章完成了基于SpringBoot平台整合RabbitMQ单个Provider对应绑定多个Consumer来进行多节点分布式消费者消息消费,实际生产项目部署时彻底能够将消费节点分开到不一样的服务器,只要消费节点能够访问到RabbitMQ服务端,能够正常通信,就能够完成消息消费。

本章源码已经上传到码云: SpringBoot配套源码地址:gitee.com/hengboy/spr… SpringCloud配套源码地址:gitee.com/hengboy/spr… SpringBoot相关系列文章请访问:目录:SpringBoot学习目录 QueryDSL相关系列文章请访问:QueryDSL通用查询框架学习目录 SpringDataJPA相关系列文章请访问:目录:SpringDataJPA学习目录,感谢阅读! 欢迎加入QQ技术交流群,共同进步。

QQ技术交流群
相关文章
相关标签/搜索