这是我本身从不知道JMS为什么物到学习如何使用第三方工具实现跨服务器的知识总结,在整个过程当中可能考虑不全。另外,若是想尽快使用JMS,建议直接看实例那一节就能够了。有问题多交流。 java
(有些词可能用的不是很正确,在这里我把本身能意识到的词拿出来解释一下): spring
一、 跨服务器:专业术语好像叫“跨实例”。意思是,能够在多个服务器(能够是不一样的服务器,如resin与tomcat)之间相互通讯。与之对应的是单服务器版。 数据库
二、 消息生产者:就是专门制造消息的类。 apache
三、 消息消费者:也叫消息接收者,它主要是实现了消息监听的一个接口,固然,也能够难过Spring提供的一个转换器接口指定任意一个类中的任意方法。 tomcat
咱们都知道,任何一个系统从总体上来看,其实质就是由无数个小的服务或事件(咱们能够称之为事务单元)有机地组合起来的。对于系统中任何一个比较复杂的功能,都是经过调用各个独立的事务单元以实现统一的协调运做而实现的。 session
如今咱们的问题是,若是有两个彻底独立的服务(好比说两个不一样系统间的服务)须要相互交换数据,咱们该如何实现? 并发
好吧,我认可,我很傻很天真,我想到的第一个方法就是在须要的系统中将代码再写一遍,但我也知道,这绝对不现实!好吧,那我就应该好好学习学习达人们是如何去解决这样的问题。
第一种方法,估计也是用的最多的,就是rpc模式。这种方法就是在本身的代码中远程调用其它程序中的代码以达到交换数据的目的。可是这种方法很显然地存在了一个问题:就是必定要等到获取了数据以后才能继续下面的操做。固然,若是一些逻辑是须要这些数据才能操做,那这就是咱们须要的。
第二种方法就是Hessian,我我的以为Hessian的实如今本质上与rpc模式的同样,只是它采用了配置,简化了代码。
上面这两个方法,基本上能解决全部的远程调用的问题了。可是美中不足的是,若是我在A系统中有一个操做是须要让B系统作一个响应的,但我又不须要等它响应完才作下面的操做,这该怎么办?因而新的解决方案就须要被提出来,而SUN公司的设计师们也考虑到了,在JAVA中这就被体现为JMS(java message service)。
JMS模块的功能只提供了接口,并无给予实现,实现JMS接口的消息中间件叫JMS Provider,这样的消息中间件能够从Java里经过JMS接口进行调用。
JMS消息由两部分构成:header和body。header包含消息的识别信息和路由信息,body包含消息的实际数据。
JMS的通用接口集合以异步方式发送或接收消息。另外, JMS采用一种宽松结合方式整合企业系统的方法,其主要的目的就是建立可以使用跨平台数据信息的、可移植的企业级应用程序,而把开发人力解放出来。
Java消息服务支持两种消息模型:Point-to-Point消息(即P2P)和发布订阅消息(Publish Subscribe messaging,简称Pub/Sub,也就是广播模式)。
根据数据格式,JMS消息可分为如下五种:
BytesMessage 消息是字节流。
MapMessage 消息是一系列的命名和值的对应组合。
ObjectMessage 消息是一个流化(即继承Serializable)的Java对象。
StreamMessage 消息是Java中的输入输出流。
TextMessage 消息是一个字符串,这种类型将会普遍用于XML格式的数据。
在使用JMS时,其步骤很像使用JDBC同样,须要的步骤为:
1、创建消息链接(也就是创建链接工厂);
2、设定消息目的地(其实与步骤1中用的类是同样的,只是它是用来指定目的地,而步骤1中是用来指定消息服务器地址的);
3、建立jmsTemplate实例(为下一步构建消息sessin做准备);
4、建立消息生产者(其中就用到了2、3两步的产物),它就是一个普通的类,通常是经过send方法发送消息,也能够经过MessageListenerAdapter指定发送信息的方法;
5、建立MDP(也就是消息接收者,它是一个必须实现MessageListener接口的类);
6、为每一个MDP创建一个监听容器,当有相应的消息传来,则它会自动调用对应的MDP消费消息。
整个过程就像编写JDBC同样,代码维护量很大。为此,让Spring对其进行管理是个不错的选择。
Spring框架提供了一个模板机制来隐藏Java APIs的细节。开发人员可使用JDBCTemplate和JNDITemplate类来分别访问后台数据库和JEE资源(数据源,链接池)。JMS也不例外,Spring提供JMSTemplate类,所以开发人员不用为一个JMS实现去编写样本代码。接下来是在开发JMS应用程序时Spring所具备一些的优点。
1. 提供JMS抽象API,简化了访问目标(队列或主题)和向指定目标发布消息时JMS的使用。
2. 开发人员不须要关心JMS不一样版本(例如JMS 1.0.2与JMS 1.1)之间的差别。
3. 开发人员没必要专门处理JMS异常,由于Spring为全部JMS异常提供了一个未经检查的异常,并在JMS代码中从新抛出
具体的详细步骤与方法参考 spring-reference2.5.pdf 中的第十九章。
下面,我就将我在整个学习过程当中实践过的例子一一列举出来,并将在其中遇到的问题和心得给出必定的说明,但愿对你们能有所帮助。
1、首先,咱们须要配置resin下的resin.conf文件,在其中(<server></server>之间)加上:
<!-- The ConnectionFactory resource defines the JMS factory for creating JMS connections -->
<resource jndi-name="jms/factory"
type="com.caucho.jms.ConnectionFactoryImpl">
</resource>
<!-- Queue configuration with Resin's database -->
<resource jndi-name="jms/queue"
type="com.caucho.jms.memory. MemoryQueue">
<init>
<queue-name>OssQueue</queue-name>
</init>
</resource>
<!-- Queue configuration with Resin's database -->
<resource jndi-name="jms/topic"
type="com.caucho.jms.memory. MemoryTopic">
<init>
<queue-name>ossTopic</queue-name>
</init>
</resource>
注:i、我如今只知道JNDI方式配置消息的链接工厂,我并不知道有没有其它的方式,但我看了许多资料上也没提到其它配置方式。
ii、网上不多有关于在resin中配置JMS消息工厂的资料,只有在resin的官网上才能见到。
iii、上面JNDI配置的地方须要注意的是,你们若是在网上看资料的话,可能会发现网上会比我给出的老是会多一些,也就是老是多一些<data-source>的初始化配置,如:
<resource jndi-name="jms/factory"
type="com.caucho.jms.ConnectionFactoryImpl">
<init>
<data-source>jdbc/database</data-source>
</init>
</resource>
就这样的配置,单独启动resin是没有问题的,可是若是将其按照下面的Spring配置加到系统中,就会出异常(具体的异常名称我忘了,中文的大概意思是:数据库对象不能转换成JMS链接对象,还有一种状况是启动系统时会内存溢出)。我认为这种配置多是数据库消息模式的配置(由于JMS有内存和数据库两种管理方式,我目前只学习了内存管理的方式,至于数据库管理方式你们要是有兴趣能够参考:
http://www.oracle.com/technology/books/pdfs/2352_Ch06_FINAL.pdf)
2、在web.xml文件中配置一个spring用的上下文:
<context-param>
<param-name>contextConfigLocation</param-name>
<param-value>/WEB-INF/jmsconfig.xml</param-value>
</context-param>
<!-- 配置Spring容器 -->
<listener>
<listener-class>
org.springframework.web.context.ContextLoaderListener
</listener-class>
</listener>
注:我是将jmsconfig.xml加载到service.xml中随系统启动的。
3、建立jmsconfig.xml用来装配jms,内容以下:
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN"
"http://www.springframework.org/dtd/spring-beans.dtd">
<beans>
<bean id="jmsConnectionFactory"
class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiName">
<value>java:comp/env/jms/factory </value>
</property>
</bean>
<bean id="destination" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiName">
<value> java:comp/env/jms/queue</value>
</property>
</bean>
<!-- Spring JmsTemplate config -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory">
<bean
class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory"
ref="jmsConnectionFactory"/>
</bean>
</property>
</bean>
<!-- POJO which send Message uses Spring JmsTemplate --> <!--配置消息生产者-->
<bean id="messageProducer" class="com.focustech.jms.MessageProducer">
<property name="template" ref="jmsTemplate"/>
<property name="destination" ref="destination"/>
</bean>
<!-- Message Driven POJO (MDP) -->
<bean id="messageListener" class=" com.focustech.jms.MessageConsumer"/>
<!-- listener container,MDP无需实现接口 -->
<bean id="listenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsConnectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
</beans>
其中:
1) jmsConnectionFactory和destination都是使用自定义的,并且你会发现,这两个对象的加载类实际上是同样的,都是JndiObjectFactoryBean,这是从JNDI读取链接的意思。
3) MessageProducer是消息发送方。
4) MessageConsumer实现了一个MessageListener,监听是否收到消息。
4、发送和接收消息的class以下(主要代码):
MessageProducer.java
public class MessageProducer {
private JmsTemplate template;
private Destination destination;
public void send(final String message) {
template.send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
Message m = session.createTextMessage(message);
return m;
}
});
}
public void setDestination(Destination destination) {
this.destination = destination;
}
public void setTemplate(JmsTemplate template) {
this.template = template;
}
}
MessageConsumer.java
public class MessageConsumer implements MessageListener {
public void onMessage(Message message) {
try
{
System.out.println(((TextMessage) message).getText());
}
catch (JMSException e)
{
}
}
}
注:在上面的实例类中,因为在发送方发送的是文本消息(TextMessage),因此在上面的接收者代码中我直接将其转换成TextMessage就好了。若是是在真正的环境下,应该首先判断一下对方发送的是什么类型,而后才转换成对应的消息。
5、测试消息
为了测试的方便,能够在webroot下新建一个test.jsp,而后将下面的代码放到JSP的代码中,而后在网页地址栏中输入连接(如:http://oss.vemic.com/test.jsp 注:oss.vemic.com是本地服务器连接)就能够看到发送的消息了。
<%
try {
ServletContext servletContext = this.getServletContext();
WebApplicationContext wac = WebApplicationContextUtils
.getRequiredWebApplicationContext(servletContext);
MessageProducer mp = (MessageProducer) wac.getBean("messageProducer");
mp.send("JMS TEST!!");
} catch (JmsException e) {
}
%>
(二)、配置跨服务器(即两个或多个resin之间)版消息机制
上面介绍的是单服务器的消息模式配置,使用消息模式,是由于咱们须要在两个或多个服务器之间进行消息的传递,而不是单个服务器内容的消息传递。看过不少资料才发现,几乎全部的服务器都不支持跨服务器的消息模式,就算有(如JBoss),那也是由于它们自己集成了第三方的工具而实现的。而在第三方软件里面,我最终选择了apache activeMQ。
apache activeMQ的简介能够去其官网查看:http://activemq.apache.org/
或参考:http://www.blogjava.net/cctvx1/archive/2007/02/07/98457.html
在其官网上下载最新的对应系统的版本。通常来讲,下载完解压以后就可能经过运行:apache-activemq-5.2.0\bin\ activemq.bat就能够成功启动。具体详细的信息参考:
http://andyao.javaeye.com/blog/153171,或者也可参考其官网。
在真正操做以前,为了避免至于糊涂,咱们应该忘掉前面所说的全部配置(固然,JMS的基础知识咱们仍是应该记住的,由于全部的JMS操做都是基于此的。还有那两个生产消息与消费消息的类与测试页面咱们也要保留,由于咱们下面还须要它们),好吧,如今将全部的配置回归到开始的状态吧(如:resin.conf, JMS在Spring中的配置等等都回到原始状态吧)。
先说一下我运行时所需的环境吧:JDK1.5.0_12和JDK1.6.0_05均可以;resin-3.0.25(其它版本没有试过);配置ActiveMQ所需的JAR包有:
activemq-core-5.2.0.jar、activemq-web-5.2.0.jar、geronimo-j2ee-management_1.0_spec-1.0.jar、geronimo-jms_1.1_spec-1.1.1.jar、geronimo-jta_1.0.1B_spec-1.0.1.jar、xbean-spring-3.4.jar。
好了,一切准备就绪了。那就让ActiveMQ先在系统中运行吧(也就是先单服务器运行,先易后难嘛)。为了让它可以运行起来,咱们须要作如下的准备工做:
其实这里的许多配置和上面说的单服务器的配置是差很少的,只是这里再也不须要配置resin,web.xml的配置与上面的如出一辙(固然,我仍是按照个人方式配置在了service.xml中),好了,如今不一样的配置是jmsconfig.xml:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.org/config/1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.org/config/1.0 http://people.apache.org/repository/org.apache.activemq/xsds/activemq-core-4.1-incubator-SNAPSHOT.xsd">
<!-- 配置ActiveMQ服务 -->
<amq:broker useJmx="false" persistent="false">
<amq:transportConnectors>
<!-- ActiveMQ目前支持的transport有:VM Transport、TCP Transport、SSL Transport、Peer Transport、UDP Transport、Multicast Transport、HTTP and HTTPS Transport、Failover Transport、Fanout Transport、Discovery Transport、ZeroConf Transport等-->
<amq:transportConnector uri="tcp://test.vemic.com:61616" />
</amq:transportConnectors>
</amq:broker>
<!-- 配置JMS链接工厂(注:brokerURL是关键,它应该是上面的amq:transportConnectors里面的值之一对应,由于这里指定链接的对象) -->
<amq:connectionFactory id="jmsConnectionFactory"
brokerURL="tcp://test.vemic.com:61616" />
<!-- 消息发送的目的地(注:”amq:queue”是用于指定是发送topic仍是queue) -->
<amq:queue name="destination" physicalName="ossQueue" />
<!-- 建立JMS的Session生成类,也就是jmsTemplate -->
<bean id="jmsTemplate"
class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory">
<bean
class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory"
ref="jmsConnectionFactory" />
</bean>
</property>
</bean>
<!-- 消息生产者(经过指定目的地, 就能够同时指定其发送的消息模式是topic仍是queue) -->
<bean id="messageProducer"
class="com.focustech.jms.MessageProducer">
<property name="template" ref="jmsTemplate" />
<property name="destination" ref="destination" />
</bean>
<!-- 消息接收类(这个类须要继承javax.jms.MessageListener,固然也能够经过MessageListenerAdapter指定消息转换器来实现用户自定义的消息收发) -->
<bean id="messageListener"
class=" com.focustech.jms.MessageConsumer">
</bean>
<!-- 消息监听容器,其各属性的意义为:
connectionFactory:指定所监听的对象,在这里就是监听链接到tcp://test.vemic.com:61616上面的ActiveMQ;
destination:监听的消息模式;
messageListener:接收者
) -->
<bean id="listenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsConnectionFactory" />
<property name="destination" ref="destination" />
<property name="messageListener" ref="messageListener" />
</bean>
</beans>
注:test.vemic.com是我本机的URL,和localhost同样。
采用上面单机测试的消息就能够了。最终运行的结果为:
JMS TEST!!
注:
注意到了没有?上面的配置从jmsTemplate开始往下就和前面介绍过的单服务器的配置同样了。看到这里,我相信你们对JMS的工做过程应该很清楚了。我我的认为咱们能够简单的这样理解其工做过程:
生产者
|
消费者
|
JMS connectionFactory |
产生消息并发往指定的目的地 |
接收消息并给出已消费确认信息,JMS connectionFactory收到确认信息后就将对应的信息从本身的管理库中删除 |
从上面的图很清楚地看出,要想实现跨服务器的JMS消息机制,JMS connectionFactory是关键的地方,简单地说:connectionFactory就决定了JMS的做用范围。若是connectionFactory是受制于系统(也就是说,当系统停掉以后connectionFactory也就跟着销毁),那么它就不能实现跨服务器功能。要想实现跨服务功能,connectionFactory就必须独立于系统或服务器。因而可知,结合前面的知识,咱们就能够知道,可以实现跨服务器的JMS消息机制其实有两种方式:JDBC方式和采用第三方工具。前面我也说过,我选择了后者(并且我也一直这么作了)。
ActiveMQ的单服务器版咱们已经成功搭建并能成功运行了。如今让咱们实现JMS跨服务器功能吧。等等,咱们先准备另外一个服务器环境。为了明显的区别两个服务,我将上面全部的环境从新弄了一份(一个新的MyEclipse;一个新的web工程,固然web工程里面的环境与上面的同样;一个新的resin;一个新的resin端口8081,上面的resin端口是80),我称之为client。
接下来,咱们来配置client中的JMS。在全部的系统配置中只有一个配置文件与上面的有区别,那就是jmsconfig.xml:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.org/config/1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.org/config/1.0 http://people.apache.org/repository/org.apache.activemq/xsds/activemq-core-4.1-incubator-SNAPSHOT.xsd">
<!-- 配置JMS链接工厂(注:brokerURL是关键,它应该是上面的amq:transportConnectors里面的值之一对应,由于这里指定链接的对象) -->
<amq:connectionFactory id="jmsConnectionFactory"
brokerURL="tcp://test.vemic.com:61616" />
<!-- 消息发送的目的地(注:” amq:queue”是用于指定是发送topic仍是queue,对应上面配置中的amq:destinations) -->
<amq:queue name="destination" physicalName="ossQueue" />
<!-- 建立JMS的Session生成类 -->
<bean id="jmsTemplate"
class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory">
<bean
class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory"
ref="jmsConnectionFactory" />
</bean>
</property>
</bean>
<!-- 消息生产者(经过指定目的地, 就能够同时指定其发送的消息模式是topic仍是queue) -->
<bean id="messageProducer"
class="com.focustech.jms.MessageProducer">
<property name="template" ref="jmsTemplate" />
<property name="destination" ref="destination" />
</bean>
<!-- 消息接收类(这个类须要继承javax.jms.MessageListener,固然也能够经过MessageListenerAdapter指定消息转换器来实现用户自定义的消息收发) -->
<bean id="messageListener"
class="com.focustech.jms.MessageConsumer">
</bean>
<!-- 消息监听容器,其各属性的意义为:
connectionFactory:指定所监听的对象,在这里就是监听链接到tcp://test.vemic.com:61616上面的ActiveMQ;
destination:监听的消息模式;
messageListener:接收者
) -->
<bean id="listenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsConnectionFactory" />
<property name="destination" ref="destination" />
<property name="messageListener" ref="messageListener" />
</bean>
</beans>
注意了!这个配置与上面的ActiveMQ的配置只有一个地方不同,也就是:这个配置中没有配置ActiveMQ的相关信息。为何?结合上面所述的JMS简单工做方式,咱们应该不可贵到答案:由于ActiveMQ要实现跨服务器就必须独立运行,因此咱们只须要启动一个就够了。
注:
其实在这个简单的跨服务器的例子中,其中一方只须要配置消息生产者,而另外一方只须要配置消息的消费者就能够测试经过了,并且测试效果会很明显:在消息生产者的那个服务器上运行测试程序,在消息接收的服务器上就会有相应的响应!在这里我之因此这样作,是让你们在测试时发现一个奇怪的现象:当在一端运行测试程序,第一个消息会被当前运行测试程序的服务消费掉,而接下来的消息又被另外一个服务器消费掉,如此循环(我没有测试过三个及以上的服务器运行状况,我想多是消息一个个的均摊下去的吧)。之因此要让你们看到这个现象,是为了让你们有个疑问,容易接受后面高级应用中关于JMS的两种消息机制:Point-to-Point消息(即P2P)和发布订阅消息(Publish Subscribe messaging,简称Pub/Sub,也就是广播模式)的使用方法。
从这里开始,咱们将进入JMS消息的一些特殊用法,或者叫高级应用。在介绍这些应用的时候,咱们会用到上面已经布署好的跨服务器的应用来做例子。为了区别两个服务器,咱们把配置有ActiveMQ的叫server,另外一个仍是叫client。
上节内容的最后给你们留下了一个有趣的现象。在这里,咱们将针对这个现象进行详细的解析。
咱们前面也知道了,消息模式有两种,但怎么使用却一直没提过。可是若是仔细的看过官网的资料也许已经知道一些了。在这里,我将用实例的方式给你们展示一下它的具体使用。(参考:http://andyao.javaeye.com/blog/234101)
首先,咱们来看Queue消息的使用实例。上面的跨服务器的实例其实就是queue实例的应用,但问题是:如何指定惟一的接收者呢?也就是不能出现上面提到的那个奇怪的循环现象呢?
其实这个现象也并不难回答,首先让咱们来仔细看一下queue消息的目的地配置:
<amq:queue name="destination" physicalName="ossQueue" />
对于上面的配置,咱们能够一一解读其中各参数的含义就知道奥妙所在了:
amq:queue:表示这是配置是queue消息;
name:指定的消息发送与接收的目的地的名称;
physicalName:指定消息队列的物理名称,在ActiveMQ中它就是一个消息集群的表示形式。
根据上面配置的含义,咱们不难发现,其实奥妙就在physicalName这个属性中体现的。具体来讲,对于queue消息而言,只要发送方与接收方都使用同一个physicalName,这就是点对点指定了。例如:将上面的例子中client中的:
<amq:queue name="destination" physicalName="ossQueue" />
改为:
<amq:queue name="destination" physicalName="ossQueue1" />
这样的话,咱们上面说的“奇怪的现象”就不会存在了。由于server中消息是发送到ossQueue这个消息队列里的,而client中消息目的地是指向ossQueue1的,固然就收不到server里面ossQueue中的消息了。
咱们再来看一下如何使用Topic消息。其实很简单,只要将第II节配置中的
<amq:queue name="destination" physicalName="ossQueue" />
改为:
<amq:topic name="destination" physicalName="ossQueue" />
就能够了。运行测试程序时,会发如今两个服务都会收到响应信息。
注:关于topic,有一个概念,叫“订阅”。关于这个词我也不是很了解,但我理解是:在系统中配置了amq:topic而且connectionFactory指定到对应的uri上的前提上,只要amq:topic中对应的physicalName与publish端相同,这就是订阅,这样的配置以后它就能收到发送的信息了。
总之一句话,点对点(或发布订阅模式)的消息发送关键在于收发双方是否共同指定同一个physicalName。
正如前面例子中配置文件中的一行注释说的同样,消息的收发类用户能够自定义,它是经过MessageListenerAdapter指定消息转换器来实现用户自定义的消息收发。具体的操做咱们仍是来看实例吧(为了简单起见,咱们以单服务器的配置与运行做实例,跨服务配置是同样的):
jmsconfig.xml:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.org/config/1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.org/config/1.0 http://activemq.apache.org/schema/core/activemq-core-5.0.0.xsd">
<!-- 配置ActiveMQ服务 -->
<amq:broker useJmx="false" persistent="false">
<amq:transportConnectors>
<!-- 提供的链接方式有:VM Transport、TCP Transport、SSL Transport、
Peer Transport、UDP Transport、Multicast Transport、HTTP and HTTPS Transport、
Failover Transport、Fanout Transport、Discovery Transport、ZeroConf Transport等 -->
<amq:transportConnector uri="tcp://test.vemic.com:61616" />
</amq:transportConnectors>
</amq:broker>
<!-- 配置JMS链接工厂(注:brokerURL是关键,
它应该是上面的amq:transportConnectors里面的值之一) -->
<amq:connectionFactory id="jmsConnectionFactory"
brokerURL="tcp://test.vemic.com:61616" />
<!-- 消息发送的目的地(注:amq:queue是用于指定是发送topic不是queue,对应上面配置中的amq:destinations) -->
<amq:queue name="destination" physicalName="ossQueue" />
<!-- 建立JMS的Session生成类 -->
<bean id="jmsTemplate"
class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory">
<bean
class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory"
ref="jmsConnectionFactory" />
</bean>
</property>
<!-- 指定发送信息时使用的消息转换类.
这个选项不填的话,默认的是:SimpleMessageConverter,它只支持4种类型的对象:String, byte[],Map,Serializable
-->
<!—若是加上下面这段配置就会出错, 错误缘由是Book不是一个原始类, 但我已经将它继承Serializable了,可仍是不行,我想可能有其余什么缘由吧, 但我如今不清楚 -->
<!-- <property name="messageConverter"
ref="resourceMessageConverter" /> -->
</bean>
<!-- 发送消息的转换类
(这个类要继承org.springframework.jms.support.converter.MessageConverter) -->
<bean id="resourceMessageConverter"
class=" com.focustech.jms.ResourceMessageConverter" />
<!-- 消息生产者(经过指定目的地, 就能够同时指定其发送的消息模式是topic仍是queue) -->
<bean id="resourceMessageProducer"
class=" com.focustech.jms.ResourceMessageProducer">
<property name="template" ref="jmsTemplate" />
<property name="destination" ref="destination" />
</bean>
<!-- 消息接收类(这个类须要继承,固然也能够经过MessageListenerAdapter指定消息转换器来实现用户自定义的消息收发) -->
<bean id="resourceMessageListener"
class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean
class=" com.focustech.jms.ResourceMessageConsumer">
</bean>
</constructor-arg>
<property name="defaultListenerMethod" value="recieve" />
<!—自定义接收类与接收的方法 -->
<property name="messageConverter"
ref="resourceMessageConverter" />
</bean>
<!-- 消息监听容器,其各属性的意义为:
connectionFactory:指定所监听的对象,在这里就是监听链接到tcp://test.vemic.com:61616上面的ActiveMQ;
destination:监听的消息模式;
messageListener:接收者
-->
<bean id="listenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsConnectionFactory" />
<property name="destination" ref="destination" />
<property name="messageListener" ref="resourceMessageListener" />
</bean>
</beans>
在这里,咱们须要发送本身定义的消息格式,这样,咱们就须要不一样的消息的生产者与消费者,固然,也须要一个自定义的将二者消息进行转换的一个自定义的类,如上面配置文件中指定的同样,这三个自定义的类的主要代码以下:
ResourceMessageProducer:
public class ResourceMessageProducer
{
private JmsTemplate template;
private Destination destination;
public JmsTemplate getTemplate()
{
return template;
}
public void setTemplate(JmsTemplate template)
{
this.template = template;
}
public Destination getDestination()
{
return destination;
}
public void setDestination(Destination destination)
{
this.destination = destination;
}
public void send(Book book)
{
System.out.println("=======================================");
System.out.println("do send ......");
long l1 = System.currentTimeMillis();
template.convertAndSend(this.destination, book);
System.out.println("send time:" + (System.currentTimeMillis() - l1) / 1000 + "s");
System.out.println("=======================================");
}
}
ResourceMessageConverter:
public class ResourceMessageConverter implements MessageConverter
{
@SuppressWarnings("unchecked")
public Message toMessage(Object obj, Session session) throws JMSException, MessageConversionException
{
// check Type
if (obj instanceof Book)
{
// 采用ActiveMQ的方式传递消息
ActiveMQObjectMessage objMsg = (ActiveMQObjectMessage) session.createObjectMessage();
Map map = new HashMap();
map.put("Book", obj);
// objMsg.setObjectProperty里面放置的类型只能是:String, Map, Object, List
objMsg.setObjectProperty("book", map);
return objMsg;
}
else
{
throw new JMSException("Object:[" + obj + "] is not Book");
}
}
public Object fromMessage(Message msg) throws JMSException, MessageConversionException
{
if (msg instanceof ObjectMessage)
{
Object obj = ((ObjectMessage) msg).getObject();
return obj;
}
else
{
throw new JMSException("Msg:[" + msg + "] is not Map");
}
}
}
ResourceMessageConsumer:
public class ResourceMessageConsumer
{
public void recieve(Object obj)
{
Book book = (Book) obj;
System.out.println("=======================================");
System.out.println("receiveing message ...");
System.out.println(book.toString());
System.out.println("here to invoke our business method...");
System.out.println("=======================================");
}
}
Book:
public class Book implements Serializable
{
/**
*
*/
private static final long serialVersionUID = -6988445616774288928L;
long id;
String name;
String author;
public String getAuthor()
{
return author;
}
public void setAuthor(String author)
{
this.author = author;
}
public long getId()
{
return id;
}
public void setId(long id)
{
this.id = id;
}
public String getName()
{
return name;
}
public void setName(String name)
{
this.name = name;
}
}
消息测试:将测试JSP中的JAVA代码改为:
<%
try {
ServletContext servletContext = this.getServletContext();
WebApplicationContext wac = WebApplicationContextUtils
.getRequiredWebApplicationContext(servletContext);
ResourceMessageProducer resourceMessageProducer = (ResourceMessageProducer) context.getBean("messageProducer");
Book book = new Book();
book.setId(123);
book.setName("jms test!");
book.setAuthor("taofucheng");
resourceMessageProducer.send(book);
} catch (JmsException e) {
}
%>
运行系统,打开测试页面,会发现消息已经成功接收!
注:(1)、经过这种方法,咱们就能够发送咱们想发送的任何对象了(有些限制:这些对象的类型必须是:String, Map, byte[],Serializable。上面的例子已经注释得很清楚)。
(2)、若是你们有兴趣的话,看一下MessageListenerAdapter的源码,你就会发现其实它就是MessageListener的实现类,在它实现的onMessage方法中使用了用户自定义的转换类而已。
Spring提供的JMS的API中已经有了集成事务的功能,咱们只要将上面监听容器的配置改为下面的就好了:
首先,将jmsTemplate设置成支持事务(它默认是不支持事务的):
<bean id="jmsTemplate"
class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory">
<bean
class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory"
ref="jmsConnectionFactory" />
</bean>
</property>
<property name="sessionTransacted" value="true"/>
</bean>
而后再在消息监听容器中设置指定的事务管理:
<bean id="listenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="jmsConnectionFactory" />
<property name="destination" ref="destination" />
<property name="messageListener" ref="resourceMessageListener" />
<!—jtaTransactionManager是系统中的事务管理类,在咱们的系统中,是由Spring托管的 -->
<property name="transactionManager" ref="jtaTransactionManager" />
</bean>
这样配置以后,当事务发生回滚时,消息也会有回滚,即不发送出去。
ActiveMQ还有许多其它高级的应用,如:自动重连机制,也就是保证当通讯双方或多方的连接断裂后它会根据用户的设置自动链接,以保证创建可靠的传输;另外,ActiveMQ还有其它方式嵌入到Spring中,如它能够经过xbean, file等方式创建应用;它还能够经过JMX对消息的发送与接收进行实时查看;消息的确认方式等等,还有不少高级的应用,请参考:《ActiveMQ in Action》(网址:http://whitesock.javaeye.com/blog/164925))