消息队列之 ActiveMQ

简介

ActiveMQ 特色

ActiveMQ 是由 Apache 出品的一款开源消息中间件,旨在为应用程序提供高效、可扩展、稳定、安全的企业级消息通讯。 它的设计目标是提供标准的、面向消息的、多语言的应用集成消息通讯中间件。ActiveMQ 实现了 JMS 1.1 并提供了不少附加的特性,好比 JMX 管理、主从管理、消息组通讯、消息优先级、延迟接收消息、虚拟接收者、消息持久化、消息队列监控等等。其主要特性有:html

  1. 支持包括 Java、C、C++、C#、Ruby、Perl、Python、PHP 等多种语言的客户端和协议。协议包含 OpenWire、Stomp、AMQP、MQTT 。
  2. 提供了像消息组通讯、消息优先级、延迟接收消息、虚拟接收者、消息持久化之类的高级特性
  3. 彻底支持 JMS 1.1 和 J2EE 1.4规范(包括持久化、分布式事务消息、事务)
  4. 对 Spring 框架的支持,ActiveMQ 能够经过 Spring 的配置文件方式很容易嵌入到 Spring 应用中
  5. 经过了常见的 J2EE 服务器测试,好比 TomEE、Geronimo、JBoss、GlassFish、WebLogic
  6. 链接方式的多样化,ActiveMQ 提供了多种链接模式,例如 in-VM、TCP、SSL、NIO、UDP、多播、JGroups、JXTA
  7. 支持经过使用 JDBC 和 journal 实现消息的快速持久化
  8. 为高性能集群、客户端-服务器、点对点通讯等场景而设计
  9. 提供了技术和语言中立的 REST API 接口
  10. 支持 Ajax 方式调用 ActiveMQ
  11. ActiveMQ 能够轻松地与 CXF、Axis 等 Web Service 技术整合,以提供可靠的消息传递
  12. 可用做为内存中的 JMS 提供者,很是适合 JMS 单元测试

基本概念

由于 ActiveMQ 是完整支持 JMS 1.1 的,因此从 Java 使用者的角度其基本概念与 JMS 1.1 规范是一致的。java

消息传送模型
  1. 点对点模型(Point to Point) 使用队列(Queue)做为消息通讯载体,知足生产者与消费者模式,一条消息只能被一个消费者使用,未被消费的消息在队列中保留直到被消费或超时。程序员

  2. 发布订阅模型(Pub/Sub) 使用主题做为消息通讯载体,相似于广播模式,发布者发布一条消息,该消息经过主题传递给全部的订阅者,在一条消息广播以后才订阅的用户则是收不到该条消息的。web

基本组件

ActiveMQ 使用时包含的基本组件各与 JMS 是相同的:spring

  1. Broker,消息代理,表示消息队列服务器实体,接受客户端链接,提供消息通讯的核心服务。
  2. Producer,消息生产者,业务的发起方,负责生产消息并传输给 Broker 。
  3. Consumer,消息消费者,业务的处理方,负责从 Broker 获取消息并进行业务逻辑处理。
  4. Topic,主题,发布订阅模式下的消息统一聚集地,不一样生产者向 Topic 发送消息,由 Broker 分发到不一样的订阅者,实现消息的广播。
  5. Queue,队列,点对点模式下特定生产者向特定队列发送消息,消费者订阅特定队列接收消息并进行业务逻辑处理。
  6. Message,消息体,根据不一样通讯协议定义的固定格式进行编码的数据包,来封装业务 数据,实现消息的传输。

因为这些概念在 JMS 中已介绍过,这里再也不详细介绍。数据库

链接器

ActiveMQ Broker 的主要做用是为客户端应用提供一种通讯机制,为此 ActiveMQ 提供了一种链接机制,并用链接器(connector)来描述这种链接机制。ActiveMQ 中链接器有两种,一种是用于客户端与消息代理服务器(client-to-broker)之间通讯的传输链接器(transport connector),一种是用于消息代理服务器之间(broker-to-broker)通讯的网络链接器(network connector)。connector 使用 URI(统一资源定位符)来表示,URI 格式为: <schema name>:<hierarchical part>[?<query>][#<fragment>] schema name 表示协议, 例如:foo://username:password@example.com:8042/over/there/index.dtb?type=animal&name=narwhal#noseapache

其中 schema name 部分是 foo,hierarchical part 是 username:password@example.com:8042/over/there/index.dtb,query 是 type=animal&name=narwhal,fragment 是 nose。缓存

  1. 传输链接器 为了交换消息,消息生产者和消息消费者(统称为客户端)都须要链接到消息代理服务器,这种客户端和消息代理服务器之间的通讯就是经过传输链接器(Transport connectors)完成的。不少状况下用户链接消息代理时的需求侧重点不一样,有的更关注性能,有的更注重安全性,所以 ActiveMQ 提供了一系列l链接协议供选择,来覆盖这些使用场景。从消息代理的角度看,传输链接器就是用来处理和监听客户端链接的,查看 ActiveMQ demo 的配置文件(/examples/conf/activemq-demo.xml),传输链接的相关配置以下:
<transportConnectors>
            <transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/>
            <transportConnector name="ssl" uri="ssl://localhost:61617"/>
            <transportConnector name="stomp" uri="stomp://localhost:61613"/>
            <transportConnector name="ws" uri="ws://localhost:61614/" />
        </transportConnectors>
复制代码

传输链接器定义在<transportConnectors>元素中,一个<transportConnector>元素定义一个特定的链接器,一个链接器必须有本身惟一的名字和 URI 属性,但discoveryUri属性是可选的。目前在 ActiveMQ 最新的5.15版本中经常使用的传输链接器链接协议有:vm、tcp、udp、multicast、nio、ssl、http、https、websocket、amqp、mqtt、stomp 等等安全

  • vm,容许客户端和消息服务器直接在 VM 内部通讯,采用的链接不是 Socket 链接,而是直接的虚拟机本地方法调用,从而避免网络传输的开销。应用场景仅限于服务器和客户端在同一 JVM 中。
  • tcp,客户端经过 TCP 链接到远程的消息服务器。
  • udp,客户端经过 UDP 链接到远程的消息服务器。
  • multicast,容许使用组播传输的方式链接到消息服务器。
  • nio,nio 和 tcp 的做用是同样的,只不过 nio 使用了 java 的 NIO包,这可能在某些场景下可提供更好的性能。
  • ssl,ssl 容许用户在 TCP 的基础上使用 SSL 。
  • http 和 https,容许客户端使用 REST 或 Ajax 的方式进行链接,这意味着能够直接使用 Javascript 向 ActiveMQ 发送消息。
  • websocket,容许客户端经过 HTML5 中的 WebSocket 方式链接到消息服务器。
  • amqp,5.8版本开始支持。
  • mqtt、stomp,5.6版本开始支持。

每一个协议的具体配置见官网(http://activemq.apache.org/uri-protocols.html )。除了以上这些基本协议以外 ActiveMQ 还支持一些高级协议也能够经过 URI 的方式进行配置,好比 Failover 和 Fanout 。bash

  • Failover 是一种从新链接的机制,工做于上面介绍的链接协议的上层,用于创建可靠的传输。其配置语法容许制定任意多个复合的 URI ,它会自动选择其中的一个 URI 来尝试创建链接,若是该链接没有成功,则会继续选择其它的 URI 来尝试。配置语法例如:failover:(tcp://localhost:61616,tcp://remotehost:61616)?initialReconnectDelay=100
  • Fanout 是一种从新链接和复制的机制,它也工做于其它链接的上层,采用复制的方式把消息复制到多个消息服务器。配置语法例如:fanout:(tcp://localhost:61629,tcp://localhost:61639,tcp://localhost:61649)
  1. 网络链接器 不少状况下,咱们要处理的数据多是海量的,这种场景单台服务器很难支撑,这就要用到集群功能,为此 ActiveMQ 提供了网络链接的模式,简单说就是经过把多个消息服务器实例链接在一块儿做为一个总体对外提供服务,从而提升总体对外的消息服务能力。经过这种方式链接在一块儿的服务器实例之间可共享队列和消费者列表,从而达到分布式队列的目的,网络链接器就是用来配置服务器之间的通讯。

使用网络链接器的简单场景
)

如图所示,服务器 S1 和 S2 经过 NewworkConnector 相连,生产者 P1 发送的消息,消费者 C3 和 C4 均可以接收到,而生产者 P3 发送的消息,消费者 C1 和 C2 也能够接收到。要使用网络链接器的功能须要在服务器 S1 的 activemq.xml 中的 broker 节点下添加以下配置(假设192.168.11.23:61617 为 S2 的地址):

<networkConnectors>      
          <networkConnector uri="static:(tcp://192.168.11.23:61617)"/>    
</networkConnectors>
复制代码

若是只是这样,S1 能够将消息发送到 S2,但这只是单方向的通讯,发送到 S2 上的的消息还不能发送到 S1 上。若是想 S1 也收到从 S2 发来的消息须要在 S2 的 activemq.xml 中的 broker 节点下也添加以下配置(假设192.168.11.45:61617为 S1 的地址):

<networkConnectors>      
          <networkConnector uri="static:(tcp://192.168.11.45:61617)"/>    
</networkConnectors>
复制代码

这样,S1和S2就能够双向通讯了。目前在 ActiveMQ 最新的5.15版本中经常使用的网络链接器协议有 static 和 multicast 两种。

  • static,静态协议,用于为一个网络中多个代理建立静态配置,这种配置协议支持复合的 URI (即包含其余 URI 的 URI)。例如static://(tcp://ip:61616,tcp://ip2:61616)
  • multicast,多点传送协议,消息服务器会广播本身的服务,也会定位其余代理。这种方式用于服务器之间实现动态识别,而不是配置静态的 IP 组。

对这块感兴趣的话能够看官方文档:http://activemq.apache.org/networks-of-brokers.html

消息存储

JMS 规范中消息的分发方式有两种:非持久化和持久化。对于非持久化消息 JMS 实现者须保证尽最大努力分发消息,但消息不会持久化存储;而持久化方式分发的消息则必须进行持久化存储。非持久化消息经常使用于发送通知或实时数据,当你比较看重系统性能而且即便丢失一些消息并不影响业务正常运做时可选择非持久化消息。持久化消息被发送到消息服务器后若是当前消息的消费者并无运行则该消息继续存在,只有等到消息被处理并被消息消费者确认以后,消息才会从消息服务器中删除。

对以上这两种方式 ActiveMQ 都支持,而且还支持经过缓存在内存中的中间状态消息的方式来恢复消息。归纳起来看 ActiveMQ 的消息存储有三种:存储到内存、存储到文件、存储到数据库。具体使用上 ActiveMQ 提供了一个插件式的消息存储机制,相似于消息的多点传播,主要实现了以下几种:

  • AMQ,是 ActiveMQ 5.0及之前版本默认的消息存储方式,它是一个基于文件的、支持事务的消息存储解决方案。 在此方案下消息自己以日志的形式实现持久化,存放在 Data Log 里。而且还对日志里的消息作了引用索引,方便快速取回消息。
  • KahaDB,也是一种基于文件并具备支持事务的消息存储方式,从5.3开始推荐使用 KahaDB 存储消息,它提供了比 AMQ 消息存储更好的可扩展性和可恢复性。
  • JDBC,基于 JDBC 方式将消息存储在数据库中,将消息存到数据库相对来讲比较慢,因此 ActiveMQ 建议结合 journal 来存储,它使用了快速的缓存写入技术,大大提升了性能。
  • 内存存储,是指将全部要持久化的消息放到内存中,由于这里没有动态的缓存,因此须要注意设置消息服务器的 JVM 和内存大小。
  • LevelDB,5.6版本以后推出了 LevelDB 的持久化引擎,它使用了自定义的索引代替经常使用的 BTree 索引,其持久化性能高于 KahaDB,虽然默认的持久化方式仍是 KahaDB,可是 LevelDB 将是趋势。在5.9版本还提供了基于 LevelDB 和 Zookeeper 的数据复制方式,做为 Master-Slave 方式的首选数据复制方案。

工程实例

Java 访问 ActiveMQ 实例

JMS 规范中传递消息的方式有两种,一种是点对点模型的队列(Queue)方式,另外一种是发布订阅模型的主题(Topic)方式。下面看下用 ActiveMQ 以主题方式传递消息的 Java 示例。

引入依赖

Java 工程中须要引入 ActiveMQ 包的依赖,jar 包版本同你安装 ActiveMQ 版本一致便可:

<dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.15.2</version>
    </dependency>
复制代码
消息生产者
package org.study.mq.activeMQ;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class TopicPublisher {

    /**
     * 默认用户名
     */
    public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    /**
     * 默认密码
     */
    public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    /**
     * 默认链接地址
     */
    public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static void main(String[] args) {
        //建立链接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
        try {
            //建立链接
            Connection connection = connectionFactory.createConnection();
            //开启链接
            connection.start();
            //建立会话,不须要事务
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //建立 Topic,用做消费者订阅消息
            Topic myTestTopic = session.createTopic("activemq-topic-test1");
            //消息生产者
            MessageProducer producer = session.createProducer(myTestTopic);

            for (int i = 1; i <= 3; i++) {
                TextMessage message = session.createTextMessage("发送消息 " + i);
                producer.send(myTestTopic, message);
            }

            //关闭资源
            session.close();
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
复制代码

在 Topic 模式中消息生产者是用于发布消息的,绝大部分代码与 Queue 模式中类似,不一样的是本例中基于 Session 建立的是主题(Topic),该主题做为消费者消费消息的目的地。

消息消费者
package org.study.mq.activeMQ;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class TopicSubscriber {

    /**
     * 默认用户名
     */
    public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    /**
     * 默认密码
     */
    public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    /**
     * 默认链接地址
     */
    public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    public static void main(String[] args) {
        //建立链接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
        try {
            //建立链接
            Connection connection = connectionFactory.createConnection();
            //开启链接
            connection.start();
            //建立会话,不须要事务
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //建立 Topic
            Topic myTestTopic = session.createTopic("activemq-topic-test1");

            MessageConsumer messageConsumer = session.createConsumer(myTestTopic);
            messageConsumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        System.out.println("消费者1 接收到消息:" + ((TextMessage) message).getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });

            MessageConsumer messageConsumer2 = session.createConsumer(myTestTopic);
            messageConsumer2.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        System.out.println("消费者2 接收到消息:" + ((TextMessage) message).getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });

            MessageConsumer messageConsumer3 = session.createConsumer(myTestTopic);
            messageConsumer3.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        System.out.println("消费者3 接收到消息:" + ((TextMessage) message).getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });

            //让主线程休眠100秒,使消息消费者对象能继续存活一段时间从而能监听到消息
            Thread.sleep(100 * 1000);
            //关闭资源
            session.close();
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
复制代码

为了展现主题模式中消息广播给多个订阅者的功能,这里建立了三个消费者对象并订阅了同一个主题,比较特殊的是最后让主线程休眠了一段时间,这么作的目的是让消费者对象能继续存活,从而使控制台能打印出监听到的消息内容。

启动 ActiveMQ 服务器

在 ActiveMQ 的 bin 目录下直接执行activemq start即启动了 ActiveMQ

运行 TopicSubscriber

须要先运行 TopicSubscriber 类的 main 方法,这样发布者发布消息的时候订阅者才能接收到消息,若是将执行顺序倒过来则消息先发布出去但没有任何订阅者在运行,则看不到消息被消费了。

运行 TopicPublisher

接着运行 TopicPublisher 类的 main 方法,向主题中发布3条消息,而后能够在 TopicSubscriber 后台看到接收到的消息内容:

消费者接收到消息

Spring 整合 ActiveMQ

在实际项目中若是使用原生的 ActiveMQ API 开发显然比较啰嗦,这中间建立链接工厂、建立链接之类代码彻底能够抽取出来由框架统一作,这些事情 Spring 也想到了并帮咱们作了。ActiveMQ 彻底支持基于 Spring 的方式 配置 JMS 客户端和服务器,下面的例子展现一下在 Spring 中如何使用队列模式和主题模式传递消息。

引入依赖
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.2</version>
</dependency>

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>4.3.10.RELEASE</version>
</dependency>

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
    <version>5.15.0</version>
</dependency>
复制代码

工程中除了 activemq 的包以外还要添加 Spring 支持 JMS 的包。因为 connection、session、producer 的建立会消耗大量系统资源,为此这里使用 链接池 来复用这些资源,因此还要添加 activemq-pool 的依赖。

Spring 配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">
    <context:component-scan base-package="org.study.mq.activeMQ.spring"/>

    <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL">
                    <value>tcp://localhost:61616</value>
                </property>
            </bean>
        </property>
        <property name="maxConnections" value="100"></property>
    </bean>
    <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="jmsFactory"/>
        <property name="sessionCacheSize" value="1"/>
    </bean>
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="cachingConnectionFactory"/>
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
        </property>
    </bean>

    <bean id="testQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg name="name" value="spring-queue"/>
    </bean>
    <bean id="testTopic" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg index="0" value="spring-topic"/>
    </bean>

    <bean id="queueListener" class="org.study.mq.activeMQ.spring.QueueListener"/>
    <bean id="topic1Listener" class="org.study.mq.activeMQ.spring.Topic1Listener"/>
    <bean id="topic2Listener" class="org.study.mq.activeMQ.spring.Topic2Listener"/>

    <bean id="queueContainer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="cachingConnectionFactory"/>
        <property name="destination" ref="testQueue"/>
        <property name="messageListener" ref="queueListener"/>
    </bean>
    <bean id="topic1Container"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="cachingConnectionFactory"/>
        <property name="destination" ref="testTopic"/>
        <property name="messageListener" ref="topic1Listener"/>
    </bean>
    <bean id="topic2Container"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="cachingConnectionFactory"/>
        <property name="destination" ref="testTopic"/>
        <property name="messageListener" ref="topic2Listener"/>
    </bean>

</beans>
复制代码

下面的项目示例中的 Java 代码采用注解的方式,这也是如今不少程序员的习惯用法,因此在配置文件一开始定义注解扫描包路径org.study.mq.activeMQ.spring,您能够根据本身实际状况修改包名称,本例中的全部 Java 代码都放在该包之下。

接下来定义了一个 JMS 工厂 bean,采用的是池化链接工厂类org.apache.activemq.pool.PooledConnectionFactory,实际就是对内部的 ActiveMQ 链接工厂增长了链接池的功能,从其内部配置能够看到就是对org.apache.activemq.ActiveMQConnectionFactory的功能封装,而ActiveMQConnectionFactory类则比较熟悉了,就是上面 Java 访问 ActiveMQ 示例一开始建立链接工厂时使用的类。brokerURL 属性配置的就是链接服务器的协议和服务器地址。接下来的 cachingConnectionFactory 是实际项目代码中经常使用的,对链接工厂的又一层加强,使用链接的缓存功能以提高效率,读者可酌情选择使用。

jmsTemplate 就是 Spring 解决 JMS 访问时冗长重复代码的方案,它须要配置的两个主要属性是 connectionFactory 和 messageConverter,经过 connectionFactory 获取链接、会话等对象, messageConverter 则是配置消息转换器,由于一般消息在发送前和接收后都须要进行一个前置和后置处理,转换器便进行这个工做。这样实际代码直接经过 jmsTemplate 来发送和接收消息,而每次发送接收消息时建立链接工厂、建立链接、建立会话等工做都由 Spring 框架作了。

有了 JMS 模板还须要知道队列和主题做为实际发送和接收消息的目的地,因此接下来定义了 testQueue 和 testTopic 做为两种模式的示例。而异步接收消息时则须要提供 MessageListener 的实现类,因此定义了 queueListener 做为队列模式下异步接收消息的监听器,topic1Listener 和 topic2Listener 做为主题模式下异步接收消息的监听器,主题模式用两个监听器是为了演示多个消费者时都能收到消息。最后的 queueContainer、topic1Container、topic2Container 用于将消息监听器绑定到具体的消息目的地上。

消息服务类

下面是使用 JMS 模板处理消息的消息服务类

package org.study.mq.activeMQ.spring;

import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import javax.jms.*;

@Service
public class MessageService {

    @Resource(name = "jmsTemplate")
    private JmsTemplate jmsTemplate;

    @Resource(name = "testQueue")
    private Destination testQueue;

    @Resource(name = "testTopic")
    private Destination testTopic;

    //向队列发送消息
    public void sendQueueMessage(String messageContent) {
        jmsTemplate.send(testQueue, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage msg = session.createTextMessage();
                // 设置消息内容
                msg.setText(messageContent);
                return msg;
            }
        });

    }

    //向主题发送消息
    public void sendTopicMessage(String messageContent) {
        jmsTemplate.send(testTopic, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage msg = session.createTextMessage();
                // 设置消息内容
                msg.setText(messageContent);
                return msg;
            }
        });

    }
}
复制代码

@Service 将该类声明为一个服务,实际项目中不少服务代码也相似。经过 Resource 注解直接将上面配置文件中定义的 jmsTemplate 引入到 MessageService 类中就能够直接使用了,testQueue 和 testTopic 也是相似,服务类中直接引入配置文件中定义好的队列和主题。重点是下面的两个发送消息的方法,sendQueueMessage 向队列发送消息,sendTopicMessage 向主题发送消息,两种模式都使用了 jmsTemplate 的 send 方法,send 方法第1个参数是javax.jms.Destination类型,表示消息目的地。因为javax.jms.Queuejavax.jms.Topic都继承了javax.jms.Destination接口,因此该方法对队列模式和主题模式都适用。send 方法的第2个参数是org.springframework.jms.core.MessageCreator,这里使用了匿名内部类的方式建立对象,从支持的 Session 对象中建立文本消息,这样就能够发送消息了。能够看到不管是队列仍是主题,经过 Spring 框架来发送消息的代码比以前的 Java 代码示例简洁了不少。

消息监听器类
package org.study.mq.activeMQ.spring;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class QueueListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            try {
                TextMessage txtMsg = (TextMessage) message;
                String messageStr = txtMsg.getText();
                System.out.println("队列监听器接收到文本消息:" + messageStr);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        } else {
            throw new IllegalArgumentException("只支持 TextMessage 类型消息!");
        }
    }
}
复制代码

队列消息监听器在收到消息时校验是不是文本消息类型,是的话则打印出内容。

package org.study.mq.activeMQ.spring;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class Topic1Listener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            try {
                TextMessage txtMsg = (TextMessage) message;
                String messageStr = txtMsg.getText();
                System.out.println("主题监听器1 接收到文本消息:" + messageStr);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        } else {
            throw new IllegalArgumentException("只支持 TextMessage 类型消息!");
        }
    }
}
复制代码
package org.study.mq.activeMQ.spring;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class Topic2Listener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            try {
                TextMessage txtMsg = (TextMessage) message;
                String messageStr = txtMsg.getText();
                System.out.println("主题监听器2 接收到文本消息:" + messageStr);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        } else {
            throw new IllegalArgumentException("只支持 TextMessage 类型消息!");
        }
    }
}
复制代码

主题监听器的代码与队列监听器相似,只是打印时经过不一样字符串表示当前是不一样监听器接收的消息。

启动应用

为了演示例子,写了一个 StartApplication 类,在 main 方法中加载 Spring ,获取到 MessageService 服务以后调用 sendQueueMessage 和 sendTopicMessage 方法发送消息。

package org.study.mq.activeMQ.spring;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class StartApplication {
    public static void main(String[] args) {
        ApplicationContext ctx = new ClassPathXmlApplicationContext("spring-context.xml");
        MessageService messageService = (MessageService) ctx.getBean("messageService");

        messageService.sendQueueMessage("个人测试消息1");
        messageService.sendTopicMessage("个人测试消息2");
        messageService.sendTopicMessage("个人测试消息3");
    }

}
复制代码

启动好 activeMQ 服务以后运行 StartApplication 类,在控制台看到接收到文本消息:

接收到文本消息

队列监听器监听到了一条消息,两个主题监听器分别监听到了两条消息。

相关文章
相关标签/搜索