ActiveMQ官方网站:https://activemq.apache.org/html
关于ActiveMQ消息传递的方式详见:java
http://www.javashuo.com/article/p-ovkjfdgv-ev.htmlgit
http://www.javashuo.com/article/p-mgrbykqq-eh.htmlgithub
本篇博客旨在解决的问题:web
1.如何在普通Java环境中使用ActiveMQspring
2.ActiveMQ如何与Spring的整合(XML配置)apache
3.在SpringBoot中如何使用ActiveMQsegmentfault
环境:windows
1. windows 10 64bit浏览器
2. apache-activemq-5.14.4
3. jdk 1.8
4. maven 3.3
前置条件:
1.安装启动ActiveMQ:
在官方网站(https://activemq.apache.org/components/classic/download/)上下载ActiveMQ
解压后,进入到目录bin中,根据本身操做系统的位数进入到win64或者win32目录下,而后点击activemq.bat启动ActiveMQ。
启动后在浏览器输入http://localhost:8161/,看到如下画面表示启动成功:
点击“Manage ActiveMQ broker”进入到ActiveMQ的后台管理界面,若要求输入用户名密码则初始用户名密码为admin,admin,以下:
2.本博客使用Maven构建项目,引入如下依赖(问题1与问题2须要引入):
<!--Activemq消息中间件 start--> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.14.4</version> </dependency>
消息生产者:
package at.flying.activemq.ptp; import at.flying.domain.Student; import com.github.flyinghe.tools.CommonUtils; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.commons.lang3.time.DateFormatUtils; import javax.jms.*; import java.util.Date; /** * PTP方式传递消息 */ public class ActiveMQProducer { public static void main(String[] args) throws Exception { // 构造ConnectionFactory实例对象 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // 从工厂获取链接对象 Connection connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取操做链接,一个发送或接收消息的线程 Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 获取消息目的地,消息发送给谁 Destination destination = session.createQueue("test-queue"); // 获取消息生产者 MessageProducer producer = session.createProducer(destination); // 设置不持久化,此处学习,实际根据项目决定 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 构造消息 for (int i = 1; i <= 4; i++) { Student student = new Student(); student.setId((long) i); student.setName("学生" + i); student.setBirthday(new Date()); TextMessage message = session.createTextMessage(CommonUtils.serialize(student)); // 发送消息到目的地方 producer.send(message); System.out.println(String.format("发送消息:%d-%s-%s", student.getId(), student.getName(), DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd"))); } connection.close(); } }
消息消费者1:
package at.flying.activemq.ptp; import at.flying.domain.Student; import com.github.flyinghe.tools.CommonUtils; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.commons.lang3.time.DateFormatUtils; import javax.jms.*; /** * PTP方式接收消息 */ public class ActiveMQConsumer1 { public static void main(String[] args) throws Exception { // 构造ConnectionFactory实例对象 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // 从工厂获取链接对象 Connection connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取操做链接,一个发送或接收消息的线程 Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 获取消息目的地,消息发送给谁 Destination destination = session.createQueue("test-queue"); // 消费者,消息接收者 MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { if (null != message) { Student student = CommonUtils.deserialize(((TextMessage) message).getText()); System.out.println( String.format("ActiveMQConsumer1-接受消息:%d-%s-%s", student.getId(), student.getName(), DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd"))); } } catch (JMSException e) { } } }); System.in.read(); connection.close(); } }
消息消费者2:
package at.flying.activemq.ptp; import at.flying.domain.Student; import com.github.flyinghe.tools.CommonUtils; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.commons.lang3.time.DateFormatUtils; import javax.jms.*; /** * PTP方式接收消息 */ public class ActiveMQConsumer2 { public static void main(String[] args) throws Exception { // 构造ConnectionFactory实例对象 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // 从工厂获取链接对象 Connection connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取操做链接,一个发送或接收消息的线程 Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 获取消息目的地,消息发送给谁 Destination destination = session.createQueue("test-queue"); // 消费者,消息接收者 MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { if (null != message) { Student student = CommonUtils.deserialize(((TextMessage) message).getText()); System.out.println( String.format("ActiveMQConsumer2-接受消息:%d-%s-%s", student.getId(), student.getName(), DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd"))); } } catch (JMSException e) { } } }); System.in.read(); connection.close(); } }
先启动两个消息消费者,再启动消息生产者,控制台输出信息以下:
消息生产者:
消息消费者1:
消息消费者2:
这个结果使咱们很容易理解PTP的消息传递方式。
消息生产者:
package at.flying.activemq.pubsub; import at.flying.domain.Student; import com.github.flyinghe.tools.CommonUtils; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.commons.lang3.time.DateFormatUtils; import javax.jms.*; import java.util.Date; /** * Pub/Sub方式传递消息 */ public class ActiveMQProducer { public static void main(String[] args) throws Exception { // 构造ConnectionFactory实例对象 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // 从工厂获取链接对象 Connection connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取操做链接,一个发送或接收消息的线程 Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 获取消息目的地,消息发送给谁 Destination destination = session.createTopic("test-topic"); // 获取消息生产者 MessageProducer producer = session.createProducer(destination); // 设置不持久化,此处学习,实际根据项目决定 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 构造消息 for (int i = 1; i <= 4; i++) { Student student = new Student(); student.setId((long) i); student.setName("学生" + i); student.setBirthday(new Date()); TextMessage message = session.createTextMessage(CommonUtils.serialize(student)); // 发送消息到目的地方 producer.send(message); System.out.println(String.format("发送消息:%d-%s-%s", student.getId(), student.getName(), DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd"))); } connection.close(); } }
消息消费者1:
package at.flying.activemq.pubsub; import at.flying.domain.Student; import com.github.flyinghe.tools.CommonUtils; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.commons.lang3.time.DateFormatUtils; import javax.jms.*; /** * Pub/Sub方式接收消息 */ public class ActiveMQConsumer1 { public static void main(String[] args) throws Exception { // 构造ConnectionFactory实例对象 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // 从工厂获取链接对象 Connection connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取操做链接,一个发送或接收消息的线程 Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 获取消息目的地,消息发送给谁 Destination destination = session.createTopic("test-topic"); // 消费者,消息接收者 MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { if (null != message) { Student student = CommonUtils.deserialize(((TextMessage) message).getText()); System.out.println( String.format("ActiveMQConsumer1-接受消息:%d-%s-%s", student.getId(), student.getName(), DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd"))); } } catch (JMSException e) { } } }); System.in.read(); connection.close(); } }
消息消费者2:
package at.flying.activemq.pubsub; import at.flying.domain.Student; import com.github.flyinghe.tools.CommonUtils; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.commons.lang3.time.DateFormatUtils; import javax.jms.*; /** * Pub/Sub方式接收消息 */ public class ActiveMQConsumer2 { public static void main(String[] args) throws Exception { // 构造ConnectionFactory实例对象 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // 从工厂获取链接对象 Connection connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取操做链接,一个发送或接收消息的线程 Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 获取消息目的地,消息发送给谁 Destination destination = session.createTopic("test-topic"); // 消费者,消息接收者 MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { if (null != message) { Student student = CommonUtils.deserialize(((TextMessage) message).getText()); System.out.println( String.format("ActiveMQConsumer2-接受消息:%d-%s-%s", student.getId(), student.getName(), DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd"))); } } catch (JMSException e) { } } }); System.in.read(); connection.close(); } }
先启动两个消息消费者,再启动消息生产者,控制台输出信息以下:
消息生产者:
消息消费者1:
消息消费者2:
这个结果使咱们很容易理解Pub/Sub的消息传递方式。
总结:
从以上代码能够看出PTP与Pub/Sub方式的消息传递,只是在建立消息目的地的时候不同:
PTP方式建立的消息目的地是Queue(队列),Pub/Sub方式建立的消息目的地是Topic(主题)。
ActiveMQ与Spring整合时并不须要额外依赖相似xxx-spring.jar的jar包,由于在activemq-all包中已经包含了这些依赖。
相似于其余框架诸如Quartz定时等框架与Spring整合同样,须要配置xml并在applicationContext.xml总配置文件中引入ActiveMQ的配置文件。
ActiveMQ的配置文件以下:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns = "http://www.springframework.org/schema/beans" xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation = " http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd "> <!-- 配置可以产生connection的connectionfactory,由JMS对应的服务厂商提供 --> <bean id = "tagertConnectionFactory" class = "org.apache.activemq.ActiveMQConnectionFactory"> <constructor-arg name = "brokerURL" value = "tcp://localhost:61616" /> </bean> <!-- 配置spring管理真正connectionfactory的connectionfactory,至关于spring对connectionfactory的一层封装 --> <bean id = "connectionFactory" class = "org.springframework.jms.connection.SingleConnectionFactory"> <property name = "targetConnectionFactory" ref = "tagertConnectionFactory" /> </bean> <!-- 配置生产者 --> <!-- Spring使用JMS工具类,能够用来发送和接收消息 --> <bean id = "jmsTemplate" class = "org.springframework.jms.core.JmsTemplate"> <!-- 这里是配置受Spring管理的connectionfactory --> <property name = "connectionFactory" ref = "connectionFactory" /> </bean> <!-- 配置destination --> <!-- 队列目的地 --> <bean id = "queueDestination" class = "org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value = "spring-test-queue" /> </bean> <!-- 话题目的地 --> <bean id = "topicDestination" class = "org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value = "spring-test-topic" /> </bean> <!--消息监听器--> <bean id = "iMessageListener" class = "at.flying.activemq.listener.IMessageListener" /> <!-- 配置消息监听器 --> <bean class = "org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name = "connectionFactory" ref = "connectionFactory" /> <property name = "destination" ref = "queueDestination" /> <property name = "messageListener" ref = "iMessageListener" /> </bean> <bean class = "org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name = "connectionFactory" ref = "connectionFactory" /> <property name = "destination" ref = "topicDestination" /> <property name = "messageListener" ref = "iMessageListener" /> </bean> </beans>
消息监听器定义以下:
package at.flying.activemq.listener; import at.flying.domain.Student; import com.github.flyinghe.tools.CommonUtils; import org.apache.commons.lang3.time.DateFormatUtils; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; /** * 消息监听器(消费者) */ public class IMessageListener implements MessageListener { @Override public void onMessage(Message message) { if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; try { Student student = CommonUtils.deserialize(textMessage.getText()); System.out.println( String.format("%sListener-接受消息:%d-%s-%s", message.getJMSDestination().toString().toLowerCase().startsWith("topic") ? "Topic" : "Queue", student.getId(), student.getName(), DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd"))); } catch (JMSException e) { e.printStackTrace(); } } } }
在applicationContext.xml总配置文件中引入ActiveMQ的配置文件:
至此,配置文件配置完毕。
为测试ActiveMQ在Web应用中的使用,咱们须要写一个页面与一个Controller来作测试。
准备一个JSP页面(其实随便啥页面都行):
<%@ page language = "java" import = "java.util.*" pageEncoding = "UTF-8" %> <%@taglib prefix = "c" uri = "http://java.sun.com/jsp/jstl/core" %> <% String path = request.getContextPath(); String basePath = request.getScheme() + "://" + request.getServerName() + ":" + request.getServerPort() + path + "/"; %> <!DOCTYPE html> <html lang = "en"> <head> <base href = "<%=basePath %>" /> <meta charset = "UTF-8"> <title>Activemq 学习</title> </head> <body> <center><h1>Activemq 学习</h1></center> <h1>测试PTP方式传递消息</h1> <form action = "<c:url value='/activemq/testQueue'/>" method = "get"> sid:<input name = "sid" type = "text" /> name:<input name = "name" type = "text" /> <button onsubmit = "true">test Queue</button> </form> <br /> <h1>测试Pub/Sub方式传递消息</h1> <form action = "<c:url value='/activemq/testTopic'/>" method = "get"> sid:<input name = "sid" type = "text" /> name:<input name = "name" type = "text" /> <button onsubmit = "true">test Topic</button> </form> </body> </html>
准备一个接收请求的Controller:
package at.flying.web.action; import at.flying.domain.Student; import com.github.flyinghe.tools.CommonUtils; import org.apache.commons.lang3.time.DateFormatUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.servlet.ModelAndView; import javax.jms.*; import java.util.Date; @Controller @RequestMapping("activemq") public class ActivemqAction { @Autowired @Qualifier("jmsTemplate") private JmsTemplate jmsTemplate; @Autowired @Qualifier("queueDestination") private Destination queueDestination; @Autowired @Qualifier("topicDestination") private Destination topicDestination; @RequestMapping(value = "testQueue", method = {RequestMethod.GET, RequestMethod.POST}) public ModelAndView testQueue( @RequestParam("sid") Long sid, @RequestParam("name") String name) { ModelAndView modelAndView = new ModelAndView(); this.jmsTemplate.send(this.queueDestination, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { Student student = new Student(); student.setBirthday(new Date()); student.setId(sid); student.setName(name); TextMessage message = session.createTextMessage(CommonUtils.serialize(student)); // 发送消息到目的地方 System.out.println(String.format("发送消息:%d-%s-%s", student.getId(), student.getName(), DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd"))); return message; } }); modelAndView.setViewName("redirect:/activemq/start.jsp"); return modelAndView; } @RequestMapping(value = "testTopic", method = {RequestMethod.GET, RequestMethod.POST}) public ModelAndView testTopic( @RequestParam("sid") Long sid, @RequestParam("name") String name) { ModelAndView modelAndView = new ModelAndView(); this.jmsTemplate.send(this.topicDestination, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { Student student = new Student(); student.setBirthday(new Date()); student.setId(sid); student.setName(name); TextMessage message = session.createTextMessage(CommonUtils.serialize(student)); // 发送消息到目的地方 System.out.println(String.format("发送消息:%d-%s-%s", student.getId(), student.getName(), DateFormatUtils.format(student.getBirthday(), "yyyy-MM-dd"))); return message; } }); modelAndView.setViewName("redirect:/activemq/start.jsp"); return modelAndView; } }
测试结果以下:
①PTP方式:
消息发送页面:
消息发送控制台:
消息接收控制台:
②Pub/Sub方式:
消息发送页面:
消息发送控制台:
消息接收控制台:
首先在pom文件中加入以下依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency>
application.properties文件中加入以下配置:
#ActiveMQ spring.activemq.broker-url=tcp://localhost:61616 spring.activemq.user=admin spring.activemq.password=admin #配置消息类型,false则消息模式为PTP,true则消息模式为PUB/SUB,默认值为false spring.jms.pub-sub-domain=false
新建一个配置类ActiveMQConfig:
package at.flying.springbootproject.config.activemq; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.jms.Destination; @Configuration public class ActiveMQConfig { @Bean("test-queue") public Destination testQueue() { return new ActiveMQQueue("test-queue"); } @Bean("test-topic") public Destination testTopic() { return new ActiveMQTopic("test-topic"); } }
配置监听器(消息消费者):
package at.flying.springbootproject.config.activemq; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; @Component public class ConsumerListenters { @JmsListener(destination = "test-queue") public void testQueue1(String msg) { System.out.println(String.format("test-queue1:%s", msg)); } @JmsListener(destination = "test-queue") public void testQueue2(String msg) { System.out.println(String.format("test-queue2:%s", msg)); } @JmsListener(destination = "test-topic") public void testTopic1(String msg) { System.out.println(String.format("test-topic1:%s", msg)); } @JmsListener(destination = "test-topic") public void testTopic2(String msg) { System.out.println(String.format("test-topic2:%s", msg)); } }
配置消息生产者:
package at.flying.springbootproject.service; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.stereotype.Service; import javax.jms.Destination; @Service public class ActiveMQService { @Autowired private JmsMessagingTemplate jmsMessagingTemplate; @Autowired @Qualifier("test-queue") private Destination testQueue; @Autowired @Qualifier("test-topic") private Destination testTopic; public void testQueue(String msg) { if (StringUtils.isNotBlank(msg)) { this.jmsMessagingTemplate.convertAndSend(this.testQueue, msg); } } public void testTopic(String msg) { if (StringUtils.isNotBlank(msg)) { this.jmsMessagingTemplate.convertAndSend(this.testTopic, msg); } } }
到这里其实已经配置完毕,可是为了测试效果咱们还须要一个Controller来接受页面请求而后触发消息的发送:
package at.flying.springbootproject.controller; import at.flying.springbootproject.service.ActiveMQService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; @Controller @RequestMapping("activemq") public class TestActiveMQController { @Autowired private ActiveMQService activeMQService; @RequestMapping("test-queue") @ResponseBody public String test1( @RequestParam(value = "msg", required = false) String msg) { this.activeMQService.testQueue(msg); return msg; } @RequestMapping("test-topic") @ResponseBody public String test2( @RequestParam(value = "msg", required = false) String msg) { this.activeMQService.testTopic(msg); return msg; } }
而后咱们打开浏览器请求test-queue:
连续请求4次,控制台输出以下:
输出了四条消息,而且是两个消费者轮流消费。
如今咱们来测试test-topic:
注意:
此时咱们须要把application.properties里的spring.jms.pub-sub-domain属性改成true,由于true值才表明消息模式为PUB/SUB,若不更改不会报错,可是发送Topic消息时消息消费者不会消费该消息,也就是没有触发Topic消息监听器。
咱们打开浏览器请求test-topic:
连续请求2次,控制台输出以下:
输出了四条消息,同一消息两个Topic消费者均消费了。