【本文版权归微信公众号"代码艺术"(ID:onblog)全部,如果转载请务必保留本段原创声明,违者必究。如果文章有不足之处,欢迎关注微信公众号私信与我进行交流!】
RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的做用。java
消息中间件最主要的做用是解耦,中间件最标准的用法是生产者生产消息传送到队列,消费者从队列中拿取消息并处理,生产者不用关心是谁来消费,消费者不用关心谁在生产消息,从而达到解耦的目的。在分布式的系统中,消息队列也会被用在不少其它的方面,好比:分布式事务的支持,RPC的调用等等。web
RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。RabbitMQ主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者没法快速消费,那么须要一个中间层。保存这个数据。spring
AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。shell
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。编程
一般咱们谈到队列服务, 会有三个概念: 发消息者、队列、收消息者,RabbitMQ 在这个基本概念之上, 多作了一层抽象, 在发消息者和 队列之间, 加入了交换器 (Exchange). 这样发消息者和队列就没有直接联系, 转而变成发消息者把消息给交换器, 交换器根据调度策略再把消息再给队列。json
那么,其中比较重要的概念有 4 个,分别为:虚拟主机,交换机,队列,和绑定。数组
交换机的功能主要是接收消息而且转发到绑定的队列,交换机不存储消息,在启用ack模式后,交换机找不到队列会返回错误。交换机有四种类型:Direct, topic, Headers and Fanout安全
RabbitMQ简单来讲,就是生产者发送消息到虚拟主机,虚拟主机把消息交给指定的交换机,交换机按照规则扔给消息队列进行存储,消息队列等待消费者来消费。服务器
由此我想到了商品买卖:厂家生产商品卖给批发部,批发部交给指定的超市出售,超市按照售价摆放在门店,并等待顾客上门购买。微信
完美~~
由于RabbitMQ是由erlang语言写的,就像Java程序须要jdk环境同样,运行RabbitMQ也须要erlang环境。
环境:Centos7.4
下载地址:http://erlang.org/download/
在Linux终端运行命令行
下载:
wget http://erlang.org/download/otp_src_18.3.tar.gz
下载必定要认准otp_src_字样。
解压:
tar -zxvf otp_src_18.3.tar.gz
进入解压后的根目录:
./configure --prefix=/usr/local/erlang --enable-hipe --enable-threads --enable-smp-support --enable-kernel-poll --without-javac
make && make install
上面有点慢。
把erlang加入环境变量:
vi /etc/profile
export ERLANG=/usr/local/erlang/erlang export PATH=$ERLANG/bin:$PATH
使环境变量生效
source /etc/profile
而后,咱们测试下是否安装成功:
[root@ystcode ~]# erl Erlang/OTP 18 [erts-7.3] [source] [64-bit] [async-threads:10] [hipe] [kernel-poll:false] Eshell V7.3 (abort with ^G) 1>
下载地址:http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.1/
下载:
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.1/rabbitmq-server-generic-unix-3.6.1.tar.xz
对于下载xz包进行解压,首先先下载xz压缩工具:
yum install xz
对rabbitmq包进行解压:
xz -d xz -d rabbitmq-server-generic-unix-3.6.1.tar.xz
tar -xvf rabbitmq-server-generic-unix-3.6.1.tar
随后在sbin
目录启用MQ管理方式:
./rabbitmq-plugins enable rabbitmq_management #启动后台管理
./rabbitmq-server -detached #后台运行rabbitmq
添加用户和权限 默认网页guest用户是不容许访问的,须要增长一个用户修改一下权限,代码以下:
添加用户:
./rabbitmqctl add_user admin admin
添加权限:
./rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
修改用户角色:
./rabbitmqctl set_user_tags admin administrator
而后就能够远程访问了,而后可直接配置用户权限等信息。
验证
访问http://localhost:15672/,输入admin用户密码,登陆成功!
新建一个1.5版本的Spring boot项目,选择rabbitmq+web模块。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
virtual-host: /
默认就是斜杠,具体查看rabbitmq后台admin栏。若是默认这行不用写。
spring: rabbitmq: host: 127.0.0.1 username: admin password: admin virtual-host: / #能够不用写
direct 类型的行为是”先匹配, 再投送”. 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去.
1.咱们在RabbitMQ后台新建一个交换机,demo-direct
交换机名,direct
交换机类型,Durable
持久化:
2.再新建一个消息队列,取名为demo
:
3.点击demo-direct
交换机进入绑定消息队列demo
:
若是没有指定routingkey,消息队列的名称就是routingkey
4.绑定成功后查看:
你能够直接在交换机页面下方的Publish message
发送消息,在消息队列的Get message
查看消息,不过咱们实际生产比较多使用编程:
在Spring Boot建立单元测试
demo-direct
交换机名
demo
是该交换机绑定的消息队列名
发送消息
@Autowired RabbitTemplate rabbitTemplate; @Test public void contextLoads() { //message须要本身构造一个,定义消息体内容和消息体 //rabbitTemplate.send(exchange,routingkey,message); Map map = new HashMap(); map.put("key","值"); map.put("msg",true); //对象被默认序列化后发送 rabbitTemplate.convertAndSend("demo-direct","demo",map); }
此时发送消息咱们在rabbitmq网页发现消息是经序列化后的,咱们若是想改变序列化机制为JSON,也很简单,只须要注入一我的Bean:
@Configuration public class MyAMQPConfig { @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } }
而后咱们再启动测试,发送。
接收消息
demo
是消息队列名,也就是消费者只须要获得消息队列的名字就能够接收队列中的消息。
@Test public void receive(){ Object o = rabbitTemplate.receiveAndConvert("demo"); System.out.println(o.getClass()); System.out.println(o); }
查看打印
class java.util.HashMap {msg=true, key=值}
【本文版权归微信公众号"代码艺术"(ID:onblog)全部,如果转载请务必保留本段原创声明,违者必究。如果文章有不足之处,欢迎关注微信公众号私信与我进行交流!】
转发消息到全部绑定队列(广播:忽略routing_key )
1.首先咱们须要在RabbitMQ后台建立一个广播交换机:
2.再建立一些(demo1,demo2)消息队列,以一个做为演示:
3.进入建立的交换机页面:
4.交换机与消息队列(demo1,demo2)进行绑定:
若是没有指定routingkey,消息队列的名称就是routingkey
在Spring Boot建立单元测试
注意在广播模式下会忽略忽略routing_key
发送消息
@Autowired RabbitTemplate rabbitTemplate; @Test public void send() { Book book = new Book(); book.setName("<西游记>"); book.setAnthony("吴承恩"); //对象被默认序列化后发送 rabbitTemplate.convertAndSend("demo-fanout","",book); }
接收消息
@Test public void receive(){ Object o = rabbitTemplate.receiveAndConvert("demo"); System.out.println(o.getClass()); System.out.println(o); }
打印输出:
class cn.zyzpp.rabbitmq.entity.Book Book{name='<西游记>', anthony='吴承恩'}
按规则转发消息(使用通配符)
1.在rabbitmq后台新建一个topic交换机
2.新建一个消息队列demo
3.交换机绑定消息队列,注意此处的路由键Routing key使用了通配符
*
表示一个词.#
表示零个或多个词.那咱们如何区分几个字母为一个单词呢?
答案是经过”点分”的 routing_key 形式,好比两个单词是 *.demo
hello.demo
,
若是路由键为demo.#
,那能够匹配demo.
开头的全部路由键。
4.查看此时的交换机
在Spring Boot建立单元测试
发送消息
@Test public void contextLoads() { //message须要本身构造一个,定义消息体内容和消息体 //rabbitTemplate.send(exchange,routingkey,message); Map map = new HashMap(); map.put("key","topic交换机"); map.put("msg",true); //对象被默认序列化后发送 rabbitTemplate.convertAndSend("demo-topic","demo.hello",map); }
接收消息
@Test public void receive(){ Object o = rabbitTemplate.receiveAndConvert("demo"); System.out.println(o.getClass()); System.out.println(o); }
打印输出
class java.util.HashMap {msg=true, key=topic交换机}
1.上面演示的是经过RabbitMQ网页后台建立,经过编程的方式也很是简单:
@Autowired AmqpAdmin amqpAdmin; /** * 代码建立交换机与消息队列并绑定 */ @Test public void createExChange(){ new TopicExchange("topic.exChange"); new FanoutExchange("fanout.exChange"); amqpAdmin.declareExchange(new DirectExchange("amqp.exChange"));//建立交换机(remove为删除交换机) System.out.println("单播交换机建立完成"); amqpAdmin.declareQueue(new Queue("amqp.queue",true)); //建立消息队列 amqpAdmin.declareBinding(new Binding("amqp.queue", Binding.DestinationType.QUEUE,"amqp.exChange","amqp.exChange",null));//绑定 }
2.登陆后台查看,建立成功!
监听消息队列,当有消息发到消息队列,立马获取并操做。方法也很简单。
1.开启基于注解的Rabbit模式
@EnableRabbit //开启基于注解的Rabbit模式 @SpringBootApplication public class RabbitmqApplication { public static void main(String[] args) { SpringApplication.run(RabbitmqApplication.class, args); } }
2.@RabbitListener注解实现监听
注意此时你的消息队列已经有了demo
和demo.news
@Service public class BookService { @RabbitListener(queues = "demo") public void receive(Book book){ System.out.println("收到消息:"+ book); } @RabbitListener(queues = "demo.news") public void receiveMess(Message message){ System.out.println("收到消息:"+ message); System.out.println(message.getBody()); //getBody返回的byte[]字节数组 System.out.println(message.getMessageProperties()); } }
而后咱们在开启主程序时再运行测试用例:
测试用例
@Test public void send() { Book book = new Book(); book.setName("<西游记>"); book.setAnthony("吴承恩"); //对象被默认序列化后发送 rabbitTemplate.convertAndSend("demo-fanout","",book); }
查看主控制台打印
收到消息:Book{name='<西游记>', anthony='吴承恩'}
测试用例
@Test public void contextLoads() { //message须要本身构造一个,定义消息体内容和消息体 //rabbitTemplate.send(exchange,routingkey,message); Map map = new HashMap(); map.put("key","topic交换机"); map.put("msg",true); //对象被默认序列化后发送 rabbitTemplate.convertAndSend("demo-topic","demo.hello",map); }
查看主控制台打印
收到消息:(Body:'{"msg":true,"key":"topic交换机"}' MessageProperties [headers={__ContentTypeId__=java.lang.Object, __KeyTypeId__=java.lang.Object, __TypeId__=java.util.HashMap}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/json, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=demo-topic, receivedRoutingKey=demo.hello, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-KRZh2DyNETjEaGSH0JZ2dA, consumerQueue=demo.news]) [B@5332f99e MessageProperties [headers={__ContentTypeId__=java.lang.Object, __KeyTypeId__=java.lang.Object, __TypeId__=java.util.HashMap}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/json, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=demo-topic, receivedRoutingKey=demo.hello, receivedDelay=null, deliveryTag=1, messageCount=0, consumerTag=amq.ctag-KRZh2DyNETjEaGSH0JZ2dA, consumerQueue=demo.news]
【本文版权归微信公众号"代码艺术"(ID:onblog)全部,如果转载请务必保留本段原创声明,违者必究。如果文章有不足之处,欢迎关注微信公众号私信与我进行交流!】