所谓异步消息,跟RMI远程调用、webservice调用是相似的,异步消息也是用于应用程序之间的通讯。可是它们之间的区别是:javascript
Java消息服务(Java Message Service,即:JMS)是Java中关于面向消息中间件(MOM)的API,用于在两个应用程序之间或分布式系统中发送消息,进行异步通讯。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持php
JMS是Java平台上有关面向消息中间件(MOM)的技术规范,它便于消息系统中的Java应用程序进行消息交换,而且经过提供标准的产生、发送、接收消息的接口简化企业应用的开发,其做用相似于JDBChtml
在异步消息中有两个重要的概念,分别是:消息代理和目的地java
当咱们在一个应用中发送一条消息时,会将该消息移交给一个消息代理(PS:通常是一些消息中间件,如:ActiveMQ )。在这里,消息代理就相似于邮局,消息代理能够确保消息被投递到指定的目的地,同时解放消息发送者,使其可以继续进行其余业务web
一样,每条异步消息在被消息发送者发送时都要指定一个目的地(PS:用于区别不一样类型的消息),而后消息接收者就能够根据本身的业务需求从指定的目的地(PS:消息仍是在消息中间件存放,目的是是用于区别不一样类型的消息)获取本身所需的消息并进行处理spring
队列也便是:点对点消息模型apache
在点对点模型中,每条消息分别只有一个发送者和接收者。也就是说,当消息代理获得发送者发送的消息时,它会将该消息放入到一个队列中。当某一个消息接收者(PS:同一目的地的消息接收者可能存在多个)请求队列中的下一条消息时,消息会从队列中取出并投递给该接收者。以后该条消息将会从队列中删除,这样就能够保证一条消息只投递给一个接收者数组
主题也便是:发布——订阅消息模型缓存
在发布——订阅消息模型中,每条消息能够由多个接收者接收。也就是说,消息再也不是只投递给一个接收者,而是主题的全部订阅者都会收到该消息的副本网络
在同步通讯中,若是客户端与远程服务频繁通讯,或者远程服务响应很慢,就会对客户端应用的性能带来负面影响
当使用JMS发送消息时,客户端没必要等待消息被处理,甚至是被投递,客户端只须要将消息发送给消息代理,就能够确信消息会被投递给相应的目的地
由于不须要等待,因此客户端能够继续执行其余任务,这种方式能够有效的节省时间,客户端的性能可以获得极大的提升
在同步通讯中,客户端经过服务接口与远程服务相耦合,若是服务的接口发生变化,那么此服务的全部客户端都须要作相应的改变
当使用JMS发送消息时,发送异步消息是以数据为中心的。这意味着客户端并无与特定的方法签名绑定,任何能够处理数据的队列或主题订阅者均可以处理由客户端发送的消息,而客户端没必要了解远程服务的任何规范
同步RPC服务一般须要网络地址来定位。这意味着客户端没法灵活地适应网络拓扑的改变。若是服务的IP地址改变了,或者服务被配置为监听其余端口,客户端必须进行相应的调整,不然没法访问服务。
与之相反,消息客户端没必要知道谁会处理它们的消息,或者服务的位置在哪里。客户端只须要了解须要经过哪一个队列或主题来发送消息。所以,只服务可以从队列或主题中获取便可,消息客户端根本不须要关注服务来自哪里
在点对点模型中,能够利用这种位置的独立性来建立消息服务集群。若是客户端不知道服务的位置,而且服务的惟一要求就是能够访问消息代理,那么咱们就能够配置多个服务从同一个队列中接收消息。若是服务过载,处理能力不足,咱们只须要添加一些新的的服务(接收者)实例来监听相同的队列便可平滑加强其处理能力
在发布一订阅模型中,位置独立性会产生另外一种有趣的效应。多个服务能够订阅同一个主题,接收相同消息的副本。可是每个服务对消息的处理方式却可能不一样。例如,假设咱们有一组能够共同处理描述新员工信息的消息。一个服务可能会在工资系统中增长该员工,另外一个服务则会将新员工增长到公司交流群中,同时还有一个服务为新员工分配内网系统的访问权限。在这里,每个服务都是基于相同的数据(都是从同一个主题接收而来),可是却各自对数据进行了不一样的处理
在上面的内容中,我介绍了一些关于异步消息的基本概念。下面我将介绍基于ActiveMQ框架的JMS消息的发送与接收以及ActiveMQ在Spring框架中的一些经常使用用法
在正式开始编写测试实例以前,咱们须要作的是ActiveMQ的下载。其官方下载地址是:activemq.apache.org/download.ht…
而后运行:apache-activemq-5.14.1/bin/win64/activemq.bat ,接着保持控制台窗口不关闭,访问:http://127.0.0.1:8161/admin/
注:默认帐号密码是:admin/admin
package cn.zifangsky.test.base;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/** * 消息生产者 * @author zifangsky * */
public class JMSProducer {
//默认链接用户名
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
//默认链接密码
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//默认链接地址
private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args) {
//链接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
try {
//链接
Connection connection = connectionFactory.createConnection();
//启动链接
connection.start();
//建立session
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//消息目的地
Destination destination = session.createQueue("hello");
//消息生产者
MessageProducer producer = session.createProducer(destination);
//发送消息
for(int i=0;i<10;i++){
//建立一条文本消息
TextMessage message = session.createTextMessage("ActiveMQ:这是第 " + i + " 条消息");
//生产者发送消息
producer.send(message);
}
session.commit();
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}复制代码
运行上面的代码以后能够发现ActiveMQ的队列监控界面出现了变化:
package cn.zifangsky.test.base;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/** * 消息消费者 * @author zifangsky * */
public class JMSConsumer {
//默认链接用户名
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
//默认链接密码
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
//默认链接地址
private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
public static void main(String[] args) {
//链接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
try {
//链接
Connection connection = connectionFactory.createConnection();
//启动链接
connection.start();
//建立session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//消息目的地
Destination destination = session.createQueue("hello");
//消息消费者
MessageConsumer consumer = session.createConsumer(destination);
while(true){
TextMessage message = (TextMessage) consumer.receive();
if(message != null){
System.out.println("接收到消息: " + message.getText());
}else{
break;
}
}
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}复制代码
运行代码以后,输出以下:
接收到消息: ActiveMQ:这是第 0 条消息
接收到消息: ActiveMQ:这是第 1 条消息
接收到消息: ActiveMQ:这是第 2 条消息
接收到消息: ActiveMQ:这是第 3 条消息
接收到消息: ActiveMQ:这是第 4 条消息
接收到消息: ActiveMQ:这是第 5 条消息
接收到消息: ActiveMQ:这是第 6 条消息
接收到消息: ActiveMQ:这是第 7 条消息
接收到消息: ActiveMQ:这是第 8 条消息
接收到消息: ActiveMQ:这是第 9 条消息
固然,此时观察ActiveMQ的队列监控界面,能够发现这10条消息已经被消费了
注:上面的代码很简单,而且其思路跟JDBC很相似,所以这里就不作过多解释了
若是写过不少的JDBC代码的话,能够发现使用基本的JMS来发送和接收消息就跟JDBC代码同样,须要每次写不少冗长重复的代码。
针对如何消除冗长和重复的JMS代码,Spring给出的解决方案是JmsTemplate。JmsTemplate能够建立链接、得到会话以及发送和接收消息。这使得咱们能够专一于构建要发送的消息或者处理接收到的消息
另外,JmsTemplate能够处理全部抛出的笨拙的JMsException异常。若是在使用JmsTemplate时抛出JMsException异常,JmsTemplate将捕获该异常,而后抛出一个非检查型异常,该异常是Spring自带的JmsException异常的子类
activemq.ip=127.0.0.1
activemq.username=admin
activemq.passwd=admin复制代码
这个文件配置了ActiveMQ的地址以及认证的帐号密码
<bean id="configProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
<property name="locations">
<list>
<value>classpath:jdbc.properties</value>
<value>classpath:activemq.properties</value>
</list>
</property>
</bean>
<bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PreferencesPlaceholderConfigurer">
<property name="properties" ref="configProperties" />
</bean>复制代码
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:jms="http://www.springframework.org/schema/jms" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.0.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.14.1.xsd">
<context:component-scan base-package="cn.zifangsky.activemq" />
<!-- ActiveMQ 链接工厂 -->
<!-- <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://${activemq.ip}:61616" userName="${activemq.username}" password="${activemq.passwd}" /> -->
<bean id="amqConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://${activemq.ip}:61616"/>
<property name="userName" value="${activemq.username}" />
<property name="password" value="${activemq.passwd}" />
<!-- <property name="trustAllPackages" value="true"/> -->
<property name="trustedPackages">
<list>
<value>java.lang</value>
<value>javax.security</value>
<value>java.util</value>
<value>org.apache.activemq</value>
<value>cn.zifangsky.activemq</value>
<value>cn.zifangsky.model</value>
</list>
</property>
</bean>
<!-- Spring Caching链接工厂 -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="amqConnectionFactory" />
<!-- Session缓存数量 -->
<property name="sessionCacheSize" value="100" />
</bean>
<!-- 定义Queue类型的JmsTemplate -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="connectionFactory" />
<!-- 非pub/sub模型(发布/订阅),即:队列模型 -->
<property name="pubSubDomain" value="false" />
</bean>
<!-- 定义Queue监听器 -->
<jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
<jms:listener destination="test.queue" ref="testQueueReceiver1"/>
<jms:listener destination="test.queue" ref="testQueueReceiver2"/>
</jms:listener-container>
</beans>复制代码
固然,在web.xml中须要加载该配置文件才行:
<context-param>
<param-name>contextConfigLocation</param-name>
<param-value>
classpath:context/context.xml
classpath:context/context_*.xml
</param-value>
</context-param>复制代码
在上面的context_activemq.xml文件中,首先是定义了自动扫描cn.zifangsky.activemq 这个包下面的注解,在后面配置的两个接收者:testQueueReceiver一、testQueueReceiver2 的bean就是这样被加载进来的
接着,amqConnectionFactory这个bean配置了ActiveMQ的链接参数(PS:经过配置文件加载进来),以及可信任的能够被序列化的类的包路径
再日后,jmsQueueTemplate这个bean配置了一个JmsTemplate的实例,固然这里定义的是一个队列模型
最后,使用jms:listener-container配置了两个消息监听器,其监听的目的地都是“test.queue”,处理的接收者分别是:testQueueReceiver1 和 testQueueReceiver2
package cn.zifangsky.activemq.producer;
import javax.annotation.Resource;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
@Component("queueSender")
public class QueueSender {
@Resource(name="jmsQueueTemplate")
private JmsTemplate jmsTemplate;
/** * 发送一条消息到指定队列 * @param queueName 队列名称 * @param message 消息内容 */
public void send(String queueName,final String message){
jmsTemplate.send(queueName, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(message);
}
});
}
}复制代码
从上面的代码能够看出,这里仅仅只是使用JmsTemplate的send( )方法建立了一条文本消息
QueueReceiver1.java:
package cn.zifangsky.activemq.consumer;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.springframework.stereotype.Component;
@Component("testQueueReceiver1")
public class QueueReceiver1 implements MessageListener{
@Override
public void onMessage(Message message) {
try {
System.out.println("QueueReceiver1收到消息: " + ((TextMessage)message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}复制代码
QueueReceiver2.java:
package cn.zifangsky.activemq.consumer;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.springframework.stereotype.Component;
@Component("testQueueReceiver2")
public class QueueReceiver2 implements MessageListener{
@Override
public void onMessage(Message message) {
try {
System.out.println("QueueReceiver2收到消息: " + ((TextMessage)message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}复制代码
package cn.zifangsky.test.springjms;
import javax.annotation.Resource;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import cn.zifangsky.activemq.producer.QueueSender;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations={"classpath:/context/context.xml","classpath:/context/context_activemq.xml"})
public class TestQueue {
private final String QUEUENAME = "test.queue";
@Resource(name="queueSender")
private QueueSender queueSender;
@Test
public void test(){
for(int i=0;i<10;i++){
queueSender.send(QUEUENAME, "Hi,这是第 " + (i+1) + " 条消息!");
}
}
}复制代码
运行这个单元测试方法以后,能够发现输出结果以下:
QueueReceiver2收到消息: Hi,这是第 1 条消息!
QueueReceiver1收到消息: Hi,这是第 2 条消息!
QueueReceiver2收到消息: Hi,这是第 3 条消息!
QueueReceiver1收到消息: Hi,这是第 4 条消息!
QueueReceiver2收到消息: Hi,这是第 5 条消息!
QueueReceiver1收到消息: Hi,这是第 6 条消息!
QueueReceiver2收到消息: Hi,这是第 7 条消息!
QueueReceiver1收到消息: Hi,这是第 8 条消息!
QueueReceiver2收到消息: Hi,这是第 9 条消息!
QueueReceiver1收到消息: Hi,这是第 10 条消息!
从上面的输出结果能够看出,队列类型的消息只能被某一个接收者接收并处理
上面的例子很显然在发送和接收消息的时候写的代码要比纯粹的JMS要少不少,那么是否是就真的没有更简洁的代码了呢?
答案固然是否,第一是在发送消息的时候使用了JmsTemplate的send( ) 方法来发送消息。其实,除了send( )方法,JmsTemplate还提供了convertAndSend( )方法,与send( ) 方法不一样的是,convertAndSend( )方法并不须要MessageCreator做为参数。这是由于convertAndSend( )方法会使用内置的消息转换器(message converter)为咱们建立消息
<!-- 定义Queue类型的JmsTemplate -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="connectionFactory" />
<!-- 非pub/sub模型(发布/订阅),即:队列模型 -->
<property name="pubSubDomain" value="false" />
<property name="defaultDestinationName" value="test.queue"/>
</bean>复制代码
改写以后的方法以下所示:
/** * 发送一条消息到指定队列 * @param message 消息内容 */
public void send(final String message){
jmsTemplate.convertAndSend(message);
}复制代码
除了上面使用的内置的消息转换器以外,Spring还为通用的转换任务提供了多个消息转换器(org.springframework.jms.support.converter
包中)
消息转换器 | 功能 |
---|---|
MappingJackson2MessageConverter | 使用Jackson2 JSON库实现消息与JSON格式的相互转换 |
MarshallingMessageConverter | 使用JAXB库实现消息与XML格式之间的相互转换 |
SimpleMessageConverter | 实现String与TextMessage之间的相互转换、字节数组与BytesMessage之间的相互转换、Map与MapMessage之间的相互转换以及Serializable对象与ObjectMessage之间的相互转换(PS:对象的序列化与反序列化) |
注:默认状况下,JmsTemplate会在convertAndSend( )方法中使用SimpleMessageConverter这个消息转换器。若是须要手动执行消息转化器的话,能够这样修改:
<bean id="jmsMessageConverter" class="org.springframework.jms.support.converter.MappingJackson2MessageConverter" />
<bean id="jmsQueueTemplate2" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="connectionFactory" />
<property name="pubSubDomain" value="false" />
<property name="messageConverter" ref="jmsMessageConverter" />
</bean>复制代码
好了转回正题,针对第一个实例的简化还能够作其余的工做。好比:消息接收者在处理消息的时候实现了一个MessageListener接口,同时复写了onMessage(Message message) 方法。那么咱们是否能够将之简化,改写成一个普通的POJO呢?
package cn.zifangsky.activemq.consumer;
import org.springframework.stereotype.Component;
@Component("testQueueReceiver1")
public class QueueReceiver1{
public void handle(String str){
System.out.println("QueueReceiver1收到消息: " + str);
}
}复制代码
从上面的代码能够看出,这里仅仅只定义了一个普通的handle(String str) 方法,彻底看不出来任何JMS的痕迹
<!-- 定义Queue监听器 -->
<jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
<jms:listener destination="test.queue" ref="testQueueReceiver1" method="handle"/>
<jms:listener destination="test.queue" ref="testQueueReceiver2"/>
</jms:listener-container>复制代码
这里只改写了第一个监听相关配置,手动指定了针对接收到的消息的处理方法。固然,Spring会自动完成消息格式的转化
再次运行单元测试:
@Test
public void test(){
for(int i=0;i<10;i++){
queueSender.send("Hi,这是第 " + (i+1) + " 条消息!");
}
}复制代码
输出略
明白了上面的队列类型的消息发送与接收,那么定义一个发布——订阅类型的消息就很简单了,只须要把JmsTemplate的类型改为“pubSubDomain”类型便可:
添加如下内容:
<!-- 定义Topic类型的JmsTemplate -->
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="connectionFactory" />
<!-- pub/sub模型(发布/订阅) -->
<property name="pubSubDomain" value="true" />
<property name="defaultDestinationName" value="test.topic"/>
</bean>
<!-- 定义Topic监听器 -->
<jms:listener-container destination-type="topic" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
<jms:listener destination="test.topic" ref="testTopicReceiver1" method="handle"/>
<jms:listener destination="test.topic" ref="testTopicReceiver2" method="handle"/>
</jms:listener-container>复制代码
package cn.zifangsky.activemq.producer;
import javax.annotation.Resource;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
@Component("topicSender")
public class TopicSender {
@Resource(name="jmsTopicTemplate")
private JmsTemplate jmsTemplate;
/** * 发送一条消息到指定队列 * @param message 消息内容 */
public void send(final String message){
jmsTemplate.convertAndSend(message);
}
}复制代码
package cn.zifangsky.activemq.consumer;
import org.springframework.stereotype.Component;
@Component("testTopicReceiver1")
public class TopicReceiver1{
public void handle(String str){
System.out.println("TopicReceiver1收到消息: " + str);
}
}复制代码
另外一个接收者代码跟上面类似,请自行完成,略
package cn.zifangsky.test.springjms;
import javax.annotation.Resource;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import cn.zifangsky.activemq.producer.TopicSender;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations={"classpath:/context/context.xml","classpath:/context/context_activemq.xml"})
public class TestTopic {
@Resource(name="topicSender")
private TopicSender topicSender;
@Test
public void test(){
for(int i=0;i<5;i++){
topicSender.send("Hi,这是第 " + (i+1) + " 条消息!");
}
}
}复制代码
输出以下:
TopicReceiver2收到消息: Hi,这是第 1 条消息!
TopicReceiver1收到消息: Hi,这是第 1 条消息!
TopicReceiver1收到消息: Hi,这是第 2 条消息!
TopicReceiver1收到消息: Hi,这是第 3 条消息!
TopicReceiver1收到消息: Hi,这是第 4 条消息!
TopicReceiver2收到消息: Hi,这是第 2 条消息!
TopicReceiver2收到消息: Hi,这是第 3 条消息!
TopicReceiver2收到消息: Hi,这是第 4 条消息!
TopicReceiver2收到消息: Hi,这是第 5 条消息!
TopicReceiver1收到消息: Hi,这是第 5 条消息!
从上面的输出内容能够看出,发布——订阅类型的消息每一个接收者都会接收到一份消息的副本
若是咱们想要发送和接收对象类型的消息,而不是普通的文本消息。其实,由于Spring提供了默认的消息转换器——SimpleMessageConverter。因此咱们只须要像发送文本消息那样发送对象消息,关于对象的序列化和反序列化这些步骤Spring会自动帮咱们完成
添加如下内容:
<bean id="jmsQueueTemplate2" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="connectionFactory" />
<!-- 非pub/sub模型(发布/订阅),即:队列模型 -->
<property name="pubSubDomain" value="false" />
<property name="defaultDestinationName" value="object.queue"/>
</bean>复制代码
这里,定义了新的JmsTemplate,其默认的目的地是:object.queue
修改队列监听器,添加:
<!-- 定义Queue监听器 -->
<jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
...
<jms:listener destination="object.queue" ref="testQueueReceiver3" method="handle"/>
</jms:listener-container>复制代码
package cn.zifangsky.activemq.producer;
import javax.annotation.Resource;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
import cn.zifangsky.model.User;
@Component("queueSender2")
public class QueueSender2 {
@Resource(name="jmsQueueTemplate2")
private JmsTemplate jmsTemplate;
/** * 发送一条消息到指定队列 * @param user 一个User类型的实体 */
public void send(final User user){
jmsTemplate.convertAndSend(user);
}
}复制代码
能够看出,这里的方法参数就不是普通的文本了,而是一个能够被序列化的对象
注:User.java:
package cn.zifangsky.model;
import java.io.Serializable;
public class User implements Serializable{
private static final long serialVersionUID = 1L;
private Long id;
private String username;
private String password;
public User() {
}
public User(Long id, String username, String password) {
this.id = id;
this.username = username;
this.password = password;
}
(getter和setter方法略)
@Override
public String toString() {
return "User [id=" + id + ", username=" + username + ", password=" + password + "]";
}
}复制代码
package cn.zifangsky.activemq.consumer;
import javax.annotation.Resource;
import org.springframework.stereotype.Component;
import cn.zifangsky.model.User;
@Component("testQueueReceiver3")
public class QueueReceiver3{
public void handle(User user){
System.out.println("接收到消息: " + user);
}
}复制代码
能够看到,这里的handle方法的参数时User类型。固然这个User对象是Spring将消息进行反序列化后生成的
@Resource(name="queueSender2")
private QueueSender2 queueSender2;
@Test
public void testObject(){
User u = new User((long) 1,"test","123456");
queueSender2.send(u);
}复制代码
输出以下:
接收到消息: User [id=1, username=test, password=123456]
能够看出,咱们实际须要作的工做仍是不多的,不少繁琐的步骤都由Spring在后台自动完成了
有时,为了保障消息的可靠性,一般须要在接收到消息以后给某个消息目的地发送一条确认收到的回复消息。固然,要实现这个功能也很简单,只须要在收到消息以后调用某个消息发送者发送一条确认消息便可
好比上面的QueueReceiver3能够改为这样:
package cn.zifangsky.activemq.consumer;
import javax.annotation.Resource;
import org.springframework.stereotype.Component;
import cn.zifangsky.activemq.producer.QueueSender;
import cn.zifangsky.model.User;
@Component("testQueueReceiver3")
public class QueueReceiver3{
@Resource(name="queueSender")
private QueueSender queueSender;
public void handle(User user){
System.out.println("接收到消息: " + user);
queueSender.send("QueueReceiver3已经收到Object类型的消息。。。");
}
}复制代码
这样就能够在收到消息以后,使用QueueSender 这个发送者给“test.queue”这个消息目的地发送一条确认消息了(PS:实际状况的处理可能会比这里稍微复杂一点,这里为了测试只是发送了一条文本消息)
注:使用单元测试的时候最后一条消息可能不会打印出来,由于这次单元测试的生命周期结束以后程序就自动中止了。解决办法能够是手动执行一下第一个实例中的那个单元测试,或者启动这个web项目就能够看到效果了
在前面介绍队列类型的监听器的时候为了验证一条队列里的消息只能被一个接收者接收,所以添加了两个功能彻底同样的接收者:testQueueReceiver1和testQueueReceiver2
其实在实际开发中,为了提升系统的消息处理能力咱们彻底不必像这样定义多个功能同样的消息接收者,相反咱们只须要在配置监听器的时候使用concurrency
这个属性配置多个并行的监听器便可。好比像这样:
<jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" concurrency="5" acknowledge="auto">
<jms:listener destination="test.queue" ref="testQueueReceiver1" method="handle"/>
<jms:listener destination="object.queue" ref="testQueueReceiver3" method="handle"/>
</jms:listener-container>复制代码
若是concurrency属性设置一个固定的数字则表示每一个消息监听器都会被同时启动指定个数的彻底同样的并行监听器来监听消息并转发给消息接收者处理。固然,除了指定固定的数字以外,咱们还能够手动指定一个监听器的数目区间,好比:concurrency=”3-5″
,表示最少打开3个监听器,最多打开5个监听器。消息少时会少打开几个,消息多时会多打开几个,这一过程会自动完成而不须要咱们作其余额外的工做
基于ActiveMQ的异步消息的一些经常使用用法基本上就是这些了。固然,还有一些其余的内容在这里没有介绍到,好比:导出基于JMS的服务、使用AMQP实现消息功能等。限于文章篇幅,这些内容暂且让我放在其余文章单独介绍吧
参考: