这篇是SpringBoot整合消息队列的第一篇文章,咱们详细介绍下消息队列的相关内容。java
MQ
(Message Quene):经过典型的生产者和消费者模型,生产者不断向消息队列中产生消息,消费者不断的从队列中获取消息。由于生产者和消费者都是异步的,并且生产者只关心消息的发送,消费者只关心消息的接收,没有业务逻辑的侵入,轻松实现业务解耦。python
场景描述:某商场具备注册功能,注册的时候须要发送短信验证码。git
传统的作法是用户提交信息到用户服务,用户服务调用短信服务发送短信,而后给用户返回响应,这种是同步的处理方式,耗时较长。加入消息队列后,用户直接提交信息到用户服务,将信息写入消息队列,直接给用户返回响应,短信服务从消息队列中读取消息进行发送短信。github
场景描述:某商场下单流程。spring
传统作法是用户下单,订单系统去查询库存系统,若是库存系统宕机了,则下单失败,损失订单量。加入消息队列后,用户下单,订单系统记录订单,将订单信息写入消息队列,下单成功,而后库存系统恢复正常后去操做数据库库存(不考虑库存为0的状况)。这样订单系统和库存系统就达到松耦合的目的了数据库
场景描述:秒杀活动。服务器
流量过大确定会致使响应超时或系统宕机,加入消息队列,用户秒杀请求写入消息队列,设置消息队列的长度等属性,达到消息队列最大长度后,直接返回秒杀失败,而后再去消费消息队列的数据,完成秒杀。微信
RabbitMQ是用Erlang语言编写的,实现了高级消息队列协议(AMQP)的消息中间件。网络
AMQP
:AMQP
是一种连接协议,直接定义网络交换的数据格式,这使得实现了AMQP
的provider
自己就是跨平台的。如下是AMQP
协议模型:app
在上图中:
在上图中:
当消息处理比较耗时时,就会出现生产消息的速度远远大于消费消息的速度,这样就会出现消息堆积,没法及时处理。这时就可让多个消费者绑定一个队列,去消费消息,队列中的消息一旦消费就会丢失,所以任务不会重复执行。
这种模型中生产者发送的消息全部消费者均可以消费。
在上图中:
这种模型消费者发送的消息,不一样类型的消息能够由不一样的消费者去消费。
在上图中:
这种模型和direct模型同样,都是能够根据routing key将消息路由到不一样的队列,只不过这种模型可让队列绑定routing key 的时候使用通配符。这种类型的routing key都是由一个或多个单词组成,多个单词之间用.
分割。
通配符介绍:
*
:只匹配一个单词
#
:匹配一个或多个单词
这种模式须要通知远程计算机运行功能并等待返回运行结果。这个过程是阻塞的。
当客户端启动时,它建立一个匿名独占回调队列。并提供名字为call的函数,这个call会发送RPC请求而且阻塞直到收到RPC运算的结果。
第一步:引入pom依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
第二步:增长RabbitMQ服务配置信息
spring: rabbitmq: virtual-host: javatrip port: 5672 host: 127.0.0.1 username: guest password: guest
这里咱们用广播模型来举例使用,广播模型(fanout)比较好理解,就像公众号同样,我天天推文章后,会推送给每一个关注用户,他们均可以看到这条消息。
广播模型注意点:
@Configuration public class RabbitConfig { final static String queueNameA = "first-queue"; final static String queueNameB = "second-queue"; /*** * 定义一个队列,设置队列属性 * @return */ @Bean("queueA") public Queue queueA(){ Map<String,Object> map = new HashMap<>(); // 消息过时时长,10秒过时 map.put("x-message-ttl",10000); // 队列中最大消息条数,10条 map.put("x-max-length",10); // 第一个参数,队列名称 // 第二个参数,durable:持久化 // 第三个参数,exclusive:排外的, // 第四个参数,autoDelete:自动删除 Queue queue = new Queue(queueNameA,true,false,false,map); return queue; } @Bean("queueB") public Queue queueB(){ Map<String,Object> map = new HashMap<>(); // 消息过时时长,10秒过时 map.put("x-message-ttl",10000); // 队列中最大消息条数,10条 map.put("x-max-length",10); // 第一个参数,队列名称 // 第二个参数,durable:持久化 // 第三个参数,exclusive:排外的, // 第四个参数,autoDelete:自动删除 Queue queue = new Queue(queueNameB,true,false,false,map); return queue; } }
@Bean public FanoutExchange fanoutExchange(){ // 第一个参数,交换机名称 // 第二个参数,durable,是否持久化 // 第三个参数,autoDelete,是否自动删除 FanoutExchange fanoutExchange = new FanoutExchange(exchangeName,true,false); return fanoutExchange; }
@Bean public Binding bindingA(@Qualifier("queueA") Queue queueA, FanoutExchange fanoutExchange){ Binding binding = BindingBuilder.bind(queueA).to(fanoutExchange); return binding; } @Bean public Binding bindingB(@Qualifier("queueB") Queue queueB,FanoutExchange fanoutExchange){ Binding binding = BindingBuilder.bind(queueB).to(fanoutExchange); return binding; }
@RabbitListener(queues = RabbitConfig.queueNameA) @Component @Slf4j public class ConsumerA { @RabbitHandler public void receive(String message){ log.info("消费者A接收到的消息:"+message); } }
@RabbitListener(queues = RabbitConfig.queueNameB) @Component @Slf4j public class ConsumerB { @RabbitHandler public void receive(String message){ log.info("消费者B接收到的消息:"+message); } }
@RestController public class provider { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("send") public void sendMessage(){ String message = "你好,我是Java旅途"; rabbitTemplate.convertAndSend(RabbitConfig.exchangeName,null,message); } }
这样生产者发送一条消息后,两个消费者就能同时消费到消息了。
此是spring-boot-route系列的第十三篇文章,这个系列的文章都比较简单,主要目的就是为了帮助初次接触Spring Boot 的同窗有一个系统的认识。本文已收录至个人github,欢迎各位小伙伴star
!
github:https://github.com/binzh303/s...
若是以为文章不错,欢迎关注、点赞、收藏,大家的支持是我创做的动力,感谢你们。
若是文章写的有问题,请不要吝啬,欢迎留言指出,我会及时核查修改。
若是你还想更加深刻的了解我,能够微信搜索「Java旅途」进行关注。回复「1024」便可得到学习视频及精美电子书。天天7:30准时推送技术文章,让你的上班路不在孤独,并且每个月还有送书活动,助你提高硬实力!