实战前言
RabbitMQ 做为目前应用至关普遍的消息中间件,在企业级应用、微服务应用中充当着重要的角色。特别是在一些典型的应用场景以及业务模块中具备重要的做用,好比业务服务模块解耦、异步通讯、高并发限流、超时业务、数据延迟处理等。html
RabbitMQ 官网拜读
首先,让咱们先拜读 RabbitMQ 官网的技术开发手册以及相关的 Features,感兴趣的朋友能够耐心的阅读其中的相关介绍,相信会有必定的收获,地址可见:www.rabbitmq.com/getstarted.…spring
在阅读该手册过程当中,咱们能够得知 RabbitMQ 其实核心就是围绕 “消息模型” 来展开的,其中就包括了组成消息模型的相关组件:生产者,消费者,队列,交换机,路由,消息等!而咱们在实战应用中,实际上也是牢牢围绕着 “消息模型” 来展开撸码的!数据库
下面,我就介绍一下这一消息模型的演变历程,固然,这一历程在 RabbitMQ 官网也是能够窥览获得的!
后端
上面几个图就已经概述了几个要点,并且,这几个要点的含义能够说是字如其名!bash
正如上图所展现的消息模型的演变,接下来咱们将以代码的形式实战各类典型的业务场景!并发
工欲善其事,必先利其器。咱们首先须要借助 IDEA 的 Spring Initializr 用 Maven 构建一个 SpringBoot 的项目,并引入 RabbitMQ、Mybatis、Log4j 等第三方框架的依赖。搭建完成以后,能够简单的写个 RabbitMQController 测试一下项目是否搭建是否成功(能够暂时用单模块方式构建)
app
紧接着,咱们进入实战的核心阶段,在项目或者服务中使用 RabbitMQ,其实无非是有几个核心要点要紧紧把握住,这几个核心要点在撸码过程当中须要“时刻的游荡在本身的脑海里”,其中包括:框架
基于这样的几个要点,咱们先小试牛刀一番,采用 RabbitMQ 实战异步写日志与异步发邮件。固然啦,在进行实战前,咱们须要安装好 RabbitMQ 及其后端控制台应用,并在项目中配置一下 RabbitMQ 的相关参数以及相关 Bean 组件。异步
1.RabbitMQ 安装完成后,打开后端控制台应用:http://localhost:15672/ guest guest 登陆,看到下图即表示安装成功ide
2.而后是项目配置文件层面的配置 application.properties
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.listener.concurrency=10
spring.rabbitmq.listener.max-concurrency=20
spring.rabbitmq.listener.prefetch=5
复制代码
其中,后面三个参数主要是用于“并发量的配置”,表示:并发消费者的初始化值,并发消费者的最大值,每一个消费者每次监听时可拉取处理的消息数量。
接下来,咱们须要以 Configuration 的方式配置 RabbitMQ 并以 Bean 的方式显示注入 RabbitMQ 在发送接收处理消息时相关 Bean 组件配置其中典型的配置是 RabbitTemplate 以及 SimpleRabbitListenerContainerFactory,前者是充当消息的发送组件,后者是用于管理RabbitMQ监听器
的容器工厂,其代码以下:
@Configuration
public class RabbitmqConfig {
private static final Logger log= LoggerFactory.getLogger(RabbitmqConfig.class);
@Autowired
private Environment env;
@Autowired
private CachingConnectionFactory connectionFactory;
@Autowired
private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
/**
* 单一消费者
* @return
*/
@Bean(name = "singleListenerContainer")
public SimpleRabbitListenerContainerFactory listenerContainer(){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setConcurrentConsumers(1);
factory.setMaxConcurrentConsumers(1);
factory.setPrefetchCount(1);
factory.setTxSize(1);
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
return factory;
}
/**
* 多个消费者
* @return
*/
@Bean(name = "multiListenerContainer")
public SimpleRabbitListenerContainerFactory multiListenerContainer(){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factoryConfigurer.configure(factory,connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.NONE);
factory.setConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.concurrency",int.class));
factory.setMaxConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.max-concurrency",int.class));
factory.setPrefetchCount(env.getProperty("spring.rabbitmq.listener.prefetch",int.class));
return factory;
}
@Bean
public RabbitTemplate rabbitTemplate(){
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
}
});
return rabbitTemplate;
}}
复制代码
在一些企业级系统中,咱们常常能够见到一个执行 function 一般是由许多子模块组成的,这个 function 在执行过程当中,须要 同步 的将其代码从头开始执行到尾,即执行流程是 module_A -> module_B -> module_C -> module_D
,典型的案例能够参见汇编或者 C 语言等面向过程语言开发的应用,如今的一些 JavaWeb 应用也存在着这样的写法。
而咱们知道,这个执行流程其实对于整个 function 来说是有必定的弊端的,主要有两点:
故而,咱们须要想办法进行优化,咱们须要将强关联的业务模块解耦以及某些模块之间实行异步通讯!下面就以两个场景来实战咱们的优化措施!
对于企业级应用系统或者微服务应用中,咱们常常须要追溯跟踪记录用户的操做日志,而这部分的业务在某种程度上是不该该跟主业务模块耦合在一块儿的,故而咱们须要将其单独抽出并以异步的方式与主模块进行异步通讯交互数据。
下面咱们就用 RabbitMQ 的 DirectExchange+RoutingKey 消息模型也实现“用户登陆成功记录日志”的场景。如前面所言,咱们须要在脑海里回荡着几个要点:
首先咱们须要在上面的 RabbitmqConfig 类中建立消息模型:包括 Queue、Exchange、RoutingKey 等的创建,代码以下:
上图中 env 获取的信息,咱们须要在 application.properties 进行配置,其中 mq.env=local
:
此时,咱们将整个项目/服务跑起来,并打开 RabbitMQ 后端控制台应用,便可看到队列以及交换机及其绑定已经创建好了,以下所示:
接下来,咱们须要在 Controller 中执行用户登陆逻辑,记录用户登陆日志,查询获取用户角色视野资源信息等,因为篇幅关系,在这里咱们重点要实现的是用MQ实现 “异步记录用户登陆日志” 的逻辑,即在这里 Controller 将充当“生产者”的角色,核心代码以下:
@RestController
public class UserController {
private static final Logger log= LoggerFactory.getLogger(HelloWorldController.class);
private static final String Prefix="user";
@Autowired
private ObjectMapper objectMapper;
@Autowired
private UserMapper userMapper;
@Autowired
private UserLogMapper userLogMapper;
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private Environment env;
@RequestMapping(value = Prefix+"/login",method = RequestMethod.POST,consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
public BaseResponse login(@RequestParam("userName") String userName,@RequestParam("password") String password){
BaseResponse response=new BaseResponse(StatusCode.Success);
try {
//TODO:执行登陆逻辑
User user=userMapper.selectByUserNamePassword(userName,password);
if (user!=null){
//TODO:异步写用户日志
try {
UserLog userLog=new UserLog(userName,"Login","login",objectMapper.writeValueAsString(user));
userLog.setCreateTime(new Date());
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setExchange(env.getProperty("log.user.exchange.name"));
rabbitTemplate.setRoutingKey(env.getProperty("log.user.routing.key.name"));
Message message=MessageBuilder.withBody(objectMapper.writeValueAsBytes(userLog)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
message.getMessageProperties().setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME, MessageProperties.CONTENT_TYPE_JSON);
rabbitTemplate.convertAndSend(message);
}catch (Exception e){
e.printStackTrace();
}
//TODO:塞权限数据-资源数据-视野数据
}else{
response=new BaseResponse(StatusCode.Fail);
}
}catch (Exception e){
e.printStackTrace();
}
return response;
}}
复制代码
在上面的“发送逻辑”代码中,其实也体现了咱们最开始介绍的演进中的几种消息模型,好比咱们是将消息发送到 Exchange 的而不是 Queue,消息是以二进制流的形式进行传输等等。当用 postman 请求到这个 controller 的方法时,咱们能够在 RabbitMQ 的后端控制台应用看到一条未确认的消息,经过 GetMessage 便可看到其中的详情,以下:
最后,咱们将开发消费端的业务代码,以下:
@Component
public class CommonMqListener {
private static final Logger log= LoggerFactory.getLogger(CommonMqListener.class);
@Autowired
private ObjectMapper objectMapper;
@Autowired
private UserLogMapper userLogMapper;
@Autowired
private MailService mailService;
/**
* 监听消费用户日志
* @param message
*/
@RabbitListener(queues = "${log.user.queue.name}",containerFactory = "singleListenerContainer")
public void consumeUserLogQueue(@Payload byte[] message){
try {
UserLog userLog=objectMapper.readValue(message, UserLog.class);
log.info("监听消费用户日志 监听到消息: {} ",userLog);
//TODO:记录日志入数据表
userLogMapper.insertSelective(userLog);
}catch (Exception e){
e.printStackTrace();
}
}
复制代码
将服务跑起来以后,咱们便可监听消费到上面 Queue 中的消息,即当前用户登陆的信息,并且,咱们也能够看到“记录用户登陆日志”的逻辑是由一条异于主业务线程的异步线程去执行的:
“异步记录用户操做日志”的案例我想足以用于诠释上面所讲的相关理论知识点了,在后续篇章中,因为篇幅限制,我将重点介绍其核心的业务逻辑!
发送邮件的场景,其实也是比较常见的,好比用户注册须要邮箱验证,用户异地登陆发送邮件通知等等,在这里我以 RabbitMQ 实现异步发送邮件。实现的步骤跟场景一几乎一致!
1. 消息模型的建立
2. 配置信息的建立
3. 生产端
4. 消费端
彩蛋:本博文就先介绍RabbitMQ实战的典型业务场景之业务服务模块异步解耦与通讯吧,下篇博文将继续讲解RabbitMQ实战在高并发系统的场景的应用记忆消息确认机制跟并发量的配置实战,相关源码数据库能够来这里下载