是AMQP(Advanced Message Queue Protocol)的开源实现java
Message 消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其余消息的优先权)、delivery-mode(指出该消息可能须要持久性存储)等。 Publisher 消息的生产者,也是一个向交换器发布消息的客户端应用程序。 Exchange 交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。 Exchange有4种类型:direct(默认),fanout, topic, 和headers,不一样类型的Exchange转发消息的策略有所区别 Queue 消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者链接到这个队列将其取走。 Binding 绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列链接起来的路由规则,因此能够将交换器理解成一个由绑定构成的路由表。 Exchange 和Queue的绑定能够是多对多的关系。 Connection 网络链接,好比一个TCP链接。 Channel 信道,多路复用链接中的一条独立的双向数据流通道。信道是创建在真实的TCP链接内的虚拟链接,AMQP 命令都是经过信道发出去的,无论是发布消息、订阅队列仍是接收消息,这些动做都是经过信道完成。由于对于操做系统来讲创建和销毁 TCP 都是很是昂贵的开销,因此引入了信道的概念,以复用一条 TCP 链接。 Consumer 消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。 Virtual Host 虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每一个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有本身的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在链接时指定,RabbitMQ 默认的 vhost 是 / 。 Broker 表示消息队列服务器实体
2.1 Exchange类型git
Exchange分发消息时根据类型的不一样分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键, headers 交换器和 direct 交换器彻底一致,但性能差不少,目前几乎用不到了,因此直接看另外三种类型:github
2.1.1 Direct Exchangeweb
消息中的路由键(routing key)若是和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。
路由键与队列名彻底匹配,若是一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,
不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是彻底匹配、单播的模式。
2.1.2 Fanout Exchangespring
每一个发到 fanout 类型交换器的消息都会分到全部绑定的队列上去。
fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每一个发送到交换器的消息都会被转发到与该交换器绑定的全部队列上。
很像子网广播,每台子网内的主机都得到了一份复制的消息。fanout 类型转发消息是最快的。
2.1.3 Topic Exchangedocker
topic 交换器经过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列须要绑定到一个模式上。
它将路由键和绑定键的字符串切分红单词,这些单词之间用点隔开。它一样也会识别两个通配符:符号“#”和符号“*”。
#匹配0个或多个单词,*匹配一个单词。
基于docker的国内镜像安装(3-management带管理界面的rabbitmq)
> docker pull registry.docker-cn.com/library/rabbitmq:3-management
启动rabbitmq(-d 后台启动 -p 端口映射 5672 链接rabbirmq的端口 15672访问rabbitmq web管理界面的端口)
> docker run -d -p 5672:5672 -p 15672:15672 --name brianRabbitMQ xxxxxx(镜像name或者镜像ID)
rabbitmq的管理web访问url: ip:15672,默认的帐户密码guest/guest服务器
4.1 引用依赖网络
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
4.2 AmqpAdmin建立和删除 Queue, Exchange,Bindingspring-boot
ManageMQService.java性能
package com.kawa.mq; import com.kawa.config.Contents; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class ManageMQService { @Autowired AmqpAdmin amqpAdmin; public void createExchange(String exchangeName,String mqType){ if(mqType.equals(Contents.DIRECT_EXCHANGE)){ amqpAdmin.declareExchange(new DirectExchange(exchangeName)); } if(mqType.equals(Contents.FANOUT_EXCHANGE)){ amqpAdmin.declareExchange(new FanoutExchange(exchangeName)); } if(mqType.equals(Contents.TOPIC_EXCHANGE)){ amqpAdmin.declareExchange(new TopicExchange(exchangeName)); } } public void removeExchange(String exchangeName){ amqpAdmin.deleteExchange(exchangeName); } public void createQueue(String queueName){ amqpAdmin.declareQueue(new Queue(queueName,true)); } public void removeQueue(String queueName){ amqpAdmin.deleteQueue(queueName); } public void createBinding(String queueName, String exchangeName, String routingKey){ amqpAdmin.declareBinding(new Binding(queueName, Binding.DestinationType.QUEUE,exchangeName,routingKey,null)); } public void removeBinding(String queueName, String exchangeName, String routingKey){ amqpAdmin.removeBinding(new Binding(queueName, Binding.DestinationType.QUEUE,exchangeName,routingKey,null)); } }
4.3 使用RabbitTemplate发送消息
SendMessageService.java
package com.kawa.mq; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class SendMessageService { @Autowired RabbitTemplate rabbitTemplate; public void sendMessage(String exchange,String routingKey,Object obj){ //Message须要本身构造一个;定义消息体和消息头 //rabbitTemplate.send(exchange,routingKey,message); //object默认当成消息体,只须要传入发送对象,自动序列化发送给rabbitmq rabbitTemplate.convertAndSend(exchange,routingKey,obj); } }
4.4 测试建立 Queue, Exchange,Binding和发送消息
SpbDemoApplicationTests.java
@Test public void sendMessage() { manageMQService.createQueue("brian.test"); manageMQService.createExchange("brian",Contents.DIRECT_EXCHANGE); manageMQService.createBinding("brian.test","brian","mymq"); Brian brian = new Brian(); User user = new User(); user.setId((long) 12345678); user.setUsername("cassiel"); user.setPassword("#fyds"); List<String> list = new ArrayList<>(); list.add("我"); list.add("爱"); list.add("你"); list.add("中"); list.add("国"); Map<String,Object> map = new HashMap<>(); map.put("123","包邮"); brian.setKawadate(new Date()); brian.setLists(list); brian.setObj(map); brian.setUser(user); sendMessageService.sendMessage("brian","mymq",brian); }
查看结果
4.5 添加@RabbitListener监听和处理消息
在使用RabbitListener注解接收消息时,须要在启动类上加上数据@EnableRabbit
BrianService.java
package com.kawa.sercice; import com.kawa.pojo.Brian; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @Service public class BrianService { @RabbitListener(queues = "brian.test") public void receiveMessage(Brian brian){ System.out.println("接收到的消息体:" + brian); } }
4.6 启动工程查看测试结果
查看到队列里面消息已经没有了