ActiveMQ 是由 Apache 出品的一款开源消息中间件,旨在为应用程序提供高效、可扩展、稳定、安全的企业级消息通讯。 它的设计目标是提供标准的、面向消息的、多语言的应用集成消息通讯中间件。ActiveMQ 实现了 JMS 1.1 并提供了不少附加的特性,好比 JMX 管理、主从管理、消息组通讯、消息优先级、延迟接收消息、虚拟接收者、消息持久化、消息队列监控等等。其主要特性有:html
由于 ActiveMQ 是完整支持 JMS 1.1 的,因此从 Java 使用者的角度其基本概念与 JMS 1.1 规范是一致的。java
点对点模型(Point to Point) 使用队列(Queue)做为消息通讯载体,知足生产者与消费者模式,一条消息只能被一个消费者使用,未被消费的消息在队列中保留直到被消费或超时。程序员
发布订阅模型(Pub/Sub) 使用主题做为消息通讯载体,相似于广播模式,发布者发布一条消息,该消息经过主题传递给全部的订阅者,在一条消息广播以后才订阅的用户则是收不到该条消息的。web
ActiveMQ 使用时包含的基本组件各与 JMS 是相同的:spring
因为这些概念在 JMS 中已介绍过,这里再也不详细介绍。数据库
ActiveMQ Broker 的主要做用是为客户端应用提供一种通讯机制,为此 ActiveMQ 提供了一种链接机制,并用链接器(connector)来描述这种链接机制。ActiveMQ 中链接器有两种,一种是用于客户端与消息代理服务器(client-to-broker)之间通讯的传输链接器(transport connector),一种是用于消息代理服务器之间(broker-to-broker)通讯的网络链接器(network connector)。connector 使用 URI(统一资源定位符)来表示,URI 格式为: <schema name>:<hierarchical part>[?<query>][#<fragment>]
schema name 表示协议, 例如:foo://username:password@example.com:8042/over/there/index.dtb?type=animal&name=narwhal#noseapache
其中 schema name 部分是 foo,hierarchical part 是 username:password@example.com:8042/over/there/index.dtb,query 是 type=animal&name=narwhal,fragment 是 nose。缓存
<transportConnectors>
<transportConnector name="openwire" uri="tcp://localhost:61616" discoveryUri="multicast://default"/>
<transportConnector name="ssl" uri="ssl://localhost:61617"/>
<transportConnector name="stomp" uri="stomp://localhost:61613"/>
<transportConnector name="ws" uri="ws://localhost:61614/" />
</transportConnectors>
复制代码
传输链接器定义在<transportConnectors>
元素中,一个<transportConnector>
元素定义一个特定的链接器,一个链接器必须有本身惟一的名字和 URI 属性,但discoveryUri
属性是可选的。目前在 ActiveMQ 最新的5.15版本中经常使用的传输链接器链接协议有:vm、tcp、udp、multicast、nio、ssl、http、https、websocket、amqp、mqtt、stomp 等等安全
每一个协议的具体配置见官网(http://activemq.apache.org/uri-protocols.html )。除了以上这些基本协议以外 ActiveMQ 还支持一些高级协议也能够经过 URI 的方式进行配置,好比 Failover 和 Fanout 。bash
如图所示,服务器 S1 和 S2 经过 NewworkConnector 相连,生产者 P1 发送的消息,消费者 C3 和 C4 均可以接收到,而生产者 P3 发送的消息,消费者 C1 和 C2 也能够接收到。要使用网络链接器的功能须要在服务器 S1 的 activemq.xml 中的 broker 节点下添加以下配置(假设192.168.11.23:61617 为 S2 的地址):
<networkConnectors>
<networkConnector uri="static:(tcp://192.168.11.23:61617)"/>
</networkConnectors>
复制代码
若是只是这样,S1 能够将消息发送到 S2,但这只是单方向的通讯,发送到 S2 上的的消息还不能发送到 S1 上。若是想 S1 也收到从 S2 发来的消息须要在 S2 的 activemq.xml 中的 broker 节点下也添加以下配置(假设192.168.11.45:61617为 S1 的地址):
<networkConnectors>
<networkConnector uri="static:(tcp://192.168.11.45:61617)"/>
</networkConnectors>
复制代码
这样,S1和S2就能够双向通讯了。目前在 ActiveMQ 最新的5.15版本中经常使用的网络链接器协议有 static 和 multicast 两种。
static://(tcp://ip:61616,tcp://ip2:61616)
对这块感兴趣的话能够看官方文档:http://activemq.apache.org/networks-of-brokers.html
JMS 规范中消息的分发方式有两种:非持久化和持久化。对于非持久化消息 JMS 实现者须保证尽最大努力分发消息,但消息不会持久化存储;而持久化方式分发的消息则必须进行持久化存储。非持久化消息经常使用于发送通知或实时数据,当你比较看重系统性能而且即便丢失一些消息并不影响业务正常运做时可选择非持久化消息。持久化消息被发送到消息服务器后若是当前消息的消费者并无运行则该消息继续存在,只有等到消息被处理并被消息消费者确认以后,消息才会从消息服务器中删除。
对以上这两种方式 ActiveMQ 都支持,而且还支持经过缓存在内存中的中间状态消息的方式来恢复消息。归纳起来看 ActiveMQ 的消息存储有三种:存储到内存、存储到文件、存储到数据库。具体使用上 ActiveMQ 提供了一个插件式的消息存储机制,相似于消息的多点传播,主要实现了以下几种:
JMS 规范中传递消息的方式有两种,一种是点对点模型的队列(Queue)方式,另外一种是发布订阅模型的主题(Topic)方式。下面看下用 ActiveMQ 以主题方式传递消息的 Java 示例。
Java 工程中须要引入 ActiveMQ 包的依赖,jar 包版本同你安装 ActiveMQ 版本一致便可:
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.2</version>
</dependency>
复制代码
package org.study.mq.activeMQ;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class TopicPublisher {
/**
* 默认用户名
*/
public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默认密码
*/
public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默认链接地址
*/
public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args) {
//建立链接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
try {
//建立链接
Connection connection = connectionFactory.createConnection();
//开启链接
connection.start();
//建立会话,不须要事务
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//建立 Topic,用做消费者订阅消息
Topic myTestTopic = session.createTopic("activemq-topic-test1");
//消息生产者
MessageProducer producer = session.createProducer(myTestTopic);
for (int i = 1; i <= 3; i++) {
TextMessage message = session.createTextMessage("发送消息 " + i);
producer.send(myTestTopic, message);
}
//关闭资源
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
复制代码
在 Topic 模式中消息生产者是用于发布消息的,绝大部分代码与 Queue 模式中类似,不一样的是本例中基于 Session 建立的是主题(Topic),该主题做为消费者消费消息的目的地。
package org.study.mq.activeMQ;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class TopicSubscriber {
/**
* 默认用户名
*/
public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
/**
* 默认密码
*/
public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* 默认链接地址
*/
public static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args) {
//建立链接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
try {
//建立链接
Connection connection = connectionFactory.createConnection();
//开启链接
connection.start();
//建立会话,不须要事务
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//建立 Topic
Topic myTestTopic = session.createTopic("activemq-topic-test1");
MessageConsumer messageConsumer = session.createConsumer(myTestTopic);
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
System.out.println("消费者1 接收到消息:" + ((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
MessageConsumer messageConsumer2 = session.createConsumer(myTestTopic);
messageConsumer2.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
System.out.println("消费者2 接收到消息:" + ((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
MessageConsumer messageConsumer3 = session.createConsumer(myTestTopic);
messageConsumer3.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
System.out.println("消费者3 接收到消息:" + ((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//让主线程休眠100秒,使消息消费者对象能继续存活一段时间从而能监听到消息
Thread.sleep(100 * 1000);
//关闭资源
session.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
复制代码
为了展现主题模式中消息广播给多个订阅者的功能,这里建立了三个消费者对象并订阅了同一个主题,比较特殊的是最后让主线程休眠了一段时间,这么作的目的是让消费者对象能继续存活,从而使控制台能打印出监听到的消息内容。
在 ActiveMQ 的 bin 目录下直接执行activemq start
即启动了 ActiveMQ
须要先运行 TopicSubscriber 类的 main 方法,这样发布者发布消息的时候订阅者才能接收到消息,若是将执行顺序倒过来则消息先发布出去但没有任何订阅者在运行,则看不到消息被消费了。
接着运行 TopicPublisher 类的 main 方法,向主题中发布3条消息,而后能够在 TopicSubscriber 后台看到接收到的消息内容:
在实际项目中若是使用原生的 ActiveMQ API 开发显然比较啰嗦,这中间建立链接工厂、建立链接之类代码彻底能够抽取出来由框架统一作,这些事情 Spring 也想到了并帮咱们作了。ActiveMQ 彻底支持基于 Spring 的方式 配置 JMS 客户端和服务器,下面的例子展现一下在 Spring 中如何使用队列模式和主题模式传递消息。
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.2</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.3.10.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.0</version>
</dependency>
复制代码
工程中除了 activemq 的包以外还要添加 Spring 支持 JMS 的包。因为 connection、session、producer 的建立会消耗大量系统资源,为此这里使用 链接池 来复用这些资源,因此还要添加 activemq-pool 的依赖。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">
<context:component-scan base-package="org.study.mq.activeMQ.spring"/>
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>tcp://localhost:61616</value>
</property>
</bean>
</property>
<property name="maxConnections" value="100"></property>
</bean>
<bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="jmsFactory"/>
<property name="sessionCacheSize" value="1"/>
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="cachingConnectionFactory"/>
<property name="messageConverter">
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
</property>
</bean>
<bean id="testQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg name="name" value="spring-queue"/>
</bean>
<bean id="testTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0" value="spring-topic"/>
</bean>
<bean id="queueListener" class="org.study.mq.activeMQ.spring.QueueListener"/>
<bean id="topic1Listener" class="org.study.mq.activeMQ.spring.Topic1Listener"/>
<bean id="topic2Listener" class="org.study.mq.activeMQ.spring.Topic2Listener"/>
<bean id="queueContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="cachingConnectionFactory"/>
<property name="destination" ref="testQueue"/>
<property name="messageListener" ref="queueListener"/>
</bean>
<bean id="topic1Container"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="cachingConnectionFactory"/>
<property name="destination" ref="testTopic"/>
<property name="messageListener" ref="topic1Listener"/>
</bean>
<bean id="topic2Container"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="cachingConnectionFactory"/>
<property name="destination" ref="testTopic"/>
<property name="messageListener" ref="topic2Listener"/>
</bean>
</beans>
复制代码
下面的项目示例中的 Java 代码采用注解的方式,这也是如今不少程序员的习惯用法,因此在配置文件一开始定义注解扫描包路径org.study.mq.activeMQ.spring
,您能够根据本身实际状况修改包名称,本例中的全部 Java 代码都放在该包之下。
接下来定义了一个 JMS 工厂 bean,采用的是池化链接工厂类org.apache.activemq.pool.PooledConnectionFactory
,实际就是对内部的 ActiveMQ 链接工厂增长了链接池的功能,从其内部配置能够看到就是对org.apache.activemq.ActiveMQConnectionFactory
的功能封装,而ActiveMQConnectionFactory
类则比较熟悉了,就是上面 Java 访问 ActiveMQ 示例一开始建立链接工厂时使用的类。brokerURL 属性配置的就是链接服务器的协议和服务器地址。接下来的 cachingConnectionFactory 是实际项目代码中经常使用的,对链接工厂的又一层加强,使用链接的缓存功能以提高效率,读者可酌情选择使用。
jmsTemplate 就是 Spring 解决 JMS 访问时冗长重复代码的方案,它须要配置的两个主要属性是 connectionFactory 和 messageConverter,经过 connectionFactory 获取链接、会话等对象, messageConverter 则是配置消息转换器,由于一般消息在发送前和接收后都须要进行一个前置和后置处理,转换器便进行这个工做。这样实际代码直接经过 jmsTemplate 来发送和接收消息,而每次发送接收消息时建立链接工厂、建立链接、建立会话等工做都由 Spring 框架作了。
有了 JMS 模板还须要知道队列和主题做为实际发送和接收消息的目的地,因此接下来定义了 testQueue 和 testTopic 做为两种模式的示例。而异步接收消息时则须要提供 MessageListener 的实现类,因此定义了 queueListener 做为队列模式下异步接收消息的监听器,topic1Listener 和 topic2Listener 做为主题模式下异步接收消息的监听器,主题模式用两个监听器是为了演示多个消费者时都能收到消息。最后的 queueContainer、topic1Container、topic2Container 用于将消息监听器绑定到具体的消息目的地上。
下面是使用 JMS 模板处理消息的消息服务类
package org.study.mq.activeMQ.spring;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import javax.jms.*;
@Service
public class MessageService {
@Resource(name = "jmsTemplate")
private JmsTemplate jmsTemplate;
@Resource(name = "testQueue")
private Destination testQueue;
@Resource(name = "testTopic")
private Destination testTopic;
//向队列发送消息
public void sendQueueMessage(String messageContent) {
jmsTemplate.send(testQueue, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage msg = session.createTextMessage();
// 设置消息内容
msg.setText(messageContent);
return msg;
}
});
}
//向主题发送消息
public void sendTopicMessage(String messageContent) {
jmsTemplate.send(testTopic, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage msg = session.createTextMessage();
// 设置消息内容
msg.setText(messageContent);
return msg;
}
});
}
}
复制代码
@Service 将该类声明为一个服务,实际项目中不少服务代码也相似。经过 Resource 注解直接将上面配置文件中定义的 jmsTemplate 引入到 MessageService 类中就能够直接使用了,testQueue 和 testTopic 也是相似,服务类中直接引入配置文件中定义好的队列和主题。重点是下面的两个发送消息的方法,sendQueueMessage 向队列发送消息,sendTopicMessage 向主题发送消息,两种模式都使用了 jmsTemplate 的 send 方法,send 方法第1个参数是javax.jms.Destination
类型,表示消息目的地。因为javax.jms.Queue
和javax.jms.Topic
都继承了javax.jms.Destination
接口,因此该方法对队列模式和主题模式都适用。send 方法的第2个参数是org.springframework.jms.core.MessageCreator
,这里使用了匿名内部类的方式建立对象,从支持的 Session 对象中建立文本消息,这样就能够发送消息了。能够看到不管是队列仍是主题,经过 Spring 框架来发送消息的代码比以前的 Java 代码示例简洁了不少。
package org.study.mq.activeMQ.spring;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class QueueListener implements MessageListener {
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
TextMessage txtMsg = (TextMessage) message;
String messageStr = txtMsg.getText();
System.out.println("队列监听器接收到文本消息:" + messageStr);
} catch (JMSException e) {
e.printStackTrace();
}
} else {
throw new IllegalArgumentException("只支持 TextMessage 类型消息!");
}
}
}
复制代码
队列消息监听器在收到消息时校验是不是文本消息类型,是的话则打印出内容。
package org.study.mq.activeMQ.spring;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class Topic1Listener implements MessageListener {
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
TextMessage txtMsg = (TextMessage) message;
String messageStr = txtMsg.getText();
System.out.println("主题监听器1 接收到文本消息:" + messageStr);
} catch (JMSException e) {
e.printStackTrace();
}
} else {
throw new IllegalArgumentException("只支持 TextMessage 类型消息!");
}
}
}
复制代码
package org.study.mq.activeMQ.spring;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class Topic2Listener implements MessageListener {
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
TextMessage txtMsg = (TextMessage) message;
String messageStr = txtMsg.getText();
System.out.println("主题监听器2 接收到文本消息:" + messageStr);
} catch (JMSException e) {
e.printStackTrace();
}
} else {
throw new IllegalArgumentException("只支持 TextMessage 类型消息!");
}
}
}
复制代码
主题监听器的代码与队列监听器相似,只是打印时经过不一样字符串表示当前是不一样监听器接收的消息。
为了演示例子,写了一个 StartApplication 类,在 main 方法中加载 Spring ,获取到 MessageService 服务以后调用 sendQueueMessage 和 sendTopicMessage 方法发送消息。
package org.study.mq.activeMQ.spring;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class StartApplication {
public static void main(String[] args) {
ApplicationContext ctx = new ClassPathXmlApplicationContext("spring-context.xml");
MessageService messageService = (MessageService) ctx.getBean("messageService");
messageService.sendQueueMessage("个人测试消息1");
messageService.sendTopicMessage("个人测试消息2");
messageService.sendTopicMessage("个人测试消息3");
}
}
复制代码
启动好 activeMQ 服务以后运行 StartApplication 类,在控制台看到接收到文本消息:
队列监听器监听到了一条消息,两个主题监听器分别监听到了两条消息。