在上一章第四十一章: 基于SpringBoot & RabbitMQ完成DirectExchange分布式消息消费咱们讲解到了RabbitMQ
消息队列的DirectExchange
路由键消息单个消费者消费,源码请访问SpringBoot对应章节源码下载查看,消息队列目的是完成消息的分布式消费,那么咱们是否能够为一个Provider
建立并绑定多个Consumer
呢?node
基于SpringBoot
平台整合RabbitMQ
消息队列,完成一个Provider
绑定多个Consumer
进行消息消费。git
专题 | 专题名称 | 专题描述 |
---|---|---|
001 | Spring Boot 核心技术 | 讲解SpringBoot一些企业级层面的核心组件 |
002 | Spring Boot 核心技术章节源码 | Spring Boot 核心技术简书每一篇文章码云对应源码 |
003 | Spring Cloud 核心技术 | 对Spring Cloud核心技术全面讲解 |
004 | Spring Cloud 核心技术章节源码 | Spring Cloud 核心技术简书每一篇文章对应源码 |
005 | QueryDSL 核心技术 | 全面讲解QueryDSL核心技术以及基于SpringBoot整合SpringDataJPA |
006 | SpringDataJPA 核心技术 | 全面讲解SpringDataJPA核心技术 |
咱们基于上一章的项目进行升级,咱们先来将Chapter41
项目Copy
一份命名为Chapter42
。spring
基于咱们复制的Chapter42
项目,建立一个Module
子项目命名为rabbitmq-consumer-node2
,用于消费者的第二个节点,接下来咱们为rabbitmq-consumer-node2
项目建立一个入口启动类RabbitmqConsumerNode2Application
,代码以下所示:数据库
/**
* 消息队列消息消费者节点2入口
* ========================
*
* @author 恒宇少年
* Created with IntelliJ IDEA.
* Date:2017/11/26
* Time:15:15
* 码云:http://git.oschina.net/jnyqy
* ========================
*/
@SpringBootApplication
public class RabbitmqConsumerNode2Application
{
static Logger logger = LoggerFactory.getLogger(RabbitmqConsumerNode2Application.class);
/**
* rabbitmq消费者启动入口
* @param args
*/
public static void main(String[] args)
{
SpringApplication.run(RabbitmqConsumerNode2Application.class,args);
logger.info("【【【【【消息队列-消息消费者节点2启动成功.】】】】】");
}
}
复制代码
为了区分具体的消费者节点,咱们在项目启动成功后打印了相关的日志信息,下面咱们来编写application.properties
配置文件信息,能够直接从rabbitmq-consumer
子项目内复制内容,复制后须要修改server.port
以及spring.application.name
,以下所示:bash
#端口号
server.port=1112
#项目名称
spring.application.name=rabbitmq-consumer-node2
#rabbitmq相关配置
#用户名
spring.rabbitmq.username=guest
#密码
spring.rabbitmq.password=guest
#服务器ip
spring.rabbitmq.host=localhost
#虚拟空间地址
spring.rabbitmq.virtual-host=/
#端口号
spring.rabbitmq.port=5672
#配置发布消息确认回调
spring.rabbitmq.publisher-confirms=true
复制代码
由于咱们是本地测试项目,因此须要修改对应的端口号,防止端口被占用。服务器
复制rabbitmq-consumer
子项目内的UserConsumer
类到rabbitmq-consumer-node2
子项目对应的package
内,以下所示:app
/**
* 用户注册消息消费者
* 分布式节点2
* ========================
*
* @author 恒宇少年
* Created with IntelliJ IDEA.
* Date:2017/11/26
* Time:15:20
* 码云:http://git.oschina.net/jnyqy
* ========================
*/
@Component
@RabbitListener(queues = "user.register.queue")
public class UserConsumer {
/**
* logback
*/
private Logger logger = LoggerFactory.getLogger(UserConsumer.class);
@RabbitHandler
public void execute(Long userId)
{
logger.info("用户注册消费者【节点2】获取消息,用户编号:{}",userId);
//...//自行业务逻辑处理
}
}
复制代码
为了区分具体的消费者输出内容,咱们在上面UserConsumer
消费者消费方法内打印了相关日志输出,下面咱们一样把rabbitmq-consumer
子项目内UserConsumer
的消费方法写入相关日志,以下所示:框架
@RabbitHandler
public void execute(Long userId)
{
logger.info("用户注册消费者【节点1】获取消息,用户编号:{}",userId);
//...//自行业务逻辑处理
}
复制代码
到目前为止咱们的多节点
RabbitMQ
消费者已经编写完成,下面咱们来模拟多个用户注册的场景,来查看用户注册消息是否被转发并惟一性的分配给不一样的消费者节点。分布式
咱们打开上一章编写的UserTester
测试类,为了模拟多用户注册请求,咱们对应的建立一个内部线程类BatchRabbitTester
,在线程类内编写注册请求代码,以下所示:ide
/**
* 批量添加用户线程测试类
* run方法发送用户注册请求
*/
class BatchRabbitTester implements Runnable
{
private int index;
public BatchRabbitTester() { }
public BatchRabbitTester(int index) {
this.index = index;
}
@Override
public void run() {
try {
mockMvc.perform(MockMvcRequestBuilders.post("/user/save")
.param("userName","yuqiyu" + index)
.param("name","恒宇少年" + index)
.param("age","23")
)
.andDo(MockMvcResultHandlers.log())
.andReturn();
}catch (Exception e){
e.printStackTrace();
}
}
}
复制代码
为了区分每个注册信息是否都已经写入到数据库,咱们为BatchRabbitTester
添加了一个有参的构造方法,将for循环的i
值对应的传递为index
的值。下面咱们来编写对应的批量注册的测试方法,以下所示:
/**
* 测试用户批量添加
* @throws Exception
*/
@Test
public void testBatchUserAdd() throws Exception
{
for (int i = 0 ; i < 10 ; i++) {
//建立用户注册线程
Thread thread = new Thread(new BatchRabbitTester(i));
//启动线程
thread.start();
}
//等待线程执行完成
Thread.sleep(2000);
}
复制代码
咱们循环10次来测试用户注册请求,每一次都会建立一个线程去完成发送注册请求逻辑,在方法底部添加了sleep
方法,目的是为了阻塞测试用例的结束,由于咱们测试用户完成方法后会自动中止,不会去等待其余线程执行完成,因此这里咱们阻塞测试主线程来完成发送注册线程请求逻辑。
咱们在执行测试批量注册用户消息以前,先把rabbitmq-consumer
、rabbitmq-consumer-node2
两个消费者子项目启动,项目启动完成后能够看到控制台输出启动成功日志,以下所示:
rabbitmq-consumer:
2017-12-10 17:10:36.961 INFO 15644 --- [ main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 1111 (http)
2017-12-10 17:10:36.964 INFO 15644 --- [ main] c.h.r.c.RabbitmqConsumerApplication : Started RabbitmqConsumerApplication in 2.405 seconds (JVM running for 3.39)
2017-12-10 17:10:36.964 INFO 15644 --- [ main] c.h.r.c.RabbitmqConsumerApplication : 【【【【【消息队列-消息消费者启动成功.】】】】】
rabbitmq-consumer-node2:
2017-12-10 17:11:31.679 INFO 13812 --- [ main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 1112 (http)
2017-12-10 17:11:31.682 INFO 13812 --- [ main] c.h.c.RabbitmqConsumerNode2Application : Started RabbitmqConsumerNode2Application in 2.419 seconds (JVM running for 3.129)
2017-12-10 17:11:31.682 INFO 13812 --- [ main] c.h.c.RabbitmqConsumerNode2Application : 【【【【【消息队列-消息消费者节点2启动成功.】】】】】
复制代码
接下来咱们来运行testBatchUserAdd
方法,查看测试控制台输出内容以下所示:
2017-12-10 17:15:02.619 INFO 14456 --- [ Thread-3] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#528df369:0/SimpleConnection@39b6ba57 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 60936]
回调id:194b5e67-6913-474a-b2ac-6e938e1e85e8
消息发送成功
回调id:e88ce59c-3eb9-433c-9e25-9429e7076fbe
消息发送成功
回调id:3e5b8382-6f63-450f-a641-e3d8eee255b2
消息发送成功
回调id:39103357-6c80-4561-acb7-79b32d6171c9
消息发送成功
回调id:9795d227-b54e-4cde-9993-a5b880fcfe39
消息发送成功
回调id:e9b8b828-f069-455f-a366-380bf10a5909
消息发送成功
回调id:6b5b4a9c-5e7f-4c53-9eef-98e06f8be867
消息发送成功
回调id:619a42f3-cb94-4434-9c75-1e28a04ce350
消息发送成功
回调id:6b720465-b64a-4ed9-9d8c-3e4dafa4faed
消息发送成功
回调id:b4296f7f-98cc-423b-a4ef-0fc31d22cb08
消息发送成功
复制代码
能够看到确实已经成功的发送了10条用户注册消息到RabbitMQ
服务端,那么是否已经正确的成功的将消息转发到消费者监听方法了呢?咱们来打开rabbitmq-consumer
子项目的启动控制台查看日志输出内容以下所示:
2017-12-10 17:10:36.964 INFO 15644 --- [ main] c.h.r.c.RabbitmqConsumerApplication : 【【【【【消息队列-消息消费者启动成功.】】】】】
2017-12-10 17:15:02.695 INFO 15644 --- [cTaskExecutor-1] c.h.rabbitmq.consumer.user.UserConsumer : 用户注册消费者【节点1】获取消息,用户编号:20
2017-12-10 17:15:02.718 INFO 15644 --- [cTaskExecutor-1] c.h.rabbitmq.consumer.user.UserConsumer : 用户注册消费者【节点1】获取消息,用户编号:22
2017-12-10 17:15:02.726 INFO 15644 --- [cTaskExecutor-1] c.h.rabbitmq.consumer.user.UserConsumer : 用户注册消费者【节点1】获取消息,用户编号:26
2017-12-10 17:15:02.729 INFO 15644 --- [cTaskExecutor-1] c.h.rabbitmq.consumer.user.UserConsumer : 用户注册消费者【节点1】获取消息,用户编号:21
2017-12-10 17:15:02.789 INFO 15644 --- [cTaskExecutor-1] c.h.rabbitmq.consumer.user.UserConsumer : 用户注册消费者【节点1】获取消息,用户编号:28
复制代码
能够看到成功的接受了5
条对应用户注册消息内容,不过这里具体接受的条数并非固定的,这也是RabbitMQ
消息转发权重内部问题。 下面咱们打开rabbitmq-consumer-node2
子项目控制台查看日志输出内容以下所示:
2017-12-10 17:11:31.682 INFO 13812 --- [ main] c.h.c.RabbitmqConsumerNode2Application : 【【【【【消息队列-消息消费者节点2启动成功.】】】】】
2017-12-10 17:15:02.708 INFO 13812 --- [cTaskExecutor-1] com.hengyu.consumer.user.UserConsumer : 用户注册消费者【节点2】获取消息,用户编号:25
2017-12-10 17:15:02.717 INFO 13812 --- [cTaskExecutor-1] com.hengyu.consumer.user.UserConsumer : 用户注册消费者【节点2】获取消息,用户编号:23
2017-12-10 17:15:02.719 INFO 13812 --- [cTaskExecutor-1] com.hengyu.consumer.user.UserConsumer : 用户注册消费者【节点2】获取消息,用户编号:24
2017-12-10 17:15:02.727 INFO 13812 --- [cTaskExecutor-1] com.hengyu.consumer.user.UserConsumer : 用户注册消费者【节点2】获取消息,用户编号:27
2017-12-10 17:15:02.790 INFO 13812 --- [cTaskExecutor-1] com.hengyu.consumer.user.UserConsumer : 用户注册消费者【节点2】获取消息,用户编号:29
复制代码
一样得到了5
条用户注册消息,不过并无任何规律可言,编号也不是顺序的。
因此多节点时消息具体分发到哪一个节点并非固定的,彻底是
RabbitMQ
分发机制来控制。
本章完成了基于SpringBoot
平台整合RabbitMQ
单个Provider
对应绑定多个Consumer
来进行多节点分布式消费者消息消费,实际生产项目部署时彻底能够将消费节点分开到不一样的服务器,只要消费节点能够访问到RabbitMQ
服务端,能够正常通信,就能够完成消息消费。
本章源码已经上传到码云: SpringBoot配套源码地址:gitee.com/hengboy/spr… SpringCloud配套源码地址:gitee.com/hengboy/spr… SpringBoot相关系列文章请访问:目录:SpringBoot学习目录 QueryDSL相关系列文章请访问:QueryDSL通用查询框架学习目录 SpringDataJPA相关系列文章请访问:目录:SpringDataJPA学习目录,感谢阅读! 欢迎加入QQ技术交流群,共同进步。