消息队列做为分布式系统中重要的组件,能够有效解决应用耦合,异步消息,流量削锋等系列问题,有利于实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等,各类消息队列也都各有特色,好比Kafka提供高性能、高吞吐量,但可靠性有所欠缺,因此比较适合像日志处理这类对性能要求高但对可靠性要求没那么严格的业务,再好比RabbitMQ支持了各类协议,实现较为臃肿,性能和吞吐量都通常,但却提供了很好的可靠性,比较适合像银行金融一类对可靠性要求较高的业务。html
如下简单介绍几个消息队列在实际应用中的使用场景(如下场景资料引用自网络)。前端
场景说明:用户注册后,须要发注册邮件和注册短信。传统的作法有两种 1.串行的方式;2.并行方式java
(1)串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务所有完成后,返回给客户端git
(2)并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差异是,并行的方式能够提升处理的时间web
假设三个业务节点每一个使用50毫秒钟,不考虑网络等其余开销,则串行方式的时间是150毫秒,并行的时间多是100毫秒。ajax
由于CPU在单位时间内处理的请求数是必定的,假设CPU1秒内吞吐量是100次。则串行方式1秒内CPU可处理的请求量是7次(1000/150)。并行方式处理的请求量是10次(1000/100)spring
小结:如以上案例描述,传统的方式系统的性能(并发量,吞吐量,响应时间)会有瓶颈。如何解决这个问题呢?数据库
引入消息队列,将不是必须的业务逻辑,异步处理。改造后的架构以下:apache
按照以上约定,用户的响应时间至关因而注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,所以写入消息队列的速度很快,基本能够忽略,所以用户的响应时间多是50毫秒。所以架构改变后,系统的吞吐量提升到每秒20 QPS。比串行提升了3倍,比并行提升了两倍api
场景说明:用户下单后,订单系统须要通知库存系统。传统的作法是,订单系统调用库存系统的接口。以下图
传统模式的缺点:
假如库存系统没法访问,则订单减库存将失败,从而致使订单失败
订单系统与库存系统耦合
如何解决以上问题呢?引入应用消息队列后的方案,以下图:
订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功
库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操做
假如:在下单时库存系统不能正常使用。也不影响正常下单,由于下单后,订单系统写入消息队列就再也不关心其余的后续操做了。实现订单系统与库存系统的应用解耦
流量削锋也是消息队列中的经常使用场景,通常在秒杀或团抢活动中使用普遍
应用场景:秒杀活动,通常会由于流量过大,致使流量暴增,应用挂掉。为解决这个问题,通常须要在应用前端加入消息队列。
能够控制活动的人数
能够缓解短期内高流量压垮应用
用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面
秒杀业务根据消息队列中的请求信息,再作后续处理
日志处理是指将消息队列用在日志处理中,好比Kafka的应用,解决大量日志传输的问题。架构简化以下
日志采集客户端,负责日志数据采集,定时写受写入Kafka队列
Kafka消息队列,负责日志数据的接收,存储和转发
日志处理应用:订阅并消费kafka队列中的日志数据
如下是新浪kafka日志处理应用案例:
(1)Kafka:接收用户日志的消息队列
(2)Logstash:作日志解析,统一成JSON输出给Elasticsearch
(3)Elasticsearch:实时日志分析服务的核心技术,一个schemaless,实时的数据存储服务,经过index组织数据,兼具强大的搜索和统计功能
(4)Kibana:基于Elasticsearch的数据可视化组件,超强的数据可视化能力是众多公司选择ELK stack的重要缘由
消息通信是指,消息队列通常都内置了高效的通讯机制,所以也能够用在纯的消息通信。好比实现点对点消息队列,或者聊天室等
点对点通信:
客户端A和客户端B使用同一队列,进行消息通信。
聊天室通信:
客户端A,客户端B,客户端N订阅同一主题,进行消息发布和接收。实现相似聊天室效果。
以上实际是消息队列的两种消息模式,点对点或发布订阅模式。模型为示意图,供参考。
ConnectionFactory、Connection、Channel都是RabbitMQ对外提供的API中最基本的对象。Connection是RabbitMQ的socket连接,它封装了socket协议相关部分逻辑。ConnectionFactory为Connection的制造工厂。 Channel是咱们与RabbitMQ打交道的最重要的一个接口,咱们大部分的业务操做是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。
Queue(队列)是RabbitMQ的内部对象,用于存储消息。
RabbitMQ中的消息都只能存储在Queue中,生产者(下图中的P)生产消息并最终投递到Queue中,消费者(下图中的C)能够从Queue中获取消息并消费。
生产者Send Message “A”被传送到Queue中,消费者发现消息队列Queue中有订阅的消息,就会将这条消息A读取出来进行一些列的业务操做。这里只是一个消费正对应一个队列Queue,也能够多个消费者订阅同一个队列Queue,固然这里就会将Queue里面的消息平分给其余的消费者,可是会存在一个一个问题就是若是每一个消息的处理时间不一样,就会致使某些消费者一直在忙碌中,而有的消费者处理完了消息后一直处于空闲状态,由于前面已经说起到了Queue会平分这些消息给相应的消费者。这里咱们就可使用prefetchCount来限制每次发送给消费者消息的个数。详情见下图所示:
这里的prefetchCount=1是指每次从Queue中发送一条消息来。等消费者处理完这条消息后Queue会再发送一条消息给消费者。
在实际应用中,可能会发生消费者收到Queue中的消息,但没有处理完成就宕机(或出现其余意外)的状况,这种状况下就可能会致使消息丢失。为了不这种状况发生,咱们能够要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除;若是RabbitMQ没有收到回执并检测到消费者的RabbitMQ链接断开,则RabbitMQ会将该消息发送给其余消费者(若是存在多个消费者)进行处理。这里不存在timeout概念,一个消费者处理消息时间再长也不会致使该消息被发送给其余消费者,除非它的RabbitMQ链接断开。 这里会产生另一个问题,若是咱们的开发人员在处理完业务逻辑后,忘记发送回执给RabbitMQ,这将会致使严重的bug——Queue中堆积的消息会愈来愈多;消费者重启后会重复消费这些消息并重复执行业务逻辑…
另外pub message是没有ack的。
若是咱们但愿即便在RabbitMQ服务重启的状况下,也不会丢失消息,咱们能够将Queue与Message都设置为可持久化的(durable),这样能够保证绝大部分状况下咱们的RabbitMQ消息不会丢失。但依然解决不了小几率丢失事件的发生(好比RabbitMQ服务器已经接收到生产者的消息,但还没来得及持久化该消息时RabbitMQ服务器就断电了),若是咱们须要对这种小几率事件也要管理起来,那么咱们要用到事务。因为这里仅为RabbitMQ的简单介绍,因此这里将不讲解RabbitMQ相关的事务。
首先明确一点就是生产者产生的消息并非直接发送给消息队列Queue的,而是要通过Exchange(交换器),由Exchange再将消息路由到一个或多个Queue,固然这里还会对不符合路由规则的消息进行丢弃掉,这里指的是后续要谈到的Exchange Type。那么Exchange是怎样将消息准确的推送到对应的Queue的呢?那么这里的功劳最大的当属Binding,RabbitMQ是经过Binding将Exchange和Queue连接在一块儿,这样Exchange就知道如何将消息准确的推送到Queue中去。简单示意图以下所示:
在绑定(Binding)Exchange和Queue的同时,通常会指定一个Binding Key,生产者将消息发送给Exchange的时候,通常会产生一个Routing Key,当Routing Key和Binding Key对应上的时候,消息就会发送到对应的Queue中去。那么Exchange有四种类型,不一样的类型有着不一样的策略。也就是代表不一样的类型将决定绑定的Queue不一样,换言之就是说生产者发送了一个消息,Routing Key的规则是A,那么生产者会将Routing Key=A的消息推送到Exchange中,这时候Exchange中会有本身的规则,对应的规则去筛选生产者发来的消息,若是可以对应上Exchange的内部规则就将消息推送到对应的Queue中去。那么接下来就来详细讲解下Exchange里面类型。
Exchange Types
fanout类型的Exchange路由规则很是简单,它会把全部发送到该Exchange的消息路由到全部与它绑定的Queue中。
上图所示,生产者(P)生产消息1将消息1推送到Exchange,因为Exchange Type=fanout这时候会遵循fanout的规则将消息推送到全部与它绑定Queue,也就是图上的两个Queue最后两个消费者消费。
direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key彻底匹配的Queue中
当生产者(P)发送消息时Rotuing key=booking时,这时候将消息传送给Exchange,Exchange获取到生产者发送过来消息后,会根据自身的规则进行与匹配相应的Queue,这时发现Queue1和Queue2都符合,就会将消息传送给这两个队列,若是咱们以Rotuing key=create和Rotuing key=confirm发送消息时,这时消息只会被推送到Queue2队列中,其余Routing Key的消息将会被丢弃。
前面提到的direct规则是严格意义上的匹配,换言之Routing Key必须与Binding Key相匹配的时候才将消息传送给Queue,那么topic这个规则就是模糊匹配,能够经过通配符知足一部分规则就能够传送。它的约定是:
当生产者发送消息Routing Key=F.C.E的时候,这时候只知足Queue1,因此会被路由到Queue中,若是Routing Key=A.C.E这时候会被同是路由到Queue1和Queue2中,若是Routing Key=A.F.B时,这里只会发送一条消息到Queue2中。
headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否彻底匹配Queue与Exchange绑定时指定的键值对;若是彻底匹配则消息会路由到该Queue,不然不会路由到该Queue。
首先,须要安装Rabbit MQ,能够直接安装,也能够用Docker安装,这个网上教程不少,这里就再也不赘述了。
为方便咱们初始化项目,Spring Boot给咱们提供一个项目模板生成网站。
1. 打开浏览器,访问:https://start.spring.io/
2. 根据页面提示,选择构建工具,开发语言,项目信息等。
3. 点击 Generate the project,生成项目模板,生成以后会将压缩包下载到本地。
4. 使用IDE导入项目,我这里使用Eclipse,经过导入Maven项目的方式导入。
清理掉不须要的测试类及测试依赖,添加 rabbitmq相关依赖。
<!-- rabbitmq --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
下面给出完整的POM文件。
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.5.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.louis.springboot</groupId> <artifactId>demo</artifactId> <version>0.0.1-SNAPSHOT</version> <name>demo</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <!-- web --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- swagger --> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>2.9.2</version> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger-ui</artifactId> <version>2.9.2</version> </dependency> <!-- rabbitmq --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
添加一个swagger 配置类,在工程下新建 config 包并添加一个 SwaggerConfig 配置类。
SwaggerConfig.java
package com.louis.springboot.demo.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import springfox.documentation.builders.ApiInfoBuilder; import springfox.documentation.builders.PathSelectors; import springfox.documentation.builders.RequestHandlerSelectors; import springfox.documentation.service.ApiInfo; import springfox.documentation.spi.DocumentationType; import springfox.documentation.spring.web.plugins.Docket; import springfox.documentation.swagger2.annotations.EnableSwagger2; @Configuration @EnableSwagger2 public class SwaggerConfig { @Bean public Docket createRestApi(){ return new Docket(DocumentationType.SWAGGER_2).apiInfo(apiInfo()) .select() .apis(RequestHandlerSelectors.any()) .paths(PathSelectors.any()).build(); } private ApiInfo apiInfo(){ return new ApiInfoBuilder() .title("Swagger API Doc") .description("This is a restful api document of Swagger.") .version("1.0") .build(); } }
修改application.properties文件名为application.yml,在其中添加RabbitMQ配置信息,根据本身安装的RabbitMQ配置。
application.yml
# rabbitmq配置 spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest
新建一个RabbitMQ配置类,并添加一个demoQueue队列。
RabbitConfig.java
package com.louis.springboot.demo.config; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { /** * 定义demoQueue队列 * @return */ @Bean public Queue demoString() { return new Queue("demoQueue"); } }
编写一个消息发布者,并编写一个发送方法,经过AmqpTemplate往"demoQueue"发送消息。
RabbitProducer.java
package com.louis.springboot.demo.mq; import java.text.SimpleDateFormat; import java.util.Date; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class RabbitProducer { @Autowired private AmqpTemplate rabbitTemplate; public void sendDemoQueue() { Date date = new Date(); String dateString = new SimpleDateFormat("YYYY-mm-DD hh:MM:ss").format(date); System.out.println("[demoQueue] send msg: " + dateString); // 第一个参数为刚刚定义的队列名称 this.rabbitTemplate.convertAndSend("demoQueue", dateString); } }
编写一个消息消费者,经过@RabbitListener(queues = "demoQueue")注解监听"demoQueue"队列,并用@RabbitHandler注解相关方法,这样在在队列收到消息以后,交友@RabbitHandler注解的方法进行处理。
DemoQueueConsumer.java
package com.louis.springboot.demo.mq; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "demoQueue") public class DemoQueueConsumer { /** * 消息消费 * @RabbitHandler 表明此方法为接受到消息后的处理方法 */ @RabbitHandler public void recieved(String msg) { System.out.println("[demoQueue] recieved message: " + msg); } }
编写一个控制器,注入RabbitProducer调用相关消息发送方法,方便经过接口触发消息发送。
RabbitMqController.java
package com.louis.springboot.demo.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import com.louis.springboot.demo.mq.RabbitProducer; @RestController public class RabbitMqController { @Autowired private RabbitProducer rabbitProducer; @GetMapping("/sendDemoQueue") public Object sendDemoQueue() { rabbitProducer.sendDemoQueue(); return "success"; } }
编译并启动应用,打开浏览器,访问:http://localhost:8080/swagger-ui.html,进入swagger接口文档界面。
调用两次sendDemoQueue接口,在控制台能够看到咱们输出的信息,说明消息已经成功发送并被消费。
[demoQueue] send msg: 2019-58-183 04:07:38 [demoQueue] recieved message: 2019-58-183 04:07:38 [demoQueue] send msg: 2019-01-183 05:07:05 [demoQueue] recieved message: 2019-01-183 05:07:05
Fanout其实就是广播模式,只要跟它绑定的队列都会通知而且接受到消息。修改配置类,在RabbitConfig中添加以下fanout模式的队列跟交换机信息。在代码中咱们配置了三个队列名、一个fanout交换机,而且将这三个队列绑定到了fanout交换器上。只要咱们往这个交换机生产新的消息,那么这三个队列都会收到。
RabbitConfig.java
//=================== fanout广播模式 ==================== @Bean public Queue fanoutA() { return new Queue("fanout.a"); } @Bean public Queue fanoutB() { return new Queue("fanout.b"); } @Bean public Queue fanoutC() { return new Queue("fanout.c"); } /** * 定义个fanout交换器 * @return */ @Bean FanoutExchange fanoutExchange() { // 定义一个名为fanoutExchange的fanout交换器 return new FanoutExchange("fanoutExchange"); } /** * 将定义的fanoutA队列与fanoutExchange交换机绑定 * @return */ @Bean public Binding bindingExchangeWithA() { return BindingBuilder.bind(fanoutA()).to(fanoutExchange()); } /** * 将定义的fanoutB队列与fanoutExchange交换机绑定 * @return */ @Bean public Binding bindingExchangeWithB() { return BindingBuilder.bind(fanoutB()).to(fanoutExchange()); } /** * 将定义的fanoutC队列与fanoutExchange交换机绑定 * @return */ @Bean public Binding bindingExchangeWithC() { return BindingBuilder.bind(fanoutC()).to(fanoutExchange()); }
而后咱们在RabbitProducer中添加一个sendFanout方法,用来向fanout队列发送消息。
RabbitProducer.java
public void sendFanout() { Date date = new Date(); String dateString = new SimpleDateFormat("YYYY-mm-DD hh:MM:ss").format(date); System.out.println("[fanout] send msg:" + dateString); // 注意 第一个参数是咱们交换机的名称 ,第二个参数是routerKey 咱们不用管空着就能够,第三个是你要发送的消息 this.rabbitTemplate.convertAndSend("fanoutExchange", "", dateString); }
一样的,在控制器里添加一个访问接口。
RabbitMqController.java
@GetMapping("/sendFanout") public Object sendFanout() { rabbitProducer.sendFanout(); return "success"; }
接着针对三个广播队列分别编写一个消息消费者,指定队列和处理函数。
FanoutAConsumer.java
package com.louis.springboot.demo.mq; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "fanout.a") public class FanoutAConsumer { /** * 消息消费 * @RabbitHandler 表明此方法为接受到消息后的处理方法 */ @RabbitHandler public void recieved(String msg) { System.out.println("[fanout.a] recieved message: " + msg); } }
FanoutBConsumer.java
package com.louis.springboot.demo.mq; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "fanout.b") public class FanoutBConsumer { /** * 消息消费 * @RabbitHandler 表明此方法为接受到消息后的处理方法 */ @RabbitHandler public void recieved(String msg) { System.out.println("[fanout.b] recieved message: " + msg); } }
FanoutCConsumer.java
package com.louis.springboot.demo.mq; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "fanout.c") public class FanoutCConsumer { /** * 消息消费 * @RabbitHandler 表明此方法为接受到消息后的处理方法 */ @RabbitHandler public void recieved(String msg) { System.out.println("[fanout.c] recieved message: " + msg); } }
从新启动应用,调用sendFanout接口,经过控制台能够看到消息发送以后,a, b, c三个队列都收到了消息。
[fanout] send msg:2019-47-183 05:07:12 [fanout.c] recieved message: 2019-47-183 05:07:12 [fanout.b] recieved message: 2019-47-183 05:07:12 [fanout.a] recieved message: 2019-47-183 05:07:12
利用topic模式能够实现模糊匹配,一样的,在RabbitConfig中配置topic队列跟交换器,注意的是这里须要多配置一个bindingKey。
RabbitConfig.java
//=================== topic主题模式 ==================== @Bean public Queue topiocA() { return new Queue("topic.a"); } @Bean public Queue topicB() { return new Queue("topic.b"); } @Bean public Queue topicC() { return new Queue("topic.c"); } /** * 定义个topic交换器 * @return */ @Bean TopicExchange topicExchange() { // 定义一个名为fanoutExchange的fanout交换器 return new TopicExchange("topicExchange"); } /** * 将定义的topicA队列与topicExchange交换机绑定 * @return */ @Bean public Binding bindingTopicExchangeWithA() { return BindingBuilder.bind(topiocA()).to(topicExchange()).with("topic.msg"); } /** * 将定义的topicB队列与topicExchange交换机绑定 * @return */ @Bean public Binding bindingTopicExchangeWithB() { return BindingBuilder.bind(topicB()).to(topicExchange()).with("topic.#"); } /** * 将定义的topicC队列与topicExchange交换机绑定 * @return */ @Bean public Binding bindingTopicExchangeWithC() { return BindingBuilder.bind(topicC()).to(topicExchange()).with("topic.*.z"); }
上述配置中:
topicA的key为topic.msg 那么他只会接收包含topic.msg的消息
topicB的key为topic.#那么他只会接收topic开头的消息
topicC的key为topic.*.z那么他只会接收topic.x.z这样格式的消息
而后修改RabbitProducer,在其中添加以下三个方法,如方法名所示,分别根据匹配规则发送到A\B,B,B\C队列。
RabbitProducer.java
public void sendTopicTopicAB() { Date date = new Date(); String dateString = new SimpleDateFormat("YYYY-mm-DD hh:MM:ss").format(date); dateString = "[topic.msg] send msg:" + dateString; System.out.println(dateString); // 注意 第一个参数是咱们交换机的名称 ,第二个参数是routerKey topic.msg,第三个是你要发送的消息 // 这条信息将会被 topic.a topic.b接收 this.rabbitTemplate.convertAndSend("topicExchange", "topic.msg", dateString); } public void sendTopicTopicB() { Date date = new Date(); String dateString = new SimpleDateFormat("YYYY-mm-DD hh:MM:ss").format(date); dateString = "[topic.good.msg] send msg:" + dateString; System.out.println(dateString); // 注意 第一个参数是咱们交换机的名称 ,第二个参数是routerKey ,第三个是你要发送的消息 // 这条信息将会被topic.b接收 this.rabbitTemplate.convertAndSend("topicExchange", "topic.good.msg", dateString); } public void sendTopicTopicBC() { Date date = new Date(); String dateString = new SimpleDateFormat("YYYY-mm-DD hh:MM:ss").format(date); dateString = "[topic.m.z] send msg:" + dateString; System.out.println(dateString); // 注意 第一个参数是咱们交换机的名称 ,第二个参数是routerKey ,第三个是你要发送的消息 // 这条信息将会被topic.b、topic.c接收 this.rabbitTemplate.convertAndSend("topicExchange", "topic.m.z", dateString); }
一样的,在控制器里面添加发送服务对应的接口。
RabbitMqController.java
@GetMapping("/sendTopicTopicAB") public Object sendTopicTopicAB() { rabbitProducer.sendTopicTopicAB(); return "success"; } @GetMapping("/sendTopicTopicB") public Object sendTopicTopicB() { rabbitProducer.sendTopicTopicB(); return "success"; } @GetMapping("/sendTopicTopicBC") public Object sendTopicTopicBC() { rabbitProducer.sendTopicTopicBC(); return "success"; }
接着针对三个主题队列编写对应的消息消费者。
TopicAConsumer.java
package com.louis.springboot.demo.mq; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "topic.a") public class TopicAConsumer { /** * 消息消费 * @RabbitHandler 表明此方法为接受到消息后的处理方法 */ @RabbitHandler public void recieved(String msg) { System.out.println("[topic.a] recieved message:" + msg); } }
TopicBConsumer.java
package com.louis.springboot.demo.mq; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "topic.b") public class TopicBConsumer { /** * 消息消费 * @RabbitHandler 表明此方法为接受到消息后的处理方法 */ @RabbitHandler public void recieved(String msg) { System.out.println("[topic.b] recieved message:" + msg); } }
TopicCConsumer.java
package com.louis.springboot.demo.mq; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "topic.c") public class TopicCConsumer { /** * 消息消费 * @RabbitHandler 表明此方法为接受到消息后的处理方法 */ @RabbitHandler public void recieved(String msg) { System.out.println("[topic.c] recieved message:" + msg); } }
重启应用,调用sendTopicTopicAB接口,通过匹配,route key为“topic.msg”的消息被发送到了topic.a和topic.b。
[topic.msg] send msg:2019-12-183 06:07:22 [topic.b] recieved message:[topic.msg] send msg:2019-12-183 06:07:22 [topic.a] recieved message:[topic.msg] send msg:2019-12-183 06:07:22
调用sendTopicTopicB接口,通过匹配,route key为“topic.good.msg”的消息被发送到了topic.b。
[topic.good.msg] send msg:2019-15-183 06:07:23 [topic.b] recieved message:[topic.good.msg] send msg:2019-15-183 06:07:23
调用sendTopicTopicBC接口,通过匹配,route key为“topic.m.z”的消息被发送到了topic.b和topic.c。
[topic.m.z] send msg:2019-16-183 06:07:09 [topic.b] recieved message:[topic.m.z] send msg:2019-16-183 06:07:09 [topic.c] recieved message:[topic.m.z] send msg:2019-16-183 06:07:09
官方网站:https://www.rabbitmq.com/
百度百科:https://baike.baidu.com/item/rabbitmq/9372144?fr=aladdin
中文教程:http://rabbitmq.mr-ping.com/description.html
码云:https://gitee.com/liuge1988/spring-boot-demo.git
做者:朝雨忆轻尘
出处:https://www.cnblogs.com/xifengxiaoma/
版权全部,欢迎转载,转载请注明原文做者及出处。