探索解析微服务下的RabbitMQ

概览

本文主要介绍如何使用RabbitMQ消息代理来实现分布式系统之间的通讯,从而促进微服务的松耦合。spring

RabbitMQ,也被称为开源消息代理,它支持多种消息协议,而且能够部署在分布式系统上。它轻量级,便于部署应用程序。它主要充当一个队列,其中输入的消息能够首先被操做。RabbitMQ能够在许多操做系统和云环境中运行,并为大多数流行语言提供了普遍的开发工具。它是生产者-消费者模式,生产者发出信息,消费者消费信息。RabbitMQ的主要特色以下:sql

  1. 异步消息
  2. 分布式部署
  3. 管理和监控
  4. 企业和云计算

安装

对于RabbitMQ,首先须要在系统中安装ErLang,由于RabbitMQ是用ErLang语言编写的。安装Erlang以后,你能够经过下面的介绍从它的官网下载最新版本的 RabbitMQ 。bash

在微服务中使用RabbitMQ

在您的微服务体系结构中,RabbitMQ是实现消息队列的最简单的免费的可用选项之一。这些队列模式有助于解耦各个微服务之间的通讯来增长应用程序的弹性。咱们能够将这些队列用于各类目的,好比核心微服务之间的交互、微服务的解耦、实现故障转移机制,以及经过消息代理发送电子邮件通知。服务器

不管在哪里,只要有两个或两个以上的核心模块须要相互通讯,咱们就不该该进行直接的HTTP调用,由于它们会使核心层产生紧耦合,而且当每一个核心模块有更多实例时将很难管理。并且每当服务宕机时,HTTP调用模式就会失败,由于在服务重启以后,咱们将没法跟踪旧的HTTP请求调用。这就产生了对RabbitMQ的需求。架构

在微服务中设置RabbitMQ

在微服务架构中,为了演示,咱们将使用一个能够经过任何核心微服务发送电子邮件通知的示例模式。在这种模式下,咱们将有一个能够存在任何核心微服务的生产者,它将生成电子邮件内容并将其发送到队列。而后,这个电子邮件内容由老是在等待队列中新消息的消费者来处理。并发

请注意,因为正在使用Spring Boot构建微服务,所以咱们将为Spring提供配置。app

1)生产者:这一层负责生成电子邮件内容,并将此内容发送给RabbitMQ中的消息代理。框架

a)在properties文件中,咱们须要配置队列名和交换类型,以及安装RabbitMQ服务器的主机和端口。异步

queue.name=messagequeue
fanout.exchange=messagequeue-exchange
spring.rabbitmq.host: localhost
spring.rabbitmq.port: 5672
spring.rabbitmq.username: guest
spring.rabbitmq.password: guest
复制代码

b)咱们须要建立一个配置类,它将使用队列名和交换类型将队列绑定到微服务模块。分布式

@Configuration
public class RabbitConfiguration {
 @Value("${fanout.exchange}")
 private String fanoutExchange;
 @Value("${queue.name}")
 private String queueName;
 @Bean
 Queue queue() {
 return new Queue(queueName, true);
 }
 @Bean
 FanoutExchange exchange() {
 return new FanoutExchange(fanoutExchange);
 }
 @Bean
 Binding binding(Queue queue, FanoutExchange exchange) {
 return BindingBuilder.bind(queue).to(exchange);
 }
}
复制代码

c)最后,咱们须要一个工具类,它将使用Spring框架提供的RabbitTemplate将实际的电子邮件内容发送到队列中。

@Component
public class QueueProducer {
 protected Logger logger = LoggerFactory.getLogger(getClass());
 @Value("${fanout.exchange}")
 private String fanoutExchange;
 private final RabbitTemplate rabbitTemplate;
 @Autowired
 public QueueProducer(RabbitTemplate rabbitTemplate) {
 super();
 this.rabbitTemplate = rabbitTemplate;
 }
 public void produce(NotificationRequestDTO notificationDTO) throws Exception {
 logger.info("Storing notification...");
 rabbitTemplate.setExchange(fanoutExchange);
 rabbitTemplate.convertAndSend(new ObjectMapper().writeValueAsString(notificationDTO));
 logger.info("Notification stored in queue sucessfully");
 }
}
复制代码

d)而后,您能够在模块的任何地方调用这个produce方法。

{
 queueProducer.produce(notificationDTO);
}
复制代码

2) 消费者: 这一层负责使用FIFO方法从RabbitMQ消息代理中消费消息,而后执行与电子邮件相关的操做。

a)在这个properties文件中,咱们须要配置队列名和交换类型,以及安装RabbitMQ服务器的主机和端口。

queue.name=messagequeue
fanout.exchange=messagequeue-exchange
spring.rabbitmq.host: localhost
spring.rabbitmq.port: 5672
spring.rabbitmq.username: guest
spring.rabbitmq.password: guest
复制代码

b)咱们须要建立一个配置类,它将使用队列名和交换类型将队列绑定到微服务模块。此外,在消费者的RabbitMQ配置中,咱们须要建立一个充当消费者的MessageListenerAdapter bean,它始终侦遵从队列中传入的消息。这个MessageListenerAdapter将有一个带有消费者工具类和defaultListenerMethod的有参构造函数,在这里咱们能够指定与电子邮件相关的操做。

@Configuration
public class RabbitConfiguration {
 private static final String LISTENER_METHOD = "receiveMessage";
 @Value("${queue.name}")
 private String queueName;
 @Value("${fanout.exchange}")
 private String fanoutExchange;
 @Bean
 Queue queue() {
 return new Queue(queueName, true);
 }
 @Bean
 FanoutExchange exchange() {
 return new FanoutExchange(fanoutExchange);
 }
 @Bean
 Binding binding(Queue queue, FanoutExchange exchange) {
 return BindingBuilder.bind(queue).to(exchange);
 }
 @Bean
 SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
 MessageListenerAdapter listenerAdapter) {
 SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
 container.setConnectionFactory(connectionFactory);
 container.setQueueNames(queueName);
 container.setMessageListener(listenerAdapter);
 return container;
 }
 @Bean
 MessageListenerAdapter listenerAdapter(QueueConsumer consumer) {
 return new MessageListenerAdapter(consumer, LISTENER_METHOD);
 }
}
复制代码

c)而后,须要建立具备特定消息侦听器方法的 QueueConsumer类,在该类中咱们能够进行实际发送电子邮件的操做。

@Component
public class QueueConsumer {
 @Autowired
 MailServiceImpl mailServiceImpl;
 protected Logger logger = LoggerFactory.getLogger(getClass());
 public void receiveMessage(String message) {
 logger.info("Received (String) " + message);
 processMessage(message);
 }
 public void receiveMessage(byte[] message) {
 String strMessage = new String(message);
 logger.info("Received (No String) " + strMessage);
 processMessage(strMessage);
 }
 private void processMessage(String message) {
 try {
 MailDTO mailDTO = new ObjectMapper().readValue(message, MailDTO.class);
 ValidationUtil.validateMailDTO(mailDTO);
 mailServiceImpl.sendMail(mailDTO, null);
 } catch (JsonParseException e) {
 logger.warn("Bad JSON in message: " + message);
 } catch (JsonMappingException e) {
 logger.warn("cannot map JSON to NotificationRequest: " + message);
 } catch (Exception e) {
 logger.error(e.getMessage());
 }
 }
}
复制代码

总结

经过使用RabbitMQ,您能够避免服务之间直接的HTTP调用,并消除核心微服务的紧密耦合。这将帮助您在更高级别上实现微服务的可伸缩性,并在微服务之间添加故障转移机制。

欢迎工做一到五年的Java工程师朋友们加入Java架构开发:878249276,群内提供免费的Java架构学习资料(里面有高可用、高并发、高性能及分布式、Jvm性能调优、Spring源码,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多个知识点的架构资料)合理利用本身每一分每一秒的时间来学习提高本身,不要再用"没有时间“来掩饰本身思想上的懒惰!趁年轻,使劲拼,给将来的本身一个交代!

相关文章
相关标签/搜索