可经过消息服务中间件来提高系统异步通讯、扩展解耦能力html
消息服务中两个重要概念:消息代理(message broker)和目的地(destination)当消息发送者发送消息之后,将由消息代理接管,消息代理保证消息传递到指定目的地。java
消息队列主要有两种形式的目的地:python
队列(queue):点对点消息通讯(point-to-point)linux
主题(topic):发布(publish)/订阅(subscribe)消息通讯web
注:经过ActiveMQ的学习便可知道以上的概念redis
在未引入消息中间件的状况下,响应时间并不能降到最低;在引入消息中间件时,响应时间由150ms下降为55ms;spring
在秒杀系统中,咱们能够引入消息队列进行流量削峰。如,5件商品,100人抢购,若是抢购完了,则后面抢购的消息所有拒绝。docker
消息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列中获取消息内容,消息读取后被移出队列json
消息只有惟一的发送者和接受者,但并非说只能有一个接收者浏览器
发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个主题,那么就会在消息到达时同时收到消息.类比微信公众号
JAVA消息服务(Java Message Service),基于JVM消息代理的规范。ActiveMQ、HornetMQ是JMS实现
高级消息队列协议(Advanced Message Queuing Protocol),也是一个消息代理的规范,兼容JMS.RabbitMQ是AMQP的实现;
spring-jms提供了对JMS的支持;
spring-rabbit提供了对AMQP的支持;
须要ConnectionFactory的实现来链接消息代理;
提供JmsTemplate、RabbitTemplate来发送消息;
@JmsListener(JMS)、@RabbitListener(AMQP)注解在方法上监听消息代理发布的消息;
@EnableJms、@EnableRabbit开启支持;
JmsAutoConfiguration
RabbitAutoConfiguration
RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue Protocol)的开源实现。
Message:消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其余消息的优先权)、delivery-mode(指出该消息可能须要持久性存储)等。
Publisher:消息的生产者,也是一个向交换器发布消息的客户端应用程序。
Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。Exchange有4种类型:direct(默认),fanout, topic, 和headers,不一样类型的Exchange转发消息的策略有所区别.
Queue:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者链接到这个队列将其取走。
Binding:绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列链接起来的路由规则,因此能够将交换器理解成一个由绑定构成的路由表。Exchange 和Queue的绑定能够是多对多的关系。
Connection:网络链接,好比一个TCP链接。
Channel:信道,多路复用链接中的一条独立的双向数据流通道。信道是创建在真实的TCP链接内的虚拟链接,AMQP 命令都是经过信道发出去的,不论是发布消息、订阅队列仍是接收消息,这些动做都是经过信道完成。由于对于操做系统来讲创建和销毁 TCP 都是很是昂贵的开销,因此引入了信道的概念,以复用一条 TCP 链接。
Consumer:消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
Virtual Host虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每一个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有本身的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在链接时指定,RabbitMQ 默认的 vhost 是/。
Broker:表示消息队列服务器实体。
AMQP 中消息的路由过程和 Java 开发者熟悉的 JMS 存在一些差异,AMQP 中增长了 Exchange 和 Binding 的角色。生产者把消息发布到 Exchange 上,消息最终到达队列并被消费者接收,而 Binding 决定交换器的消息应该发送到那个队列。
Exchange分发消息时根据类型的不一样分发策略有区别,目前共四种类型:direct、fanout、topic、headers;headers 匹配 AMQP 消息的 header 而不是路由键, headers 交换器和 direct 交换器彻底一致,但性能差不少,目前几乎用不到了,因此直接看另外三种类型:
如,usa.news和usa.weather会匹配到usa.#上;而usa.weather和europe.weather就会匹配到#.weather;相似于模糊匹配
https://www.cnblogs.com/HOsystem/p/13789551.html |
https://www.cnblogs.com/HOsystem/p/13789551.html |
在docker上pull rabbitmq
[root@hosystem ~]# docker pull rabbitmq Using default tag: latest latest: Pulling from library/rabbitmq 171857c49d0f: Pull complete 419640447d26: Pull complete 61e52f862619: Pull complete 856781f94405: Pull complete 125d5ee3d600: Pull complete 42de77c4d197: Pull complete 4d65f87814dd: Pull complete f6c0bf06039f: Pull complete 01671add1b7b: Pull complete 088ff84cf8cb: Pull complete Digest: sha256:3da3bcd2167a1fc9bdbbc40ec0ae2b195df5df05e3c10c64569c969cb3d86435 Status: Downloaded newer image for rabbitmq:latest docker.io/library/rabbitmq:latest [root@hosystem ~]# docker images REPOSITORY TAG IMAGE ID CREATED SIZE redis latest 62f1d3402b78 4 days ago 104MB rabbitmq latest ea2bf0a30abf 4 weeks ago 156MB hello-world latest bf756fb1ae65 10 months ago 13.3kB |
经过docker启动rabbitmq
[root@hosystem ~]# docker images REPOSITORY TAG IMAGE ID CREATED SIZE rabbitmq latest ea2bf0a30abf 4 weeks ago 156MB [root@hosystem ~]# docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq ea2bf0a30abf e687835a6ea784d55717dc402d5d447d62e486e78f6c770ec703dfdec3d64f16 [root@hosystem ~]# |
-d:表示后台启动
-p:进行端口映射
--name:从新名,修改为咱们想要的名字
访问rabbitmq,由于个人ip为192.168.188.198因此只要在浏览器上输入192.168.188.198:15672便可;帐号:guest 密码:guest
注:rabbitMQ启动后用web访问显示服务器拒绝访问,用如下方法解决
#添加防火墙规则 [root@hosystem ~]# firewall-cmd --permanent --zone=public --add-port=15672/tcp success [root@hosystem ~]# firewall-cmd --reload success |
#https://blog.csdn.net/tl1242616458/article/details/105586984 [root@hosystem ~]# docker exec -it myrabbitmq /bin/bash [root@e687835a6ea7:/]# rabbitmq-plugins enable rabbitmq_management |
按照上图依次添加,exchange.direct、exchange.fanout、exchange.topic这三个exchange.效果以下
点击须要的exchange,进去后在bingdings里填写与之绑定的queues。
咱们发送key为hello.news的消息,由于咱们topic有#.news,因此只要有#.news均可以接收
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> |
#rabbitmq配置信息 spring.rabbitmq.host=192.168.188.198 spring.rabbitmq.username=guest spring.rabbitmq.password=guest #spring.rabbitmq.port= #spring.rabbitmq.virtual-host= |
AmqpAdmin:管理组件
RabbitTemplate:消息发送处理组件
RabbitProperties是封装RabbitMQ相关配置的类
RabbitTemplate是用于RabbitMQ发送和接收消息
AmqpAdmin是RabbitMQ系统管理功能组件
配置rabbitmq参数
#rabbitmq配置信息 spring.rabbitmq.host=192.168.188.198 spring.rabbitmq.username=guest spring.rabbitmq.password=guest #spring.rabbitmq.port= #spring.rabbitmq.virtual-host= |
发送消息到rabbitmq,默认使用java-serialized序列化
@SpringBootTest class Springboot02AmqpApplicationTests {
@Autowired RabbitTemplate rabbitTemplate;
/** * 单播(点对点) */ @Test void contextLoads() { //message须要本身定义;定义消息体内容和消息头(org.springframework.amqp.core.Message()) // rabbitTemplate.send(exchange,routekey,message);
//object默认当成消息体,只须要传入要发送的对象,自动序列化发送给rabbitmq // rabbitTemplate.convertAndSend(exchange,routekey,message); Map<String,Object> map = new HashMap<>(); map.put("msg","test1"); map.put("data", Arrays.asList("helloworld",123,true)); //对象被默认序列化(java-serialized-object)后发送 // rabbitTemplate.convertAndSend("exchange.direct","hos.news",map);
} |
/** * 接收rabbitmsq消息 * 将数据转为json发送出去(private MessageConverter messageConverter = new SimpleMessageConverter();) */ @Test public void receive(){ Object o = rabbitTemplate.receiveAndConvert("hos.news"); System.out.println(o.getClass()); System.out.println(o); Object o1 = rabbitTemplate.receiveAndConvert("hos.news"); System.out.println(o1.getClass()); System.out.println(o1);
} |
将数据转为json发送出去MessageConverter messageConverter = new SimpleMessageConverter();(org.springframework.amqp.support.converter.MessageConverter)
package com.hosystem.amqp.config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class MyAMQPConfig {
@Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } } |
package com.hosystem.amqp.bean;
public class Book {
private String bookName; private String author;
public Book() { }
public Book(String bookName, String author) {
this.bookName = bookName; this.author = author; }
public String getBookName() { return bookName; }
public String getAuthor() { return author; }
public void setBookName(String bookName) { this.bookName = bookName; }
public void setAuthor(String author) { this.author = author; } } |
使用自定义对象发送给rabbitmq
@SpringBootTest class Springboot02AmqpApplicationTests {
@Autowired RabbitTemplate rabbitTemplate;
/** * 单播(点对点) */ @Test void contextLoads() { //message须要本身定义;定义消息体内容和消息头(org.springframework.amqp.core.Message()) // rabbitTemplate.send(exchange,routekey,message);
//object默认当成消息体,只须要传入要发送的对象,自动序列化发送给rabbitmq // rabbitTemplate.convertAndSend(exchange,routekey,message); Map<String,Object> map = new HashMap<>(); map.put("msg","test1"); map.put("data", Arrays.asList("helloworld",123,true)); //发送自定义对象 rabbitTemplate.convertAndSend("exchange.direct","hos.news",new Book("Linux","linux"));
} } |
@SpringBootTest class Springboot02AmqpApplicationTests {
@Autowired RabbitTemplate rabbitTemplate;
/** * 接收rabbitmsq消息 * 将数据转为json发送出去(private MessageConverter messageConverter = new SimpleMessageConverter();) */ @Test public void receive(){ Object o = rabbitTemplate.receiveAndConvert("hos.news"); System.out.println(o.getClass()); System.out.println(o); } |
package com.hosystem.amqp;
@SpringBootTest class Springboot02AmqpApplicationTests {
@Autowired RabbitTemplate rabbitTemplate;
/** * 广播 */ @Test public void sendMsg(){ rabbitTemplate.convertAndSend("exchange.fanout","",new Book("python书籍","python做者")); } } |
@EnableRabbit + @RabbitListener 监听消息队列内容
package com.hosystem.amqp.service;
import com.hosystem.amqp.bean.Book; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service;
@Service public class BookService {
@RabbitListener(queues = "hos.news") public void receive(Book book){ System.out.println("收到消息:"+book); }
@RabbitListener(queues = "hos") public void receive02(Message message){ System.out.println(message.getBody()); System.out.println(message.getMessageProperties()); } } |
@EnableRabbit //开启基于注解的rabbitmq模式 @SpringBootApplication public class Springboot02AmqpApplication {
public static void main(String[] args) { SpringApplication.run(Springboot02AmqpApplication.class, args); }
} |
AmqpAdmin(org.springframework.amqp.core.AmqpAdmin):RabbitMQ系统管理功能组件。
AmqpAdmin:建立和删除Queue、exchange、binding
@SpringBootTest class Springboot02AmqpApplicationTests {
@Autowired RabbitTemplate rabbitTemplate;
// @Bean // @ConditionalOnSingleCandidate(ConnectionFactory.class) // @ConditionalOnProperty( // prefix = "spring.rabbitmq", // name = {"dynamic"}, // matchIfMissing = true // ) // @ConditionalOnMissingBean // public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) { // return new RabbitAdmin(connectionFactory); // } @Autowired AmqpAdmin amqpAdmin;
//建立exchange queues binding @Test public void createExchange(){ //org.springframework.amqp.core.Exchange //org.springframework.amqp.core.DirectExchange //建立exchange // amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange")); // System.out.println("建立成功");
//建立queues //org.springframework.amqp.core.AmqpAdmin // amqpAdmin.declareQueue(new Queue("amqpadmin.queue",true));
//org.springframework.amqp.core.Binding //建立binding amqpAdmin.declareBinding(new Binding("amqpadmin.queue",Binding.DestinationType.QUEUE,"amqpadmin.exchange","amqp.haha",null)); } } |