ActiveMQ的使用简单介绍

什么是ActiveMQ?

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

主要特点:

1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP

2. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)

3. 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性

4. 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上

5. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA

6. 支持通过JDBC和journal提供高速的消息持久化

7. 从设计上保证了高性能的集群,客户端-服务器,点对点

8. 支持Ajax

9. 支持与Axis的整合

10. 可以很容易得调用内嵌JMS provider,进行测试

ActiveMQ的消息形式

对于消息的传递有两种类型:

一种是点对点的,称为队列Queue,即一个生产者和一个消费者一一对应;在发送生产者发送到服务器之后,如果没消费者接收,数据会保存在服务器。

另一种是发布/订阅模式,称为广播Topic,即一个生产者产生消息并进行发送后,如果有多个消费者,所有消费者都能收到信息,如果没消费者接收,数据不会保存在服务器(直接丢失),但可以通过设置持久化来实现保存在服务器。可以由多个消费者进行接收。

 

JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。

  · StreamMessage -- Java原始值的数据流

  · MapMessage--一套名称-值对

  · TextMessage--一个字符串对象

  · ObjectMessage--一个序列化的 Java对象

  · BytesMessage--一个字节的数据流

 

ActiveMQ的安装

直接进入进入http://activemq.apache.org/    --Download--下载ActiveMQ。。。tar.gz文件

安装环境:

  1. 安装在Linux系统。
  2. 需要jdk。

 

安装步骤

第一步: 把ActiveMQ 的压缩包上传到Linux系统。

第二步:解压缩。

第三步:启动。

使用bin目录下的activemq命令启动:

[[email protected] bin]# ./activemq start

关闭:

[[email protected] bin]# ./activemq stop

查看状态:

[[email protected] bin]# ./activemq status

注意:如果ActiveMQ整合spring使用不要使用activemq-all-5.12.0.jar包。建议使用5.11.2

进入管理后台:

http://虚拟机ip地址:8161 (注意,8161是默认的访问端口)

用户名:admin

密码:admin

 

解决405问题(部分人的虚拟机有这样的问题):

修改hosts文件,配置机器名和127.0.0.1的映射关系

机器名:/etx/sysconfig/network文件中定义了机器名。

Host文件的配置

重新启动Acticemq服务。

 

 

测试ActiveMQ:

Queue Producer

生产者:生产消息,发送端。

把jar包添加到工程中。使用5.11.2版本的jar包。

 

@Test

      public void testQueueProducer() throws Exception {

           // 第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。

           //brokerURL服务器的ip及端口号

           ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");

           // 第二步:使用ConnectionFactory对象创建一个Connection对象。

           Connection connection = connectionFactory.createConnection();

           // 第三步:开启连接,调用Connection对象的start方法。

           connection.start();

           // 第四步:使用Connection对象创建一个Session对象。

           //第一个参数:是否开启事务。true:开启事务,第二个参数忽略。

           //第二个参数:当第一个参数为false时,才有意义。消息的应答模式。1、自动应答2、手动应答。一般是自动应答。

           Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

           // 第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。

           //参数:队列的名称。

           Queue queue = session.createQueue("test-queue");

           // 第六步:使用Session对象创建一个Producer对象。

           MessageProducer producer = session.createProducer(queue);

           // 第七步:创建一个Message对象,创建一个TextMessage对象。

           /*TextMessage message = new ActiveMQTextMessage();

           message.setText("hello activeMq,this is my first test.");*/

           TextMessage textMessage = session.createTextMessage("hello activeMq,this is my queue test.");

           // 第八步:使用Producer对象发送消息。

           producer.send(textMessage);

           // 第九步:关闭资源。

           producer.close();

           session.close();

           connection.close();

      }

 

Queue Consumer

 

@Test

      public void testQueueConsumer() throws Exception {

           // 第一步:创建一个ConnectionFactory对象。

           ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");

           // 第二步:从ConnectionFactory对象中获得一个Connection对象。

           Connection connection = connectionFactory.createConnection();

           // 第三步:开启连接。调用Connection对象的start方法。

           connection.start();

           // 第四步:使用Connection对象创建一个Session对象。

           Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

           // 第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。

           Queue queue = session.createQueue("test-queue");

           // 第六步:使用Session对象创建一个Consumer对象。

           MessageConsumer consumer = session.createConsumer(queue);

           // 第七步:接收消息。

           consumer.setMessageListener(new MessageListener() {

                

                 @Override

                 public void onMessage(Message message) {

                      try {

                            TextMessage textMessage = (TextMessage) message;

                            String text = null;

                            //取消息的内容

                            text = textMessage.getText();

                            // 第八步:打印消息。

                            System.out.println(text);

                      } catch (JMSException e) {

                            e.printStackTrace();

                      }

                 }

           });

           //等待键盘输入

           System.in.read();

           // 第九步:关闭资源

           consumer.close();

           session.close();

           connection.close();

      }

 

Topic  Producer

 

@Test

      public void testTopicProducer() throws Exception {

           // 第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。

           // brokerURL服务器的ip及端口号

           ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");

           // 第二步:使用ConnectionFactory对象创建一个Connection对象。

           Connection connection = connectionFactory.createConnection();

           // 第三步:开启连接,调用Connection对象的start方法。

           connection.start();

           // 第四步:使用Connection对象创建一个Session对象。

           // 第一个参数:是否开启事务。true:开启事务,第二个参数忽略。

           // 第二个参数:当第一个参数为false时,才有意义。消息的应答模式。1、自动应答2、手动应答。一般是自动应答。

           Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

           // 第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个topic对象。

           // 参数:话题的名称。

           Topic topic = session.createTopic("test-topic");

           // 第六步:使用Session对象创建一个Producer对象。

           MessageProducer producer = session.createProducer(topic);

           // 第七步:创建一个Message对象,创建一个TextMessage对象。

           /*

            * TextMessage message = new ActiveMQTextMessage(); message.setText(

            * "hello activeMq,this is my first test.");

            */

           TextMessage textMessage = session.createTextMessage("hello activeMq,this is my topic test");

           // 第八步:使用Producer对象发送消息。

           producer.send(textMessage);

           // 第九步:关闭资源。

           producer.close();

           session.close();

           connection.close();

      }

 

Topic Consumer

@Test

      public void testTopicConsumer() throws Exception {

           // 第一步:创建一个ConnectionFactory对象。

           ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");

           // 第二步:从ConnectionFactory对象中获得一个Connection对象。

           Connection connection = connectionFactory.createConnection();

           // 第三步:开启连接。调用Connection对象的start方法。

           connection.start();

           // 第四步:使用Connection对象创建一个Session对象。

           Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

           // 第五步:使用Session对象创建一个Destination对象。和发送端保持一致topic,并且话题的名称一致。

           Topic topic = session.createTopic("test-topic");

           // 第六步:使用Session对象创建一个Consumer对象。

           MessageConsumer consumer = session.createConsumer(topic);

           // 第七步:接收消息。

           consumer.setMessageListener(new MessageListener() {

 

                 @Override

                 public void onMessage(Message message) {

                      try {

                            TextMessage textMessage = (TextMessage) message;

                            String text = null;

                            // 取消息的内容

                            text = textMessage.getText();

                            // 第八步:打印消息。

                            System.out.println(text);

                      } catch (JMSException e) {

                            e.printStackTrace();

                      }

                 }

           });

           System.out.println("topic的消费端03。。。。。");

           // 等待键盘输入

           System.in.read();

           // 第九步:关闭资源

           consumer.close();

           session.close();

           connection.close();

      }