1、消息中间件基础知识
2、ActiveMQ介绍
3、ActiveMQ下载安装(Windows版本)
4、Java操做ActiveMQ代码实战
5、Spring整合ActiveMQ代码实战
6、项目源码与参考资料下载
7、参考文章php
https://www.cnblogs.com/WUXIAOCHANG/p/10904987.htmlhtml
1.1 点对点(point-to-point,简称PTP)Queue消息传递模型
经过该消息传递模型,一个应用程序(即消息生产者)能够向另一个应用程序(即消息消费者)发送消息。在此传递模型中,消息目的地类型是队列(即Destination接口实现类实例由Session接口实现类实例经过调用其createQueue方法并传入队列名称而建立)。消息首先被传送至消息服务器端特定的队列中,而后今后对列中将消息传送至对此队列进行监听的某个消费者。同一个队列能够关联多个消息生产者和消息消费者,但一条消息仅能传递给一个消息消费者。若是多个消息消费者正在监听队列上的消息,,JMS消息服务器将根据“先来者优先”的原则肯定由哪一个消息消费者接收下一条消息。若是没有消息消费者在监听队列,消息将保留在队列中,直至消息消费者链接到队列为止。这种消息传递模型是传统意义上的懒模型或轮询模型。在此模型中,消息不是自动推进给消息消费者的,而是要由消息消费者从队列中请求得到。java
1.2 发布/订阅(publish/subscribe,简称pub/sub)Topic消息传递模型
经过该消息传递模型,应用程序可以将一条消息发送给多个消息消费者。在此传送模型中,消息目的地类型是主题(即Destination接口实现类实例由Session接口实现类实例经过调用其createTopic方法并传入主题名称而建立)。消息首先由消息生产者发布至消息服务器中特定的主题中,而后由消息服务器将消息传送至全部已订阅此主题的消费者。主题目标也支持长期订阅。长期订阅表示消费者已注册了主题目标,但在消息到达目标时该消费者能够处于非活动状态。当消费者再次处于活动状态时,将会接收该消息。若是消费者均没有注册某个主题目标,该主题只保留注册了长期订阅的非活动消费者的消息。与PTP消息传递模型不一样,pub/sub消息传递模型容许多个主题订阅者接收同一条消息。JMS一直保留消息,直至全部主题订阅者都接收到消息为止。pub/sub消息传递模型基本上是一个推模型。在该模型中,消息会自动广播,消息消费者无须经过主动请求或轮询主题的方法来得到新的消息。spring
1.3 两种模型方式比较apache
JMS定义了五种不一样的消息正文格式,以及调用的消息类型,容许你发送并接收以一些不一样形式的数据,提供现有消息格式的一些级别的兼容性。
(1)StreamMessage -- Java原始值的数据流
(2)MapMessage--一套名称-值对
(3)TextMessage--一个字符串对象
(4)ObjectMessage--一个序列化的 Java对象
(5)BytesMessage--一个字节的数据流windows
一、打开浏览器,访问网址activemq.apache.org,以下图所示:浏览器
二、下载最新的版本,当前最新版本为5.15.5,根据ActiveMQ须要安装的操做系统选择性下载对应的版本,这里我选择Windows版本,而后点击下载ZIP包,以下图所示:安全
三、下载完成之后,将zip文件解压到D盘下,解压后的目录结构以下图所示:服务器
四、在启动ActiveMQ前,首先要确保服务器上已经安装和配置好JDK,而且JDK的版本要知足ActiveMQ的要求,以下图所示:session
五、接下来咱们进入到D:\apache-activemq-5.15.5\bin,以下图所示:
六、根据服务器上操做系统的版本,选择进入到win32仍是win64,这里选择进入win64目录,而后双击activemq.bat,这时activemq就启动起来了,成功启动之后打印的日志以下图所示:
七、打开浏览器,输入http://localhost:8161/admin/ ,弹出一个windows安全提示框,提示输入activemq的用户名和密码,以下图所示:
八、接下来咱们打开D:\apache-activemq-5.15.5\conf这个目录,找到jetty-realm.properties文件(该文件保存着用户名和密码信息),以下图所示:
九、打开该文件,找到文件的末尾,格式是 用户名: 密码,用户角色 ,以下图所示:
十、角色信息的定义放在D:\apache-activemq-5.15.5\conf下的jetty.xml文件中,以下图所示:
十一、 咱们知道了角色定义的位置,角色对应的用户名和密码后,咱们就可使用默认的用户名admin和默认的密码admin来登陆系统,以下图所示:
十二、 登陆成功之后,就能够看到activemq的主页了,以下图所示:
1.1 idea建立maven项目
建立后项目结构以下:
1.2 pom.xml文件添加依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.wxc</groupId> <artifactId>activemq-service</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <!-- 去掉scope做用域,使用默认的compile,编译、测试、运行都有效的做用域 --> <!--<scope>test</scope>--> </dependency> <!-- mq消息集成 --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.15.0</version> </dependency> </dependencies> </project>
1.3 建立测试类
com.wxc.test包下新建TestService.java
其中testQueueProducer方法为队列类型,testTopicProducer方法为发布/订阅类型,其中建立步骤以下:
第一步:建立ConnectionFactory对象,须要指定服务端ip及端口号。
第二步:使用ConnectionFactory对象建立一个Connection对象。
第三步:开启链接,调用Connection对象的start方法。
第四步:使用Connection对象建立一个Session对象。
第五步:使用Session对象建立一个Destination对象(topic、queue),此处建立一个Queue对象。
第六步:使用Session对象建立一个Producer对象。
第七步:建立一个Message对象,建立一个TextMessage对象。
第八步:使用Producer对象发送消息。
第九步:关闭资源。
package com.wxc.test; import org.apache.activemq.ActiveMQConnectionFactory; import org.junit.Test; import javax.jms.*; public class TestService { @Test public void testQueueProducer() throws Exception { // 第一步:建立ConnectionFactory对象,须要指定服务端ip及端口号。 //brokerURL服务器的ip及端口号 //8161是后台管理系统,61616是给java用的tcp端口 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.101.6:61616"); // 第二步:使用ConnectionFactory对象建立一个Connection对象。 Connection connection = connectionFactory.createConnection(); // 第三步:开启链接,调用Connection对象的start方法。 connection.start(); // 第四步:使用Connection对象建立一个Session对象。 //第一个参数:是否开启事务。true:开启事务,第二个参数忽略。 //第二个参数:当第一个参数为false时,才有意义。消息的应答模式。一、自动应答二、手动应答。通常是自动应答。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session对象建立一个Destination对象(topic、queue),此处建立一个Queue对象。 //参数:队列的名称。 Queue queue = session.createQueue("test-queue"); // 第六步:使用Session对象建立一个Producer对象。 MessageProducer producer = session.createProducer(queue); // 第七步:建立一个Message对象,建立一个TextMessage对象。 /*TextMessage message = new ActiveMQTextMessage(); message.setText("hello activeMq,this is my first test.");*/ TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test."); // 第八步:使用Producer对象发送消息。 producer.send(textMessage); // 第九步:关闭资源。 producer.close(); session.close(); connection.close(); } @Test public void testTopicProducer() throws Exception { // 第一步:建立ConnectionFactory对象,须要指定服务端ip及端口号。 // brokerURL服务器的ip及端口号 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.101.6:61616"); // 第二步:使用ConnectionFactory对象建立一个Connection对象。 Connection connection = connectionFactory.createConnection(); // 第三步:开启链接,调用Connection对象的start方法。 connection.start(); // 第四步:使用Connection对象建立一个Session对象。 // 第一个参数:是否开启事务。true:开启事务,第二个参数忽略。 // 第二个参数:当第一个参数为false时,才有意义。消息的应答模式。一、自动应答二、手动应答。通常是自动应答。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session对象建立一个Destination对象(topic、queue),此处建立一个topic对象。 // 参数:话题的名称。 Topic topic = session.createTopic("test-topic"); // 第六步:使用Session对象建立一个Producer对象。 MessageProducer producer = session.createProducer(topic); // 第七步:建立一个Message对象,建立一个TextMessage对象。 /* * TextMessage message = new ActiveMQTextMessage(); message.setText( * "hello activeMq,this is my first test."); */ TextMessage textMessage = session.createTextMessage("hello activeMq,this is my topic test"); // 第八步:使用Producer对象发送消息。 producer.send(textMessage); // 第九步:关闭资源。 producer.close(); session.close(); connection.close(); } }
舒适提示:8161端口是后台管理系统,61616端口是给java用的tcp端口
2.1 idea建立maven项目
建立后项目结构以下:
2.2 pom.xml文件添加依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.wxc</groupId> <artifactId>activemq-customer</artifactId> <version>1.0-SNAPSHOT</version> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>6</source> <target>6</target> </configuration> </plugin> </plugins> </build> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <!-- 去掉scope做用域,使用默认的compile,编译、测试、运行都有效的做用域 --> <!--<scope>test</scope>--> </dependency> <!-- mq消息集成 --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.15.0</version> </dependency> </dependencies> </project>
2.3 新建测试类
com.wxc.test包下新建TestCustomer.java
testQueueConsumer方法是测试队列方式,testTopicConsumer方法是测试发布/订阅方式,建立步骤以下:
消费者:接收消息。
第一步:建立一个ConnectionFactory对象。
第二步:从ConnectionFactory对象中得到一个Connection对象。
第三步:开启链接。调用Connection对象的start方法。
第四步:使用Connection对象建立一个Session对象。
第五步:使用Session对象建立一个Destination对象。和发送端保持一致queue,而且队列的名称一致。
第六步:使用Session对象建立一个Consumer对象。
第七步:接收消息。
第八步:打印消息。
第九步:关闭资源
package com.wxc.test; import org.apache.activemq.ActiveMQConnectionFactory; import org.junit.Test; import javax.jms.*; public class TestCustomer { @Test public void testQueueConsumer() throws Exception { // 第一步:建立一个ConnectionFactory对象。 //8161是后台管理系统,61616是给java用的tcp端口 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.101.6:61616"); // 第二步:从ConnectionFactory对象中得到一个Connection对象。 Connection connection = connectionFactory.createConnection(); // 第三步:开启链接。调用Connection对象的start方法。 connection.start(); // 第四步:使用Connection对象建立一个Session对象。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session对象建立一个Destination对象。和发送端保持一致queue,而且队列的名称一致。 Queue queue = session.createQueue("test-queue"); // 第六步:使用Session对象建立一个Consumer对象。 MessageConsumer consumer = session.createConsumer(queue); // 第七步:接收消息。 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; String text = null; //取消息的内容 text = textMessage.getText(); // 第八步:打印消息。 System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } }); //等待键盘输入 System.in.read(); // 第九步:关闭资源 consumer.close(); session.close(); connection.close(); } @Test public void testTopicConsumer() throws Exception { // 第一步:建立一个ConnectionFactory对象。 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.101.6:61616"); // 第二步:从ConnectionFactory对象中得到一个Connection对象。 Connection connection = connectionFactory.createConnection(); // 第三步:开启链接。调用Connection对象的start方法。 connection.start(); // 第四步:使用Connection对象建立一个Session对象。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session对象建立一个Destination对象。和发送端保持一致topic,而且话题的名称一致。 Topic topic = session.createTopic("test-topic"); // 第六步:使用Session对象建立一个Consumer对象。 MessageConsumer consumer = session.createConsumer(topic); // 第七步:接收消息。 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; String text = null; // 取消息的内容 text = textMessage.getText(); // 第八步:打印消息。 System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } }); System.out.println("topic的消费端03。。。。。"); // 等待键盘输入 System.in.read(); // 第九步:关闭资源 consumer.close(); session.close(); connection.close(); } }
3.1 idea建立maven项目
建立后项目结构以下:
3.2 pom.xml文件添加依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.wxc</groupId> <artifactId>activemq-customer2</artifactId> <version>1.0-SNAPSHOT</version> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>6</source> <target>6</target> </configuration> </plugin> </plugins> </build> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <!-- 去掉scope做用域,使用默认的compile,编译、测试、运行都有效的做用域 --> <!--<scope>test</scope>--> </dependency> <!-- mq消息集成 --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-client</artifactId> <version>5.15.0</version> </dependency> </dependencies> </project>
3.3 新建测试类
com.wxc.test包下新建TestCustomer.java
testQueueConsumer方法是测试队列方式,testTopicConsumer方法是测试发布/订阅方式,建立步骤以下:
消费者:接收消息。
第一步:建立一个ConnectionFactory对象。
第二步:从ConnectionFactory对象中得到一个Connection对象。
第三步:开启链接。调用Connection对象的start方法。
第四步:使用Connection对象建立一个Session对象。
第五步:使用Session对象建立一个Destination对象。和发送端保持一致queue,而且队列的名称一致。
第六步:使用Session对象建立一个Consumer对象。
第七步:接收消息。
第八步:打印消息。
第九步:关闭资源
package com.wxc.test; import org.apache.activemq.ActiveMQConnectionFactory; import org.junit.Test; import javax.jms.*; public class TestCustomer { @Test public void testQueueConsumer() throws Exception { // 第一步:建立一个ConnectionFactory对象。 //8161是后台管理系统,61616是给java用的tcp端口 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.101.6:61616"); // 第二步:从ConnectionFactory对象中得到一个Connection对象。 Connection connection = connectionFactory.createConnection(); // 第三步:开启链接。调用Connection对象的start方法。 connection.start(); // 第四步:使用Connection对象建立一个Session对象。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session对象建立一个Destination对象。和发送端保持一致queue,而且队列的名称一致。 Queue queue = session.createQueue("test-queue"); // 第六步:使用Session对象建立一个Consumer对象。 MessageConsumer consumer = session.createConsumer(queue); // 第七步:接收消息。 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; String text = null; //取消息的内容 text = textMessage.getText(); // 第八步:打印消息。 System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } }); //等待键盘输入 System.in.read(); // 第九步:关闭资源 consumer.close(); session.close(); connection.close(); } @Test public void testTopicConsumer() throws Exception { // 第一步:建立一个ConnectionFactory对象。 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.101.6:61616"); // 第二步:从ConnectionFactory对象中得到一个Connection对象。 Connection connection = connectionFactory.createConnection(); // 第三步:开启链接。调用Connection对象的start方法。 connection.start(); // 第四步:使用Connection对象建立一个Session对象。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 第五步:使用Session对象建立一个Destination对象。和发送端保持一致topic,而且话题的名称一致。 Topic topic = session.createTopic("test-topic"); // 第六步:使用Session对象建立一个Consumer对象。 MessageConsumer consumer = session.createConsumer(topic); // 第七步:接收消息。 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; String text = null; // 取消息的内容 text = textMessage.getText(); // 第八步:打印消息。 System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } }); System.out.println("topic的消费端03。。。。。"); // 等待键盘输入 System.in.read(); // 第九步:关闭资源 consumer.close(); session.close(); connection.close(); } }
(1)建立了两个客户端的链接,是用于测试过程当中体现队列方式只能被一个消费者接收,而发布/订阅方式能够被多个消费者同时收到
(2)8161端口是后台管理系统,61616端口是给java用的tcp端口
5.1 队列方式
运行两个消费者端
运行服务者端
数据结果以下:
因此验证了队列方式只能有一个消费者端接收获得,且当客户端未运行时,服务器已经发送信息了,那么ActivieMQ会在客户端启动时候,传送数据给它
5.2 发布/订阅方式
运行两个消费者端
运行服务者端
数据结果以下:
因此验证了发布/订阅方式能够多个消费者端接收获得,且当客户端未运行时,服务器已经发送信息了,那么ActivieMQ会在客户端启动时候,传送数据给它
第一步:引用相关的jar包
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context-support</artifactId> </dependency>
第二步:配置Activemq整合spring。配置ConnectionFactory
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd"> <!-- 真正能够产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.25.168:61616" /> </bean> <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目标ConnectionFactory对应真实的能够产生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="targetConnectionFactory" /> </bean> </beans>
第三步:配置生产者。
使用JMSTemplate对象。发送消息。
第四步:在spring容器中配置Destination
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd"> <!-- 真正能够产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.25.168:61616" /> </bean> <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目标ConnectionFactory对应真实的能够产生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="targetConnectionFactory" /> </bean> <!-- 配置生产者 --> <!-- Spring提供的JMS工具类,它能够进行消息发送、接收等 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 这个connectionFactory对应的是咱们定义的Spring提供的那个ConnectionFactory对象 --> <property name="connectionFactory" ref="connectionFactory" /> </bean> <!--这个是队列目的地,点对点的 --> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>spring-queue</value> </constructor-arg> </bean> <!--这个是主题目的地,一对多的 --> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="topic" /> </bean> </beans>
2.1 发送消息
第一步:初始化一个spring容器
第二步:从容器中得到JMSTemplate对象。
第三步:从容器中得到一个Destination对象
第四步:使用JMSTemplate对象发送消息,须要知道Destination
@Test public void testQueueProducer() throws Exception { // 第一步:初始化一个spring容器 ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml"); // 第二步:从容器中得到JMSTemplate对象。 JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class); // 第三步:从容器中得到一个Destination对象 Queue queue = (Queue) applicationContext.getBean("queueDestination"); // 第四步:使用JMSTemplate对象发送消息,须要知道Destination jmsTemplate.send(queue, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { TextMessage textMessage = session.createTextMessage("spring activemq test"); return textMessage; } }); }
2.2 接收消息
Taotao-search-Service中接收消息。
第一步:把Activemq相关的jar包添加到工程中
第二步:建立一个MessageListener的实现类。
public class MyMessageListener implements MessageListener { @Override public void onMessage(Message message) { try { TextMessage textMessage = (TextMessage) message; //取消息内容 String text = textMessage.getText(); System.out.println(text); } catch (JMSException e) { e.printStackTrace(); } } }
第三步:配置spring和Activemq整合。
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd"> <!-- 真正能够产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 --> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.25.168:61616" /> </bean> <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目标ConnectionFactory对应真实的能够产生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="targetConnectionFactory" /> </bean> <!--这个是队列目的地,点对点的 --> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>spring-queue</value> </constructor-arg> </bean> <!--这个是主题目的地,一对多的 --> <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="topic" /> </bean> <!-- 接收消息 --> <!-- 配置监听器 --> <bean id="myMessageListener" class="com.taotao.search.listener.MyMessageListener" /> <!-- 消息监听容器 --> <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory" /> <property name="destination" ref="queueDestination" /> <property name="messageListener" ref="myMessageListener" /> </bean> </beans>
第四步:测试代码。
@Test public void testQueueConsumer() throws Exception { //初始化spring容器 ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml"); //等待 System.in.read(); }
连接:https://pan.baidu.com/s/10jknviW5p7MJr3FKSjvYjQ
提取码:waeh