本文记录学习在Spring Boot中使用MQ。html
MQ全称(Message Queue)又名消息队列,是一种异步通信的中间件。它的做用相似于邮局,发信人(生产者)只须要将信(消息)交给邮局,而后由邮局再将信(消息)发送给具体的接收者(消费者),具体发送过程与时间发信人能够不关注,也不会影响发信人作其它事情。目前常见的MQ有activemq、kafka、rabbitmq、zeromq、rocketmq等。java
使用MQ的优势主要有:spring
1 方法的异步执行 使用MQ能够将耗时的同步操做经过以发送消息的方式进行了异步化处理,减小了因为同步而等待的时间;apache
2 程序之间松耦合 使用MQ能够减小了服务之间的耦合性,不一样的服务能够经过消息队列进行通讯,只要约定好消息的内容格式就行;服务器
JMS(Java Message Service)即java消息服务,是一个Java平台中关于面向消息中间件(MOM)的 API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通讯。JMS的消息机制有2种模型,一种是1对1(Point to Point)的队列的消息,这种消息,只能被一个消费者消费;另外一种是一对多的发布/订阅(Topic)消息,一条消息能够被多个消费者消费。ActiveMq是对JMS的一个实现。app
官网下载一个服务程序,解压后直接启动服务就能够了,下载地址:http://activemq.apache.org/activemq-5158-release.htmldom
SpringBoot也对Active MQ提供了支持,咱们使用时引入具体的依赖便可,修改pom.xml文件,添加依赖异步
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
在application.properties文件中配置Active MQ服务器的链接信息tcp
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
#消息模式 true:广播(Topic),false:队列(Queue),默认时false
#spring.jms.pub-sub-domain=true
完成以上配置信息后,当咱们在启动SpringBoot项目时,会自动帮咱们完成初始化操做,并提供一个JmsMessagingTemplate,提提供了咱们经常使用发送消息的各类方法供咱们使用。咱们只须要在使用的地方注入JmsMessagingTemplate便可使用。分布式
发送队列消息
@RunWith(SpringRunner.class)
@SpringBootTest
public class ActivemqApplicationTests {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Test
public void testQueueMsg(){
//建立名称为zyQueue的队列
Queue queue = new ActiveMQQueue("zyQueue");
//向队列发送消息
jmsMessagingTemplate.convertAndSend(queue,"这是一个队列消息!");
}
}
消息的接收方,监听消息队列,当队列中有消息时就能够获取到消息
@Component
public class Consumer {
private static DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,sss");
/**
* destination 目标地址即队列
*/
@JmsListener(destination = "zyQueue")
public void receiveMessage(String text){
System.out.println("接收队列消息时间:"+ df.format(new Date()) +", 接收到消息内容:"+text);
}
}
执行测试方法发送消息能够看到,控制台输出的消费者接受到消息
队列消息只能有一个消费者,若是有多个消费者同时监听一个队列时,只能有一个拿到消息,咱们测试,修改发送方法,循环发送10调消息
@Test
public void testQueueMsg(){
//建立名称为zyQueue的队列
Queue queue = new ActiveMQQueue("zyQueue");
//向队列发送消息
for (int i=0;i<10;i++) {
jmsMessagingTemplate.convertAndSend(queue,"这是第"+i+"个队列消息!");
}
}
在Consumer 类中再添加一个消费者,监听队列zyQueue
@JmsListener(destination = "zyQueue")
public void receiveMessage(String text){
System.out.println("接收队列消息时间:"+ df.format(new Date()) +", 接收到消息内容:"+text);
}
@JmsListener(destination = "zyQueue")
public void receiveMessage1(String text){
System.out.println("1接收队列消息时间:"+ df.format(new Date()) +", 接收到消息内容:"+text);
}
执行发送消息,看到控制台输出的结果,2个消费者平分了这10条消息
若是但愿监听同一个队列的多个消费者都能接收到全部消息,咱们就只能发送Topic消息了,咱们修改application.properties中的
#消息模式 true:广播(Topic),false:队列(Queue),默认时false
spring.jms.pub-sub-domain=true
表示要发送发布/订阅消息,发送消息的队列改用Topic发送消息,以下
@Test
public void testTopicMsg(){
Topic topic = new ActiveMQTopic("zyTopic");
for (int i=0;i<5;i++){
jmsMessagingTemplate.convertAndSend(topic,"这是第"+i+"个Topic消息!");
}
}
咱们在Consumer 类中添加两个消费者来监听zyTopic队列,接受消息
@JmsListener(destination = "zyTopic")
public void receiveTopicMessage1(String text){
System.out.println("消费者1接收消息时间:"+ df.format(new Date()) +", 接收到消息内容:"+text);
}
@JmsListener(destination = "zyTopic")
public void receiveTopicMessage2(String text){
System.out.println("消费者2接收消息时间:"+ df.format(new Date()) +", 接收到消息内容:"+text);
}
执行发消息方法,能够看到控制台输出的内容,2个消费者都完整的接收到了5条消息
咱们在测试发送消息时修改了属性文件中的配置信息,才能够发送对应的类型的消息,这是因为SpringBoot中默认的是队列消息(查看源码能够知道,监听器默认使用的DefaultJmsListenerContainerFactory),若是咱们想在不修改配置信息的状况下能够同时发送Queue和Topic消息怎么办呢,咱们须要手动的更改初始的配置类,分别针对Queue和Topic消息提供JmsListenerContainerFactory
新建一个配置类,以下
@SpringBootConfiguration
public class ActiveMqConfig {
@Bean("queueListenerFactory")
public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory){
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
//设置消息模型为队列
factory.setPubSubDomain(false);
return factory;
}
@Bean("topicListenerFactory")
public JmsListenerContainerFactory topicListenerFactory(ConnectionFactory connectionFactory){
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
//设置消息模型为队列
factory.setPubSubDomain(true);
return factory;
}
}
在容器启动时会针对两种消息类型,初始化获得两个不一样的JmsListenerContainerFactory。下来再修改消费者类,在 @JmsListener 注解中指定 containerFactory,如
@JmsListener(destination = "zyQueue", containerFactory = "queueListenerFactory")
public void receiveMessage(String text){
System.out.println("接收队列消息时间:"+ df.format(new Date()) +", 接收到消息内容:"+text);
}
@JmsListener(destination = "zyTopic", containerFactory = "topicListenerFactory")
public void receiveTopicMessage1(String text){
System.out.println("消费者1接收消息时间:"+ df.format(new Date()) +", 接收到消息内容:"+text);
}
Queue消息使用 queueListenerFactory,Topic消息使用 topicListenerFactory,而后注释掉属性文件中的消息模式配置就能够了。