前面咱们已经学习了如何在RabbitMQ的安装及简单使用以及在SpringBoot中集成RabbitMQ组件,接下来咱们来学习RabbitMQ的一些高级特性。java
一、添加Bookweb
为了测试监听器的使用场景,咱们先构建一个bean。spring
package com.zhaoyi.bweb.bean; public class Book { private String author; private String bookName; private String introduce; public Book() { } public static Book defaultBook(){ return new Book("红楼梦","曹雪芹", "四大名著之一..."); } public Book(String bookName, String author, String introduce) { this.author = author; this.bookName = bookName; this.introduce = introduce; } public String getIntroduce() { return introduce; } public void setIntroduce(String introduce) { this.introduce = introduce; } public String getAuthor() { return author; } public void setAuthor(String author) { this.author = author; } public String getBookName() { return bookName; } public void setBookName(String bookName) { this.bookName = bookName; } @Override public String toString() { return "Book{" + "author='" + author + '\'' + ", bookName='" + bookName + '\'' + ", introduce='" + introduce + '\'' + '}'; } }
使用咱们上一节中学习使用的项目进行接下来的操做。json
在应用中使用RabbitMQ的监听器。springboot
二、在主程序处开启基于注解的RabbitMQ模式app
package com.zhaoyi.bweb; import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @EnableRabbit @SpringBootApplication public class BwebApplication { public static void main(String[] args) { SpringApplication.run(BwebApplication.class, args); } }
三、编写一个Service,监听消息队列ide
package com.zhaoyi.bweb.service; import com.zhaoyi.bweb.bean.Book; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @Service public class BookService { @RabbitListener(queues = {"joyblack.news"}) public void listernerBook(Book book){ System.out.println("receive a message:"); System.out.println(book); } }
该监听方法会将消息体数据映射到Book对象,若是数据类型不一致会出现应用程序异常。学习
该service中的listernerBook监听了咱们定制的MQ服务的joyblack.news的队列信息,当joyblack.news接受到信息的时候,会调用该方法。测试
四、 在测试用例中添加一个测试用例,用于向joyblack.news
中发送包含Book数据的消息。网站
@Autowired RabbitTemplate rabbitTemplate; @Test public void send(){ rabbitTemplate.convertAndSend("exchange.direct", "joyblack.news", Book.defaultBook()); }
运行主程序。
而后运行咱们的测试运行,能够看到,每当咱们运行一次测试用例,就会触发service的listernerBook的一次调用,并打印结果:
... receive a message: Book{author='曹雪芹', bookName='红楼梦', introduce='四大名著之一...'} receive a message: Book{author='曹雪芹', bookName='红楼梦', introduce='四大名著之一...'} ....
也就是说,经过@EnableRabbit以及@RabbitListener两个注解,咱们就能够在springboot实现简单的消息监听了。
固然,咱们也能够有其余的接收消息的模式,好比获取消息所有内容:
package com.zhaoyi.bweb.service; import com.zhaoyi.bweb.bean.Book; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @Service public class BookService { // @RabbitListener(queues = {"joyblack.news"}) // public void listernerBook(Book book){ // System.out.println("receive a message:"); // System.out.println(book); // } @RabbitListener(queues = {"joyblack.news"}) public void listernerBook(Message message){ System.out.println("receive a message:"); System.out.println(message); } }
发送消息后打印的结果为:
receive a message: (Body:'{"author":"曹雪芹","bookName":"红楼梦","introduce":"四大名著之一..."}' MessageProperties [headers={__TypeId__=com.zhaoyi.bweb.bean.Book}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=exchange.direct, receivedRoutingKey=joyblack.news, deliveryTag=1, consumerTag=amq.ctag-a8Co2RP7og21nB5A6R5QbQ, consumerQueue=joyblack.news])
即包含了整个消息内容。
前面咱们已经经过MQ的组件测试了不少有意思的功能,可是别忘了,咱们不少组件,好比交换器、消息队列等,都是咱们经过RabbitMQ的管理网站事先建立的。那么,咱们会有这样的疑问,可不能够经过RabbitMQ提供的什么组件帮咱们在应用程序中完成这样的操做呢?答案是能!
这个组件就是咱们这一章节将要讲到的AmqpAdmin。经过AmqpAdmin咱们能够建立交换器、消息队列以及绑定规则等。
要使用AmqpAdmin很简单,还记得咱们以前讲过的自动配置类吗,他提供的两个重要组件之一就是AmqpAdmin。咱们直接在应用程序中注入该组件就可使用了。
Exchange.inteface是MQ组件中定义的一个接口
package org.springframework.amqp.core; import java.util.Map; public interface Exchange extends Declarable { String getName(); String getType(); boolean isDurable(); boolean isAutoDelete(); Map<String, Object> getArguments(); boolean isDelayed(); boolean isInternal(); }
咱们查看该接口的实现类有5个(其实有一个抽象类做为中间件),他们分别是DirectExchange、FanoutExchange、CustomExchange、TopicExchange以及HeadersExchange。其中有2个咱们不用在乎,其余三个恰好对应咱们以前所讲到的3种交换器类型,所以,要建立交换器,直接建立对应类型的交换器便可,例如,咱们建立一个direct类型的交换器,并命名为exchange.myDirect.
package com.zhaoyi.bweb; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.DirectExchange; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class BwebApplicationTests { @Autowired AmqpAdmin amqpAdmin; @Test public void createExchange(){ amqpAdmin.declareExchange(new DirectExchange("exchange.myDirect")); System.out.println("create exchange success."); } }
运行该测试用例,咱们就能够在管理网站处的Exchanges选项卡查看到新建立的direct类型的exchange了。
amqpAdmin.declarexxxx能够建立xxx类型的RabbitMQ组件。
同理,咱们能够经过amqpAdmin.declareQueue建立其余的组件,提供的参数一一对应于咱们配置对应组件时指定的那些配置选项。
@Test public void createQueue(){ amqpAdmin.declareQueue(new Queue("queue.myQueue", true)); System.out.println("create Queue success."); }
该测试用例建立了一个name=queue.myQueue,以及durable为true(便可持久化)的队列。
建立绑定规则时咱们须要查看一下Binding类对应的方法参数:
public Binding(String destination, Binding.DestinationType destinationType, String exchange, String routingKey, Map<String, Object> arguments) { // 目的地 this.destination = destination; // 目的的类型 this.destinationType = destinationType; // 交换器 this.exchange = exchange; // 路由键 this.routingKey = routingKey; // 额外参数 this.arguments = arguments; }
所以,咱们对应这些参数进行配置就能够了,你也能够感受获得,这些参数都是和咱们的管理网站的可视化配置一一对应起来的:
@Test public void createBinding(){ amqpAdmin.declareBinding(new Binding("queue.myQueue", Binding.DestinationType.QUEUE, "exchange.myDirect", "queue.myQueue", null )); System.out.println("create Binding success."); }
能够看到,咱们定义了一个绑定规则,他是绑定在交换器exchange.myDirect上,路由键为queue.myQueue,并指向目的地为queue.myQueue的队列。
如今,查看管理网站,能够清晰的看到咱们此次建立的三个组件,以及他们之间的绑定关系。
注意路由键这个属性,一般状况下,咱们会将其命名为和目的地队列同样的名称,但请不要混淆两者。若是你的应用足够复杂,显然是不少绑定规则,而且路由键是多种多样的。
amqpAdmin.deleteXxx 能够帮助咱们删除指定名称的交换器和队列,你们能够本身尝试使用。