ActiveMQ的安装与使用。

一、什么是ActiveMQ
html

 1 ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个彻底支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已是好久的事情了,可是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。  2 主要特色:  3   1). 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP  4   2). 彻底支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)。  5   3.) 对Spring的支持,ActiveMQ能够很容易内嵌到使用Spring的系统里面去,并且也支持Spring2.0的特性。  6   4.) 经过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中经过JCA 1.5 resource adaptors的配置,可让ActiveMQ能够自动的部署到任何兼容J2EE 1.4 商业服务器上。  7   5). 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA。  8   6). 支持经过JDBC和journal提供高速的消息持久化。  9   7). 从设计上保证了高性能的集群,客户端-服务器,点对点。 10   8). 支持Ajax。 11   9). 支持与Axis的整合。 12   10). 能够很容易得调用内嵌JMS provider,进行测试。

二、JMS介绍:java

1 1)、JMS的全称是Java Message Service,即Java消息服务。用于在两个应用程序之间,或分布式系统中发送消息,进行异步通讯。 2 
3 2)、它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息。把它应用到实际的业务需求中的话咱们能够在特定的时候利用生产者生成一消息,并进行发送,对应的消费者在接收到对应的消息后去完成对应的业务逻辑。

三、ActiveMQ的两种消息形式。linux

 1   1)、对于消息的传递有两种类型。  2     a)、一种是点对点的,即一个生产者和一个消费者一一对应。  3     b)、另外一种是发布/订阅模式,即一个生产者产生消息并进行发送后,能够由多个消费者进行接收。  4 
 5   2)、JMS定义了五种不一样的消息正文格式,以及调用的消息类型,容许你发送并接收以一些不一样形式的数据,提供现有消息格式的一些级别的兼容性。  6     a)、 StreamMessage -- Java原始值的数据流。  7     b)、 MapMessage--一套名称-值对。  8     c)、 TextMessage--一个字符串对象。  9     d)、 ObjectMessage--一个序列化的 Java对象。 10     e)、 BytesMessage--一个字节的数据流。

四、ActiveMQ的安装。官方网址:http://activemq.apache.org/web

因为ActiveMQ是java开发的,因此须要先安装jdk(注意:安装jdk,须要jdk1.7以上版本)的哦。这里使用的是apache-activemq-5.12.0-bin.tar.gz版本的。spring

开始进行解压缩操做。macos

1 [root@localhost package]# ls 2 apache-activemq-5.12.0-bin.tar.gz  apache-activemq-5.12.0-bin.zip  apache-tomcat-7.0.47.tar.gz  IK Analyzer 2012FF_hf1  IK Analyzer 2012FF_hf1.rar  jdk-7u55-linux-i586.tar.gz  solr-4.10.3.tgz.tgz  zookeeper-3.4.6.tar.gz 3 [root@localhost package]# tar -zxvf apache-activemq-5.12.0-bin.tar.gz -C /home/hadoop/soft/

解压缩完之后进入bin目录。开始进行启动操做。apache

启动:[root@localhost bin]# ./activemq start tomcat

中止:[root@localhost bin]# ./activemq stop服务器

查看状态:[root@localhost bin]# ./activemq status网络

 1 [root@localhost soft]# cd apache-activemq-5.12.0/
 2 [root@localhost apache-activemq-5.12.0]# ls  3 activemq-all-5.12.0.jar  bin  conf  data  docs  examples  lib  LICENSE  NOTICE  README.txt  webapps  webapps-demo  4 [root@localhost apache-activemq-5.12.0]# ll  5 total 9384
 6 -rwxr-xr-x. 1 root root 9524668 Aug 10  2015 activemq-all-5.12.0.jar  7 drwxr-xr-x. 5 root root    4096 Sep 15 00:39 bin  8 drwxr-xr-x. 2 root root    4096 Sep 15 00:39 conf  9 drwxr-xr-x. 2 root root    4096 Sep 15 00:39 data 10 drwxr-xr-x. 2 root root    4096 Sep 15 00:39 docs 11 drwxr-xr-x. 8 root root    4096 Sep 15 00:39 examples 12 drwxr-xr-x. 6 root root    4096 Sep 15 00:39 lib 13 -rw-r--r--. 1 root root   40580 Aug 10  2015 LICENSE 14 -rw-r--r--. 1 root root    3334 Aug 10  2015 NOTICE 15 -rw-r--r--. 1 root root    2610 Aug 10  2015 README.txt 16 drwxr-xr-x. 7 root root    4096 Sep 15 00:39 webapps 17 drwxr-xr-x. 3 root root    4096 Sep 15 00:39 webapps-demo 18 [root@localhost apache-activemq-5.12.0]# cd bin/
19 [root@localhost bin]# ls 20 activemq  activemq-diag  activemq.jar  env  linux-x86-32  linux-x86-64 macosx wrapper.jar 21 [root@localhost bin]# ./activemq start 22 INFO: Loading '/home/hadoop/soft/apache-activemq-5.12.0//bin/env'
23 INFO: Using java '/home/hadoop/soft/jdk1.7.0_55/bin/java'
24 INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details 25 INFO: pidfile created : '/home/hadoop/soft/apache-activemq-5.12.0//data/activemq.pid' (pid '9318') 26 [root@localhost bin]# ./activemq status 27 INFO: Loading '/home/hadoop/soft/apache-activemq-5.12.0//bin/env'
28 INFO: Using java '/home/hadoop/soft/jdk1.7.0_55/bin/java'
29 ActiveMQ is running (pid '9318') 30 [root@localhost bin]#

而后你能够访问后台管理界面,帐号和密码默认都是admin的。访问地址:http://192.168.110.142:8161/admin

Home是当前的欢迎页,Queues是点到点形式,Topics是发布订阅模式,Subscribers话题消息的发布与订阅,Connections客户端连接,Network当前网络的连接状态,Scheduled计划任务,Send能够测试发送消息。

五、ActiveMQ的使用方法,JMS消息发送模式。

注意:
1)、在点对点或队列模型下,一个生产者向一个特定的队列发布消息,一个消费者从该队列中读取消息。这里,生产者知道消费者的队列,并直接将消息发送到消费者的队列。这种模式被归纳为:只有一个消费者将得到消息。生产者不须要在接收者消费该消息期间处于运行状态,接收者也一样不须要在消息发送时处于运行状态。每个成功处理的消息都由接收者签收。
2)、发布者/订阅者模型支持向一个特定的消息主题发布消息。0或多个订阅者可能对接收来自特定消息主题的消息感兴趣。在这种模型下,发布者和订阅者彼此不知道对方。这种模式比如是匿名公告板。这种模式被归纳为:多个消费者能够得到消息,在发布者和订阅者之间存在时间依赖性。发布者须要创建一个订阅(subscription),以便客户可以购订阅。订阅者必须保持持续的活动状态以接收消息,除非订阅者创建了持久的订阅。在那种状况下,在订阅者未链接时发布的消息将在订阅者从新链接时从新发布。

六、JMS应用程序接口。

 1 1)、ConnectionFactory 接口(链接工厂)  2  用户用来建立到JMS提供者的链接的被管对象。JMS客户经过可移植的接口访问链接,这样当下层的实现改变时,代码不须要进行修改。 管理员在JNDI名字空间中配置链接工厂,这样,JMS客户才可以查找到它们。根据消息类型的不一样,用户将使用队列链接工厂,或者主题链接工厂。  3 2)、Connection 接口(链接)  4  链接表明了应用程序和消息服务器之间的通讯链路。在得到了链接工厂后,就能够建立一个与JMS提供者的链接。根据不一样的链接类型,链接容许用户建立会话,以发送和接收队列和主题到目标。  5 3)、Destination 接口(目标)  6  目标是一个包装了消息目标标识符的被管对象,消息目标是指消息发布和接收的地点,或者是队列,或者是主题。JMS管理员建立这些对象,而后用户经过JNDI发现它们。和链接工厂同样,管理员能够建立两种类型的目标,点对点模型的队列,以及发布者/订阅者模型的主题。  7 4)、MessageConsumer 接口(消息消费者)  8  由会话建立的对象,用于接收发送到目标的消息。消费者能够同步地(阻塞模式),或异步(非阻塞)接收队列和主题类型的消息。  9 5)、MessageProducer 接口(消息生产者) 10  由会话建立的对象,用于发送消息到目标。用户能够建立某个目标的发送者,也能够建立一个通用的发送者,在发送消息时指定目标。 11 6)、Message 接口(消息) 12  是在消费者和生产者之间传送的对象,也就是说从一个应用程序创送到另外一个应用程序。一个消息有三个主要部分: 13  消息头(必须):包含用于识别和为消息寻找路由的操做设置。 14  一组消息属性(可选):包含额外的属性,支持其余提供者和用户的兼容。能够建立定制的字段和过滤器(消息选择器)。 15  一个消息体(可选):容许用户建立五种类型的消息(文本消息,映射消息,字节消息,流消息和对象消息)。 16  消息接口很是灵活,并提供了许多方式来定制消息的内容。 17 7)、Session 接口(会话) 18     表示一个单线程的上下文,用于发送和接收消息。因为会话是单线程的,因此消息是连续的,就是说消息是按照发送的顺序一个一个接收的。会话的好处是它支持事务。若是用户选择了事务支持,会话上下文将保存一组消息,直到事务被提交才发送这些消息。在提交事务以前,用户可使用回滚操做取消这些消息。一个会话容许用户建立消息生产者来发送消息,建立消息消费者来接收消息。

七、如何使用java操做activeMQ呢,把ActiveMQ依赖的jar包添加到工程中。

使用maven工程,则添加jar包的依赖:

1 <dependency>
2     <groupId>org.apache.activemq</groupId>
3     <artifactId>activemq-all</artifactId>
4     <version>5.11.2</version>
5 </dependency>

而后你就能够愉快得开发了。是否是很开森呢。

八、ActiveMQ点对点模式(point-to-point)。

ActiveMq的点对点生产者。

 1 package com.taotao.activemq;  2 
 3 import javax.jms.Connection;  4 import javax.jms.ConnectionFactory;  5 import javax.jms.JMSException;  6 import javax.jms.MessageProducer;  7 import javax.jms.Queue;  8 import javax.jms.Session;  9 import javax.jms.TextMessage; 10 
11 import org.apache.activemq.ActiveMQConnectionFactory; 12 import org.apache.activemq.command.ActiveMQTextMessage; 13 import org.junit.Test; 14 
15 /** 16  * 17  * @ClassName: ActiveMqMain.java 18  * @author: biehl 19  * @since: 2019年9月15日 下午4:44:57 20  * @Copyright: ©2019 biehl 版权全部 21  * @version: 0.0.1 22  * @Description: 23  */
24 public class ActiveMqMain { 25 
26     // activeMq得点对点生产者
27  @Test 28     public void queueProducer() throws JMSException { 29         // 一、建立一个链接工厂对象ConnectionFactory对象。须要指定mq服务得ip以及端口号61616。
30         String brokerURL = "tcp://192.168.110.142:61616"; 31         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL); 32         // 二、使用ConnectionFactory建立一个链接Connection对象。
33         Connection connection = connectionFactory.createConnection(); 34         // 三、开启链接。调用Connection对象得start方法。
35  connection.start(); 36         // 四、使用Connection对象建立一个Session对象。 37         // 参数一是否开启事务,通常不开启事务,保证数据得最终一致性,可使用消息队列实现数据最终一致性。若是第一个参数为true,第二个参数自动忽略 38         // 参数二是消息得应答模式。两种模式,自动应答和手动应答。通常使用自动应答。
39         boolean transacted = false;// 不开启事务
40         int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;// 1
41         Session session = connection.createSession(transacted, acknowledgeMode); 42         // 五、使用Session对象建立一个Destination对象。两种形式queue、topic。如今应该使用queue。
43         String queueName = "queue1";// 当前消息队列得名称
44         Queue queue = session.createQueue(queueName); 45         // 六、使用Session对象建立一个Producer对象。 46         // interface Queue extends Destination。destination是一个接口。
47         MessageProducer producer = session.createProducer(queue); 48         // 七、建立一个TextMessage对象。 49         // 建立TextMessage方式一 50         // TextMessage textMessage = new ActiveMQTextMessage(); 51         // textMessage.setText("hello activeMq......"); 52         // 方式二
53         TextMessage textMessage = session.createTextMessage("hello activeMq......"); 54         // 八、发送消息。
55  producer.send(textMessage); 56         // 九、关闭资源。
57         producer.close();// 关闭producer
58         session.close();// 关闭session
59         connection.close();// 关闭connection
60  } 61 
62 }

ActiveMQ的点对点消息生产成功之后,能够在ActiveMQ提供的web界面能够看到一些信息。

activeMq的点对点消费者。

 1 package com.taotao.activemq;  2 
 3 import java.io.IOException;  4 
 5 import javax.jms.Connection;  6 import javax.jms.ConnectionFactory;  7 import javax.jms.JMSException;  8 import javax.jms.Message;  9 import javax.jms.MessageConsumer;  10 import javax.jms.MessageListener;  11 import javax.jms.MessageProducer;  12 import javax.jms.Queue;  13 import javax.jms.Session;  14 import javax.jms.TextMessage;  15 
 16 import org.apache.activemq.ActiveMQConnectionFactory;  17 import org.apache.activemq.command.ActiveMQTextMessage;  18 import org.junit.Test;  19 
 20 /**  21  *  22  * @ClassName: ActiveMqMain.java  23  * @author: biehl  24  * @since: 2019年9月15日 下午4:44:57  25  * @Copyright: ©2019 biehl 版权全部  26  * @version: 0.0.1  27  * @Description:  28  */
 29 public class ActiveMqMain {  30 
 31     // activeMq的点对点生产者
 32  @Test  33     public void queueProducer() throws JMSException {  34         // 一、建立一个链接工厂对象ConnectionFactory对象。须要指定mq服务得ip以及端口号61616。
 35         String brokerURL = "tcp://192.168.110.142:61616";  36         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);  37         // 二、使用ConnectionFactory建立一个链接Connection对象。
 38         Connection connection = connectionFactory.createConnection();  39         // 三、开启链接。调用Connection对象得start方法。
 40  connection.start();  41         // 四、使用Connection对象建立一个Session对象。  42         // 参数一是否开启事务,通常不开启事务,保证数据得最终一致性,可使用消息队列实现数据最终一致性。若是第一个参数为true,第二个参数自动忽略  43         // 参数二是消息得应答模式。两种模式,自动应答和手动应答。通常使用自动应答。
 44         boolean transacted = false;// 不开启事务
 45         int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;// 1
 46         Session session = connection.createSession(transacted, acknowledgeMode);  47         // 五、使用Session对象建立一个Destination对象。两种形式queue、topic。如今应该使用queue。
 48         String queueName = "queue1";// 当前消息队列得名称
 49         Queue queue = session.createQueue(queueName);  50         // 六、使用Session对象建立一个Producer对象。  51         // interface Queue extends Destination。destination是一个接口。
 52         MessageProducer producer = session.createProducer(queue);  53         // 七、建立一个TextMessage对象。  54         // 建立TextMessage方式一  55         // TextMessage textMessage = new ActiveMQTextMessage();  56         // textMessage.setText("hello activeMq......");  57         // 方式二
 58         TextMessage textMessage = session.createTextMessage("hello activeMq......");  59         // 八、发送消息。
 60  producer.send(textMessage);  61         // 九、关闭资源。
 62         producer.close();// 关闭producer
 63         session.close();// 关闭session
 64         connection.close();// 关闭connection
 65  }  66 
 67     // activeMq的点对点消费者
 68  @Test  69     public void queueConsumer() throws JMSException {  70         // 一、建立一个链接工厂ConnectionFactory 对象
 71         String brokerURL = "tcp://192.168.110.142:61616";  72         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);  73         // 二、使用链接工厂对象建立一个链接
 74         Connection connection = connectionFactory.createConnection();  75         // 三、开启链接
 76  connection.start();  77         // 四、使用链接对象建立一个Session对象
 78         boolean transacted = false;// 关闭事务
 79         int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;// 自动响应
 80         Session session = connection.createSession(transacted, acknowledgeMode);  81         // 五、使用Session建立一个Destination,Destination应该和消息的发送端一致的。
 82         String queueName = "queue1";  83         Queue queue = session.createQueue(queueName);  84         // 六、使用Session建立一个Consumer对象。
 85         MessageConsumer consumer = session.createConsumer(queue);  86         // 七、向Consumer对象中设置一个MessageListener对象,用来接受消息。  87         // 匿名内部类,new 接口,后面加上{},至关于实现了这个接口的实现类。而后建立这个实现类的对象listener。
 88         MessageListener listener = new MessageListener() {  89 
 90  @Override  91             public void onMessage(Message message) {  92                 // 接受事件的。当消息到达就能够在这里接受到消息了的。  93                 // 八、取出消息的内容。
 94                 if (message instanceof TextMessage) {  95                     TextMessage textMessage = (TextMessage) message;  96                     // 九、打印消息内容。
 97                     try {  98                         String text = textMessage.getText();  99                         System.out.println(text); 100                     } catch (JMSException e) { 101  e.printStackTrace(); 102  } 103  } 104  } 105  }; 106  consumer.setMessageListener(listener); 107 
108         // 关闭资源之前,系统等待,等待接受消息。
109         /*while (true) { 110  try { 111  Thread.sleep(100); 112  } catch (InterruptedException e) { 113  e.printStackTrace(); 114  } 115  }*/
116         
117         // 等待键盘输入。才回接着向下执行的。
118         try { 119             System.in.read(); 120         } catch (IOException e) { 121  e.printStackTrace(); 122  } 123         
124         
125         // 十、关闭资源。
126         consumer.close();// 关闭consumer
127         session.close();// 关闭session
128         connection.close();// 关闭connection
129  } 130 
131 }

执行了activeMq的点对点消费者。能够在界面看到变化。能够看到有一个消费者,而后生产了7条消息,7条消息进队和7条消息出队。

 九、ActiveMQ发布订阅模式(publish/subscribe)。

消费者有两种消费方法(这里使用异步消费):
  a、同步消费。经过调用消费者的receive方法从目的地中显式提取消息。receive方法能够一直阻塞到消息到达。

  b、异步消费。客户能够为消费者注册一个消息监听器,以定义在消息到达时所采起的动做。

    实现MessageListener接口,在MessageListener()方法中实现消息的处理逻辑。

 1 package com.taotao.activemq;  2 
 3 import java.io.IOException;  4 
 5 import javax.jms.Connection;  6 import javax.jms.ConnectionFactory;  7 import javax.jms.JMSException;  8 import javax.jms.Message;  9 import javax.jms.MessageConsumer;  10 import javax.jms.MessageListener;  11 import javax.jms.MessageProducer;  12 import javax.jms.Session;  13 import javax.jms.TextMessage;  14 import javax.jms.Topic;  15 
 16 import org.apache.activemq.ActiveMQConnectionFactory;  17 import org.junit.Test;  18 
 19 /**  20  * Active的发布订阅模式  21  *  22  * @ClassName: ActiveMqTopics.java  23  * @author: biehl  24  * @since: 2019年9月19日 上午10:51:14  25  * @Copyright: ©2019 biehl 版权全部  26  * @version: 0.0.1  27  * @Description:  28  */
 29 public class ActiveMqTopics {  30 
 31     // 发布订阅模式,生产者。topic生产者生产消息默认不持久化客户端的。
 32  @Test  33     public void topicProducer() {  34         try {  35             // 一、建立一个链接工厂对象。须要指定mq服务的ip地址以及端口号61616
 36             String brikerURL = "tcp://192.168.110.142:61616";  37             // 建立ConnectionFactory接口对象,实现类ActiveMQConnectionFactory
 38             ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brikerURL);  39 
 40             // 二、建立Connection链接
 41             Connection connection = connectionFactory.createConnection();  42 
 43             // 三、开启链接,调用Connection的start方法。
 44  connection.start();  45 
 46             // 四、建立Session,使用Connection对象建立一个session  47             // 参数一是否开启事务,通常不开启事务,保证数据得最终一致性,可使用消息队列实现数据最终一致性。若是第一个参数为true,第二个参数自动忽略
 48             boolean transacted = false;  49             // 参数二是消息得应答模式。两种模式,自动应答和手动应答。通常使用自动应答。
 50             int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;  51             Session session = connection.createSession(transacted, acknowledgeMode);  52 
 53             // 五、建立Destination,应该使用topic,区别于点对点的queue
 54             String topicName = "topic01";  55             Topic topic = session.createTopic(topicName);  56 
 57             // 六、建立一个Producer对象  58             // interface Topic extends Destination.  59             // Destination是一个接口,Topic接口继承Destination这个接口。
 60             MessageProducer producer = session.createProducer(topic);  61 
 62             // 七、建立一个TextMessage对象
 63             String message = null;  64             TextMessage textMessage = null;  65             for (int i = 0; i < 100; i++) {  66                 message = i + " ActiveMQ topics......";  67                 textMessage = session.createTextMessage(message);  68 
 69                 // 八、发送消息
 70  producer.send(textMessage);  71  }  72 
 73             // 九、关闭资源
 74             producer.close();// 关闭producer
 75             session.close();// 关闭session
 76             connection.close();// 关闭connection
 77         } catch (JMSException e) {  78  e.printStackTrace();  79  }  80  }  81 
 82     // 发布订阅模式,消费者必须一直等待生产者生产的消息,由于发布订阅模式不持久化。
 83  @Test  84     public void topicConsumer() {  85         try {  86             // 一、建立一个链接工厂对象
 87             String brokerURL = "tcp://192.168.110.142:61616";  88             ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);  89 
 90             // 二、使用链接工厂对象建立一个链接
 91             Connection connection = connectionFactory.createConnection();  92 
 93             // 三、开启链接
 94  connection.start();  95 
 96             // 四、使用链接对象建立一个Session对象  97             // 参数一是否开启事务,通常不开启事务,保证数据得最终一致性,可使用消息队列实现数据最终一致性。若是第一个参数为true,第二个参数自动忽略
 98             boolean transacted = false;  99             // 参数二是消息得应答模式。两种模式,自动应答和手动应答。通常使用自动应答。
100             int acknowledgeMode = Session.AUTO_ACKNOWLEDGE; 101             Session session = connection.createSession(transacted, acknowledgeMode); 102 
103             // 五、使用session建立destination,注意,destination应该和消息的发送端一致的。
104             String topicName = "topic01"; 105             Topic topic = session.createTopic(topicName); 106 
107             // 六、使用session建立一个consumer对象
108             MessageConsumer consumer = session.createConsumer(topic); 109 
110             // 七、向Consumer对象中设置一个MessageListener对象,用来接受消息。 111             // 匿名内部类,new 接口,后面加上{},至关于实现了这个接口的实现类。而后建立这个实现类的对象listener。
112             MessageListener listener = new MessageListener() { 113                 // 接受事件的。当消息到达就能够在这里接受到消息了的。 114                 // 八、取出消息的内容。
115  @Override 116                 public void onMessage(Message message) { 117                     if (message instanceof TextMessage) { 118                         TextMessage textMessage = (TextMessage) message; 119                         // 九、打印消息内容。
120                         try { 121                             String text = textMessage.getText(); 122                             System.out.println(text); 123                         } catch (JMSException e) { 124  e.printStackTrace(); 125  } 126  } 127  } 128  }; 129  consumer.setMessageListener(listener); 130 
131             // 启动三次,模拟是三个消费者
132             System.out.println("消费者1......."); 133             // System.out.println("消费者2......."); 134             // System.out.println("消费者3......."); 135 
136             // 等待键盘输入。才回接着向下执行的。
137             try { 138                 System.in.read(); 139             } catch (IOException e) { 140  e.printStackTrace(); 141  } 142 
143             // 九、关闭资源
144             consumer.close();// 关闭producer
145             session.close();// 关闭session
146             connection.close();// 关闭connection
147         } catch (JMSException e) { 148  e.printStackTrace(); 149  } 150 
151  } 152 
153 }

执行了activeMq的发布订阅模式。能够在界面看到变化。能够看到有三个消费者,而后生产了201条消息,201条消息进队和603条消息出队。

十、ActiveMQ与Spring整合以下所示:

在pom.xml配置文件中引入本身的依赖的jar包。

1 <dependency>
2     <groupId>org.springframework</groupId>
3     <artifactId>spring-jms</artifactId>
4 </dependency>
5 <dependency>
6     <groupId>org.springframework</groupId>
7     <artifactId>spring-context-support</artifactId>
8 </dependency>

在配置文件applicationContext-activemq.xml里面配置ConnectionFactory。以下所示:

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3     xmlns:context="http://www.springframework.org/schema/context"
 4     xmlns:p="http://www.springframework.org/schema/p"
 5     xmlns:aop="http://www.springframework.org/schema/aop"
 6     xmlns:tx="http://www.springframework.org/schema/tx"
 7     xmlns:jms="http://www.springframework.org/schema/jms"
 8     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 9     xsi:schemaLocation="http://www.springframework.org/schema/beans 
10     http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
11     http://www.springframework.org/schema/context 
12     http://www.springframework.org/schema/context/spring-context-4.0.xsd
13     http://www.springframework.org/schema/aop 
14     http://www.springframework.org/schema/aop/spring-aop-4.0.xsd 
15     http://www.springframework.org/schema/tx 
16     http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
17     http://www.springframework.org/schema/jms 
18     http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
19     http://www.springframework.org/schema/util 
20     http://www.springframework.org/schema/util/spring-util-4.0.xsd">
21 
22 
23     <!-- 真正能够产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
24     <bean id="targetConnectionFactory"
25         class="org.apache.activemq.ActiveMQConnectionFactory">
26         <property name="brokerURL"
27             value="tcp://192.168.110.142:61616" />
28     </bean>
29 
30     <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
31     <bean id="connectionFactory"
32         class="org.springframework.jms.connection.SingleConnectionFactory">
33         <!-- 目标ConnectionFactory对应真实的能够产生JMS Connection的ConnectionFactory -->
34         <property name="targetConnectionFactory"
35             ref="targetConnectionFactory" />
36     </bean>
37 </beans>

开始配置生产者的spring配置。

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3     xmlns:context="http://www.springframework.org/schema/context"
 4     xmlns:p="http://www.springframework.org/schema/p"
 5     xmlns:aop="http://www.springframework.org/schema/aop"
 6     xmlns:tx="http://www.springframework.org/schema/tx"
 7     xmlns:jms="http://www.springframework.org/schema/jms"
 8     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 9     xsi:schemaLocation="http://www.springframework.org/schema/beans 
10     http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
11     http://www.springframework.org/schema/context 
12     http://www.springframework.org/schema/context/spring-context-4.0.xsd
13     http://www.springframework.org/schema/aop 
14     http://www.springframework.org/schema/aop/spring-aop-4.0.xsd 
15     http://www.springframework.org/schema/tx 
16     http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
17     http://www.springframework.org/schema/jms 
18     http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
19     http://www.springframework.org/schema/util 
20     http://www.springframework.org/schema/util/spring-util-4.0.xsd">
21 
22 
23     <!-- 1、真正能够产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
24     <bean id="targetConnectionFactory"
25         class="org.apache.activemq.ActiveMQConnectionFactory">
26         <property name="brokerURL"
27             value="tcp://192.168.110.142:61616" />
28     </bean>
29 
30     <!-- 2、Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
31     <bean id="connectionFactory"
32         class="org.springframework.jms.connection.SingleConnectionFactory">
33         <!-- 目标ConnectionFactory对应真实的能够产生JMS Connection的ConnectionFactory -->
34         <!-- 给属性targetConnectionFactory传值 -->
35         <property name="targetConnectionFactory"
36             ref="targetConnectionFactory" />
37     </bean>
38 
39     <!-- 3、开始配置生产者配置 -->
40     <!-- 配置生产者 -->
41     <!-- Spring提供的JMS工具类,它能够进行消息发送、接收等 -->
42     <bean id="jmsTemplate"
43         class="org.springframework.jms.core.JmsTemplate">
44         <!-- 这个connectionFactory对应的是咱们定义的Spring提供的那个ConnectionFactory对象 -->
45         <!-- 给属性connectionFactory传值 -->
46         <property name="connectionFactory" ref="connectionFactory" />
47     </bean>
48 
49     <!-- 4、配置消息的Destination对象 -->
50     <!-- 点对点模式 -->
51     <!-- 这个是队列目的地,点对点的。 -->
52     <bean id="queueDestination"
53         class="org.apache.activemq.command.ActiveMQQueue">
54         <constructor-arg>
55             <!-- 给ActiveMQQueue构造参数传递一个值为queue -->
56             <value>queue</value>
57         </constructor-arg>
58     </bean>
59 
60     <!-- 发布订阅模式 -->
61     <!-- 这个是主题目的地,一对多的。 -->
62     <bean id="topicDestination"
63         class="org.apache.activemq.command.ActiveMQTopic">
64         <!-- 给ActiveMQTopic构造参数传递一个值为topic -->
65         <constructor-arg value="topic" />
66     </bean>
67 
68 </beans>

生产者测试代码以下所示:

能够根据以前的消费者测试一下,消息的消费。

 1 package com.taotao.activemq;  2 
 3 import javax.jms.Destination;  4 import javax.jms.JMSException;  5 import javax.jms.Message;  6 import javax.jms.Session;  7 import javax.jms.TextMessage;  8 
 9 import org.junit.Test; 10 import org.springframework.context.ApplicationContext; 11 import org.springframework.context.support.ClassPathXmlApplicationContext; 12 import org.springframework.jms.core.JmsTemplate; 13 import org.springframework.jms.core.MessageCreator; 14 
15 /** 16  * 17  * @ClassName: SpringActiveMQ.java 18  * @author: biehl 19  * @since: 2019年9月19日 下午7:01:43 20  * @Copyright: ©2019 biehl 版权全部 21  * @version: 0.0.1 22  * @Description: 23  */
24 public class SpringActiveMQ { 25 
26     // 使用spring与activemq整合,是哟个jmsTemplate发送消息
27  @Test 28     public void jmsTemplateProducer() { 29         // 一、初始化spring容器
30         ApplicationContext applicationContext = new ClassPathXmlApplicationContext( 31                 "classpath:/spring/applicationContext-activemq.xml"); 32         // 二、从容器中得到jmsTemplate对象。根据类型获取到bean的对象
33         JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class); 34         // 三、从容器中得到Destination对象。根据名称获取到bean的对象
35         Destination destination = (Destination) applicationContext.getBean("queueDestination"); 36 
37         // 四、发送消息
38         jmsTemplate.send(destination, new MessageCreator() { 39 
40  @Override 41             public Message createMessage(Session session) throws JMSException { 42                 // 定义一个消息
43                 String message = "hello activeMq......"; 44                 // 发送消息
45                 TextMessage textMessage = session.createTextMessage(message); 46                 return textMessage; 47  } 48  }); 49  } 50 
51 }

效果以下所示:

 开始配置消费者的spring配置。

  1)、注意:那么消费者是经过Spring为咱们封装的消息监听容器MessageListenerContainer实现的,它负责接收信息,并把接收到的信息分发给真正的MessageListener进行处理。每一个消费者对应每一个目的地都须要有对应的MessageListenerContainer。
  2)、对于消息监听容器而言,除了要知道监听哪一个目的地以外,还须要知道到哪里去监听,也就是说它还须要知道去监听哪一个JMS服务器,这是经过在配置MessageConnectionFactory的时候往里面注入一个ConnectionFactory来实现的。
  3)、因此在配置一个MessageListenerContainer的时候有三个属性必须指定:
    a、一个是表示从哪里监听的ConnectionFactory
    b、一个是表示监听什么的Destination;
    c、一个是接收到消息之后进行消息处理的MessageListener。
  4)、经常使用的MessageListenerContainer实现类是DefaultMessageListenerContainer。

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3     xmlns:context="http://www.springframework.org/schema/context"
 4     xmlns:p="http://www.springframework.org/schema/p"
 5     xmlns:aop="http://www.springframework.org/schema/aop"
 6     xmlns:tx="http://www.springframework.org/schema/tx"
 7     xmlns:jms="http://www.springframework.org/schema/jms"
 8     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 9     xsi:schemaLocation="http://www.springframework.org/schema/beans 
10     http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
11     http://www.springframework.org/schema/context 
12     http://www.springframework.org/schema/context/spring-context-4.0.xsd
13     http://www.springframework.org/schema/aop 
14     http://www.springframework.org/schema/aop/spring-aop-4.0.xsd 
15     http://www.springframework.org/schema/tx 
16     http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
17     http://www.springframework.org/schema/jms 
18     http://www.springframework.org/schema/jms/spring-jms-4.0.xsd
19     http://www.springframework.org/schema/util 
20     http://www.springframework.org/schema/util/spring-util-4.0.xsd">
21 
22 
23     <!-- 1、真正能够产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
24     <bean id="targetConnectionFactory"
25         class="org.apache.activemq.ActiveMQConnectionFactory">
26         <property name="brokerURL"
27             value="tcp://192.168.110.142:61616" />
28     </bean>
29 
30     <!-- 2、Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
31     <bean id="connectionFactory"
32         class="org.springframework.jms.connection.SingleConnectionFactory">
33         <!-- 目标ConnectionFactory对应真实的能够产生JMS Connection的ConnectionFactory -->
34         <!-- 给属性targetConnectionFactory传值 -->
35         <property name="targetConnectionFactory"
36             ref="targetConnectionFactory" />
37     </bean>
38 
39     <!-- 3、配置消息的Destination对象。接受消息的目的地。 -->
40     <!-- 点对点模式 -->
41     <!-- 这个是队列目的地,点对点的。 -->
42     <bean id="queueDestination"
43         class="org.apache.activemq.command.ActiveMQQueue">
44         <constructor-arg>
45             <!-- 给ActiveMQQueue构造参数传递一个值为queue -->
46             <value>queue</value>
47         </constructor-arg>
48     </bean>
49 
50     <!-- 发布订阅模式 -->
51     <!-- 这个是主题目的地,一对多的。 -->
52     <bean id="topicDestination"
53         class="org.apache.activemq.command.ActiveMQTopic">
54         <!-- 给ActiveMQTopic构造参数传递一个值为topic -->
55         <constructor-arg value="topic" />
56     </bean>
57 
58     <!-- 4、配置消息接收者 -->
59     <!-- 配置一个监听器 -->
60     <bean id="activeMqMessageListener"
61         class="com.taotao.search.listener.ActiveMqMessageListener" />
62 
63     <!-- 配置监听容器 -->
64     <bean 65         class="org.springframework.jms.listener.DefaultMessageListenerContainer">
66         <!-- 属性设置 -->
67         <!-- 一个是表示从哪里监听的ConnectionFactory -->
68         <property name="connectionFactory" ref="connectionFactory" />
69         <!-- 一个是表示监听什么的Destination -->
70         <property name="destination" ref="queueDestination" />
71         <!-- 一个是接收到消息之后进行消息处理的MessageListener -->
72         <property name="messageListener" ref="activeMqMessageListener" />
73     </bean>
74 
75 
76 </beans>

而后能够写消息监听器,用来监听生产者生产的消息,以便实现本身的业务逻辑。

 1 package com.taotao.search.listener;  2 
 3 import java.text.SimpleDateFormat;  4 import java.util.Date;  5 
 6 import javax.jms.JMSException;  7 import javax.jms.Message;  8 import javax.jms.MessageListener;  9 import javax.jms.TextMessage; 10 
11 /** 12  * 接受ActiveMQ发送的消息. 13  * 14  * @ClassName: ActiveMqMessageListener.java 15  * @author: biehl 16  * @since: 2019年9月19日 下午7:55:24 17  * @Copyright: ©2019 biehl 版权全部 18  * @version: 0.0.1 19  * @Description: 20  */
21 public class ActiveMqMessageListener implements MessageListener { 22 
23  @Override 24     public void onMessage(Message message) { 25         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss"); 26         System.out.println("监听生产者生产的消息,消费者进行消息消费......."); 27         // 消息到了onMessage就接受到了消息
28         if (message instanceof TextMessage) { 29             TextMessage textMessage = (TextMessage) message; 30             try { 31                 String text = textMessage.getText(); 32                 System.out.println(sdf.format(new Date()) + " : " + text); 33             } catch (JMSException e) { 34  e.printStackTrace(); 35  } 36  } 37  } 38 
39 }

因为这里只是简单的测试,若是是正式项目的话,直接加载这个配置文件,而后就能够进行消息的监听消费,我这里只是加载一下这个配置文件便可。

 1 package com.taotao.search.service;  2 
 3 import java.io.IOException;  4 
 5 import org.springframework.context.ApplicationContext;  6 import org.springframework.context.support.ClassPathXmlApplicationContext;  7 
 8 /**  9  * 10  * @ClassName: ActiveMqConsumer.java 11  * @author: biehl 12  * @since: 2019年9月19日 下午8:10:55 13  * @Copyright: ©2019 biehl 版权全部 14  * @version: 0.0.1 15  * @Description: 16  */
17 public class ActiveMqConsumer { 18 
19     // 启动spring容器。就能够实现监听生产者发送消息,消费者消费小的目的地。
20     public static void main(String[] args) { 21         // 初始化spring容器
22         ApplicationContext applicationContext = new ClassPathXmlApplicationContext( 23                 "classpath:/spring/applicationContext-activemq.xml"); 24         System.out.println("spring容器加载完毕,开始监听生产者生产的消息......."); 25         try { 26             System.in.read(); 27         } catch (IOException e) { 28  e.printStackTrace(); 29  } 30  } 31 }

实现效果以下所示:

控制台打印以下所示,只要你生产消息,这里就能够进行消息的消费。

 

 

待续......

原文出处:https://www.cnblogs.com/biehongli/p/11522793.html