RabbitMQ是一款基于AMQP(消息队列协议),由Erlang开发的开源消息队列组件。是一款优秀的消息队列组件,他由两部分组成:服务端和客户端,客户端支持多种语言的驱动,如:.Net、JAVA、Erlang等。
对于RabbitMQ来讲,生产者和消息队列之间存在隔离,生产者将消息发送给交换机,而交换机则根据调度策略把相应的消息转发给对应的消息队列。消费者经过读取消息队列从而实现消息的发送和接收。html
交换机的主要做用是接收相应的消息而且绑定到指定的队列.交换机有四种类型,分别为Direct,topic,headers,Fanout.git
Direct是RabbitMQ默认的交换机模式,也是最简单的模式.即建立消息队列的时候,指定一个BindingKey.当发送者发送消息的时候,指定对应的Key.当Key和消息队列的BindingKey一致的时候,消息将会被发送到该消息队列中.github
topic转发信息主要是依据通配符,队列和交换机的绑定主要是依据一种模式(通配符+字符串),而当发送消息的时候,只有指定的Key和该模式相匹配的时候,消息才会被发送到该消息队列中.web
headers也是根据一个规则进行匹配,在消息队列和交换机绑定的时候会指定一组键值对规则,而发送消息的时候也会指定一组键值对规则,当两组键值对规则相匹配的时候,消息会被发送到匹配的消息队列中.spring
Fanout是路由广播的形式,将会把消息发给绑定它的所有队列,即使设置了key,也会被忽略.c#
消息队列两个用处:服务间解耦,缓解压力(削峰平谷);
RabbitMQ实现了AQMP协议,AQMP协议定义了消息路由规则和方式。生产端经过路由规则发送消息到不一样queue,消费端根据queue名称消费消息。
RabbitMQ既支持内存队列也支持持久化队列,消费端为推模型,消费状态和订阅关系由服务端负责维护,消息消费完后当即删除,不保留历史消息。
(1)点对点
生产端发送一条消息经过路由投递到Queue,只有一个消费者能消费到。springboot
(2)多订阅
当RabbitMQ须要支持多订阅时,发布者发送的消息经过路由同时写到多个Queue,不一样订阅组消费不一样的Queue。因此支持多订阅时,消息会多个拷贝。服务器
二、安装RabbitMQ服务端
(1)下载Erlang安装包:https://pan.baidu.com/s/1bEbYnc#list/path=%2F(百度网盘地址)
(2)安装和配置RabbitMQ服务端,3.6.0版本:https://pan.baidu.com/s/1bEbYnc#list/path=%2F(百度网盘地址)
(官方:安装Erland,经过官方下载页面http://www.erlang.org/downloads获取exe安装包,直接打开并完成安装。
安装RabbitMQ,经过官方下载页面https://www.rabbitmq.com/download.html获取exe安装包。)
(3)启用web管理插件:rabbitmq-plugins enable rabbitmq_management
(4)启动RabbitMQ:chkconfig rabbitmq-server on /sbin/service rabbitmq-server start
(5)防火墙开通端口
# firewall-cmd --permanent --zone=public --add-port=5672/tcp
# firewall-cmd --permanent --zone=public --add-port=15672/tcp
# firewall-cmd --reload
(6)rabbitmq默认会建立guest帐号,只能用于localhost登陆页面管理员,本机访问地址:http://localhost:15672/app
三.SpringBoot整合RabbitMQ(Topic转发模式)tcp
首先咱们看发送端,咱们须要配置队列Queue,再配置交换机(Exchange),再把队列按照相应的规则绑定到交换机上:
首先 配置pom.xml
<!-- 添加springboot对amqp的支持 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
接着在application.properties中,去编辑和RabbitMQ相关的配置信息,配置信息的表明什么内容根据键就能很直观的看出了.这里端口是5672,不是15672...15672是管理端的端口!
server.port=17080
#mq
spring.application.name=spirng-boot-rabbitmq-sender
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=ncs
spring.rabbitmq.password=12345678
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.virtual-host=/
咱们看发送端,咱们须要配置队列Queue,再配置交换机(Exchange),再把队列按照相应的规则绑定到交换机上:
package com.example.demo.conf; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Created by ningcs on 2017/10/30. */ @Configuration public class SenderConf { @Bean(name="message") public Queue queueMessage() { return new Queue("topic.message"); } @Bean(name="messages") public Queue queueMessages() { return new Queue("topic.messages"); } @Bean public TopicExchange exchange() { return new TopicExchange("exchange"); } @Bean Binding bindingExchangeMessage(@Qualifier("message") Queue queueMessage, TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message"); } @Bean Binding bindingExchangeMessages(@Qualifier("messages") Queue queueMessages, TopicExchange exchange) { return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");//*表示一个词,#表示零个或多个词 } }
在SpringBoot中,咱们使用AmqpTemplate去发送消息!代码以下:
package com.example.demo.sender; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * Created by ningcs on 2017/10/30. */ @Component public class HelloSender { @Autowired private AmqpTemplate template; public void send(String msg) { System.out.println(msg); template.convertAndSend("exchange","topic.message","hello,rabbit~"); } }
测试
package com.example.demo.controller; import com.example.demo.sender.HelloSender; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; /** * Created by ningcs on 2017/10/30. */ @RestController @RequestMapping("rabbit") public class RabbitController { @Autowired private HelloSender helloSender; @RequestMapping(value = "/hello",method = {RequestMethod.GET, RequestMethod.POST}) public String helloSender(){ helloSender.send("hello,rabbit~"); return "发送成功"; } }
接收端(能够放在两个项目,配置文件同样):
package com.example.demo.receiver;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* Created by ningcs on 2017/10/30.
*/
@Component
public class HelloReceive {
// @RabbitListener(queues="queue") //监听器监听指定的Queue
// public void processC(String str) {
// System.out.println("Receive:"+str);
// }
@RabbitListener(queues="topic.message") //监听器监听指定的Queue
public void process1(String str) {
System.out.println("message:"+str);
}
@RabbitListener(queues="topic.messages") //监听器监听指定的Queue
public void process2(String str) {
System.out.println("messages:"+str);
}
}
访问地址:http://localhost:17080/rabbit/hello
控制台输出如下日志说明成功:
2017-11-02 11:30:11.982 INFO 4476 --- [ main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 17080 (http)
2017-11-02 11:30:11.994 INFO 4476 --- [ main] d.SpringbootRabbitTopicSenderApplication : Started SpringbootRabbitTopicSenderApplication in 12.292 seconds (JVM running for 12.829)
2017-11-02 11:31:06.712 INFO 4476 --- [io-17080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring FrameworkServlet 'dispatcherServlet'
2017-11-02 11:31:06.713 INFO 4476 --- [io-17080-exec-1] o.s.web.servlet.DispatcherServlet : FrameworkServlet 'dispatcherServlet': initialization started
2017-11-02 11:31:06.729 INFO 4476 --- [io-17080-exec-1] o.s.web.servlet.DispatcherServlet : FrameworkServlet 'dispatcherServlet': initialization completed in 16 ms
hello,rabbit~
hello,rabbit~
2017-11-02 11:30:13.165 INFO 11064 --- [ main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 17081 (http)
2017-11-02 11:30:13.170 INFO 11064 --- [ main] SpringbootRabbitTopicReceiverApplication : Started SpringbootRabbitTopicReceiverApplication in 13.48 seconds (JVM running for 17.121)
messages:hello,rabbit~
message:hello,rabbit~
messages:hello,rabbit~
message:hello,rabbit~
Fanout 和Direct模式详见最下面github地址。
术语解释
Broker:简单来讲就是消息队列服务器实体。
Exchange:消息交换机,它指定消息按什么规则,路由到哪一个队列。
Queue:消息队列载体,每一个消息都会被投入到一个或多个队列。
Binding:绑定,它的做用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里能够开设多个vhost,用做不一样用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每一个链接里,可创建多个channel,每一个channel表明一个会话任务。
RabbitMQ消息队列详细介绍(主要涉及术语)
http://blog.csdn.net/leyangjun/article/details/52529047
通讯协议AMQP(Advanced Message Queuing Protocol)
AMQP模型中,消息在producer中产生,发送到MQ的exchange上,exchange根据配置的路由方式发到相应的Queue上,Queue又将消息发送给consumer,消息从queue到consumer有push和pull两种方式。 消息队列的使用过程大概以下:
客户端链接到消息队列服务器,打开一个channel。
客户端声明一个exchange,并设置相关属性。
客户端声明一个queue,并设置相关属性。
客户端使用routing key,在exchange和queue之间创建好绑定关系。
客户端投递消息到exchange。
exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。
RabbitMQ消息队列-RabbitMQ的优劣势及产生背景
推荐博客:
http://blog.csdn.net/super_rd/article/details/70229714
详细代码访问github:https://github.com/ningcs/Springboot-rabbit-mq