本篇主要介绍了异步消息机制及Spring对JMS封装,本篇文章讲解较为详细,若是想直接看如何配置,能够参考: Spring整合JMS(消息中间件)实例,但仍是建议你们先看完本篇文章。java
相似于RMI、Hessian、Burlap等远程方法调用,它们都是同步的,所谓同步调用就是客户端必须等待操做完成,若是远程服务没有返回任何响应,客户端会一直等待直到服务完成。 spring
异步调用则不一样,客户端发送完消息无需等待服务处理完成即可当即返回,就像发送完消息就马上被处理成功同样。 apache
在异步处理的世界,咱们能够把消息的发送比做一个邮局系统。好比咱们要给某我的发送信件,咱们只需准备好信件,把它投入邮局的邮箱便可,咱们没必要关心邮件如何送出、可否到达,邮局系统会保证信件最终送达到咱们但愿的接收者手中。和邮局系统相似,当一个应用向另外一个应用发送消息,两个应用之间没有直接的关联,而是发送消息的应用把消息交给一个消息系统,由消息系统确保把消息传递给接收消息的应用。服务器
在异步消息系统中有两个重要的角色:消息broker和destination。当一个应用发送一条消息,它会直接把它发送给消息broker,消息broker扮演的就是邮局,它会确保消息被传递到特定的destination。当咱们邮寄信件时,信件的地址尤其重要,消息系统中的地址就是destination。不过与信件中的地址不一样,destination中定义的不是接收者是谁,而是消息被放在消息broker的什么地方(具体指queue或者topic),destination其实更像邮局系统中的邮筒。网络
尽管存在各类各样的消息系统,每一个消息系统都有各自的消息路由方式,但整体上有两种类型的destination:queue和topic,它们也各自关联着一种特定的消息处理模型:点对点(point-to-point/queue)和发布/订阅(publish/subscribe/topic)session
在点对点模型中,每一个消息只有一个发送者和一个接收者。以下图所示:
在点对点模型中, 消息broker会把消息放入一个queue。当一个接收者请求下一个消息时,消息会被从queue中取出并传递给接收者。由于消息从queue中取出便会被移除,因此这保证了一个消息只能有一个接收者。app
尽管消息队列中的每一个消息只有一个接收者,但这并不意味着只能有一个接收者从队列获取消息,能够同时有多个接收者从队列获取消息,只不过它们只能处理各自接收到的消息。其实这就像在银行排队同样,排队的人能够看作一个个消息,而银行工做窗口即是消息的接收者,每一个窗口服务完一个客户以后都会让队列中的“下一个”到窗口办理业务。框架
还有,若是多个接收者监听一个队列,咱们是很难肯定到底哪一个接收者处理哪一个消息的。不过这也不必定很差,由于这样就使得咱们很方便的经过增长接收者来拓展应用处理能力了。异步
在发布/订阅模式中,消息是被发送到topic中的。就像queue同样,不少接收者能够监听同一个topic,可是与queue每一个消息只传递给一个接收者不一样,订阅了同一个topic的全部接收者都会收到消息的拷贝,以下图所示:
从发布/订阅的名字中咱们也可看出,发布者发布一条消息,全部订阅者都能收到,这就是发布订阅模式最大的特性。对于发布者来讲,它只知道将消息发布到了一个特定的topic,它不关心谁监听这个topic,这也就意味着它并不知道这些消息是被如何处理的。tcp
在具体介绍异步消息系统带来的好处以前,咱们先看看同步系统的局限性:
下面咱们再看一下异步消息系统是如何解决这些问题的。
无需等待
当一个消息被异步发送,客户端不须要等待它处理完成。客户端直接把消息扔给broker而后作其它事情,broker负责把消息送到合适的目的地。
由于客户端不须要等待,因此客户端的性能会有很大的提高。
面向消息和解耦合
不一样于传统基于方法调用的RPC会话,消息异步发送是以数据为中心的。这就意味着客户端不须要和某个方法签名绑定,任何queue或topic的订阅者均可以处理客户端发送的消息。客户端没必要再关心服务方任何相关的问题。
位置独立
同步RPC服务的调用是经过网络地址定位的,这就意味着客户端没法摆脱网络拓扑的变化。若是服务的IP或端口发生改变,客户端也须要作相应的改变。
相反,异步消息系统中的客户端并不关心服务所在的位置及其如何处理消息,它只负责将消息发送到特定的queue或topic。因此,服务位于什么地方都无所谓,只要它们可以从queue或topic中获取消息便可。
在点对点模式中,能够很方便的利用位置独立这个特性建立一个服务集群。客户端不须要关心服务的位置,集群中各个服务仅需知道broker的位置,并从同一个queue获取消息,若是服务压力过大没法及时处理消息,咱们只须要在集群中增长一个服务实例去监听同一个queue便可。
在发布/订阅模式中,位置独立一样有很重要的做用。多个服务能够订阅同一个topic,他们都能获取到topic中的每一个消息,可是对各个服务的处理能够不一样。好比咱们有一个服务集合订阅了一个接收新员工消息的topic,因此这些服务均可以获得每一个新员工消息,一个服务能够将新员工添加到薪资系统,另外一个服务能够将新员工增长到hr系统,还有服务负责赋予新员工各类系统权限等等,每一个订阅topic的服务都能对各自的消息作出本身的处理。
可靠性保证
当一个客户端和服务经过同步方式进行交互时,若是服务出现任何问题挂掉,都会影响客户端正常工做。可是当消息是异步发送时,客户端与服务之间被broker隔离,客户端只负责发送消息,即便当发送消息时服务挂掉,消息也会被broker存储起来,等到服务可用时再接着进行处理。
Java Message Service是一个Java标准,它定义了一套与消息broker交互的通用API。在JMS出现以前,每一种消息broker都有本身独特的一套API,使得应用代码没法在不一样的broker之间适用。可是经过JMS,全部与broker交互的代码就能够适用一套通用的API,就像JDBC同样。
固然Spring对JMS也提供了支持,即JmsTemplate。经过JmsTemplate,咱们能够更加方便地向queue和topic发送和接收消息。后面咱们会详细介绍Spring对JMS的实现,可是在发送和接收消息以前,咱们须要现有一个broker。
ActiveMQ是很是优秀的JMS框架,关于ActiveMQ相关内容这里很少作介绍,具体能够参考:http://activemq.apache.org/,本篇主要介绍如何在Spring中对其进行配置和使用。
咱们要想发送消息到ActiveMQ,就须要先建立到它的链接,ActiveMQConnectionFactory
就是JMS中负责建立到ActiveMQ链接的工厂类。在Spring中配置方式以下:
<bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory" p:brokerURL="tcp://localhost:61616"/>
除此以外,Spring为ActiveMQ提供了专门的命名空间,咱们可使用Spring的ActiveMQ命名空间来建立链接工厂。首先要在配置文件中声明amq命名空间:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms" xmlns:amq="http://activemq.apache.org/schema/core" xsi:schemaLocation="http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> ... </beans>
而后咱们就能够利用<amq:connectionFactory>
元素来声明一个链接工厂:
<amq:connectionFactory id="connectionFactory" brokerURL="tcp://localhost:61616"/>
须要注意,<amq:connectionFactory>
元素是专门针对ActiveMQ的。若是咱们用到的是其它broker,就须要用另外的标签元素或注入另外的工厂bean。上面元素中的brokerURL
指定了ActiveMQ在服务器中的IP和端口,上面端口值就是ActiveMQ默认端口。
除了要有一个链接工厂以外,咱们还须要知道消息发送到的destination。上面讲过了,消息的destination只有两类queue或者topic,在Spring中,咱们须要配置queue或topic对应的bean。
配置一个ActiveMQ queue bean:
<bean id="queue" class="org.apache.activemq.command.ActiveMQQueue" c:_="biz1.queue" />
配置一个ActiveMQ topic bean:
<bean id="topic" class="org.apache.activemq.command.ActiveMQTopic" c:_="biz1.topic" />
上面例子中c:_
属性表明的是构造器参数,它指定了queue或topic的名称。
像链接工厂同样,Spring提供了另一种配置destination的方式,就是经过Spring ActiveMQ命名空间进行配置。
使用<amq:queue>
元素配置一个queue:
<amq:queue id="spittleQueue" physicalName="spittle.alert.queue" />
使用<amq:topic>
元素配置一个topic:
<amq:topic id="spittleTopic" physicalName="biz1.topic" />
上面元素中physicalName
属性表明消息通道的名称,也就是queue和topic的名称。
经过上面两个组件的配置,咱们就能够向ActiveMQ发送和接收消息了。发送和接收消息咱们使用的是Spring提供的JmsTempate,它是Spring对JMS的抽象,下面就详细介绍JMSTemplate的使用。
虽然JMS提供了一套与各类broker交互的通用API,但实际使用起来并非很方便,咱们先看一下使用普通JMS API与broker交互的代码。
ConnectionFactory cf =new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection conn = null; Session session = null; try { conn = cf.createConnection(); session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination destination = new ActiveMQQueue("spitter.queue"); MessageProducer producer = session.createProducer(destination); TextMessage message = session.createTextMessage(); message.setText("Hello world!"); producer.send(message); } catch (JMSException e) { // handle exception? } finally { try { if (session != null) { session.close(); } if (conn != null) { conn.close(); } } catch (JMSException ex) { } }
上面代码中咱们能够看到,为了发送一条 “Hello world”的消息却用了20多行代码,就像JDBC同样,咱们大部分代码都是再作一些重复性的准备工做,好比获取链接、建立session、异常处理等等。其实接收消息的代码也是如此,在JDBC中,Spring提供了一个JdbcTemplate来简化JDBC代码开发,一样,Spring也提供了JmsTemplate
来简化JMS消息处理的开发。
JmsTemplate实际上是Spring对JMS更高一层的抽象,它封装了大部分建立链接、获取session及发送接收消息相关的代码,使得咱们能够把精力集中在消息的发送和接收上。另外,JmsTemplate
对异常也作了很好的封装,其对应的基本的异常为JMSException
。
要使用JmsTemplate,就要在Spring配置文件中配置它做为一个bean:
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" c:_-ref="connectionFactory" />
由于JmsTemplate须要先和broker进行链接,因此它须要依赖一个connectionFactory。
发送消息
假如咱们有一个业务须要用到异步消息发送,咱们先定义这样一个业务接口:
public interface MyMessageService { void sendMessage(String message); }
上面接口中只有一个方法,就是发送消息。
咱们写这个接口的实现,在这个接口实现中,咱们就是用JmsTemplate
实现异步消息发送:
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsOperations; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Component; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; /** * Created by whd@zizizizizi.com on 2016/6/17. */ @Component public class MyMessageServiceImpl implements MyMessageService{ @Autowired private JmsOperations jmsOperations; public void sendMessage(final String message) { jmsOperations.send("biz1.queue", new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message); } }); } }
咱们能够看到,咱们业务的实现中注入了一个JmsOperations
对象,这个对象就是JmsTempate
的实现。JmsOperations
的send()
方法有两个参数,第一个是消息的destination
,第二个即是具体的Message
,在上面例子中message是经过一个匿名内部类MessageCreator
的createMessage()
方法构造的。
经过上面例子能够发现,经过JmsTempate
,咱们只须要关心发送消息便可,全部的链接和session的维护都由JmsTempate
负责。
设置默认destination
大部分状况下,一个业务消息的destination是相同的,因此咱们没必要每次发送都填写destination,咱们能够在配置文件中对其进行配置:
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" c:_-ref="connectionFactory" p:defaultDestinationName="biz1.queue" />
在上面配置中咱们默认destination值为biz1.queue
,由于它只是声明了一个名称,并无说明是哪一种类型的destination,因此,若是存在相同名称的queue或topic,就会自动与之匹配,若是不存在,则会默认建立一个相同名称的queue。若是咱们想指定destination的类型,咱们能够经过配置让其依赖以前配置的destination bean便可:
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" c:_-ref="connectionFactory" p:defaultDestination-ref="biz1.Topic" />
当咱们配置了默认destination,咱们就能够在发送消息时省略第一个参数了:
jmsOperations.send( new MessageCreator() { ... } );
其实上面的send()
方法能够变得更简单,咱们能够利用消息转换器。
使用消息转换器发送消息
除了send()
方法以外,JmsTemplate
还提供了convertAndSend()
方法。与send()
方法须要依赖一个MessageCreator
不一样,convertAndSend()
方法只须要传入你想发送的消息便可。下面咱们用convertAndSend()
实现接口中的sendMessage()
方法:
public void sendMessage(final String message) { jmsOperations.convertAndSend(message); }
convertAndSend()
方法会自动把你发的消息转换成Message
,具体如何转换的由org.springframework.messaging.converter.MessageConverter
的实现来决定。咱们先看一下MessageConverter
接口:
public interface MessageConverter { Object fromMessage(Message<?> var1, Class<?> var2); Message<?> toMessage(Object var1, MessageHeaders var2); }
咱们能够看到这个接口中只有两个方法并且很容易实现。其实大部分状况下咱们不须要本身去实现这个接口,Spring已经为咱们提供给了不少经常使用的实现:
默认状况下,当JmsTemplate
的convertAndSend()
方法使用的是SimpleMessageConverter
。可是咱们也能够经过配置把咱们自定义的MessageConverter
做为属性注入到JmsTemplate
中,好比咱们有个一MessageConverter
的实现bean:
<bean id="messageConverter" class="org.springframework.jms.support.converter.MappingJacksonMessageConverter" />
咱们能够把上面这个bean注入到JmsTemplate中:
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" c:_-ref="connectionFactory" p:defaultDestinationName="spittle.alert.queue" p:messageConverter-ref="messageConverter" />
消费消息
对于消费来讲,JmsTemplate
使用起来比发送更简单,只须要调用JmsOperations
的receive()
方法便可:
public class ReceiveMessage { @Autowired private JmsOperations jmsOperations; public String receive() { try { ObjectMessage message = (ObjectMessage) jmsOperations.receive(); return (String) message.getObject(); } catch (JMSException e) { e.printStackTrace(); throw JmsUtils.convertJmsAccessException(e); } } }
当调用 jmsOperations.receive()
方法时,它会尝试从broker获取消息,若此时没有消息,receive()
方法会一直等待直到有消息产生。前面例子中,当咱们发送消息的时候消息被封装成的是ObjectMessage
,所咱们在获取的时候能够再将其转换回ObjectMessage
。
这里有一点须要注意,当调用message.getObject()
方法时会抛出JMSException
,这个异常是属于JMS API的。JMSException
是一个检查异常,在JMS操做中会抛出各类各样的JMSException
,可是前面咱们使用JmsTemplate
时并无捕获任何JMSException
,是由于JmsTemplate
内部已经将须要检查的JMSException
转换成了非检查的Spring本身的JmsException
。在上面代码中由于调用的是message.getObject()
方法而不是JmsTemplate
的方法,因此咱们须要捕获JMSException
。可是按照Spring的设计理念,咱们应该尽可能减小检查异常,因此在catch块里面咱们又经过JmsUtils工具把JMSException
转换成了非检查的JmsException
。
一样,就行消息的发送同样,咱们也可使用JmsTemplate的receiveAndConvert()
方法替换receive()
方法:
public String receive() { return (String)jmsOperations.receiveAndConvert(); }
咱们看到,由于使用的是JmsTemplate
的方法,因此咱们不须要再捕获JMSException
检查异常。
无论使用msTemplate
的receive()
仍是receiveAndConvert()
方法消费消息,它们都是同步的。也就是说接收者在消息到达时须要等待。这样看起来是否是有点奇怪?发送消息时是异步的,接收消息时倒是同步的。
这也就是为何会有下面的消息驱动POJO出现的缘由,下面咱们就看一下如何实现异步的接收消息。
咱们上面已经知道,JmsTemplate
的receive()
方法是一个同步方法,在消息到达以前这个方法会挂起一直等待直到消息出现,若是这样的话,咱们的应用可能会出现一直等待消息而不能作其它事情的状况。为什么不让应用先去处理其它业务,当消息出现时再告知应用处理呢?
在EJB中,message driven bean(MDB)
就能够实现异步的处理消息。Spring在这方面参考了EJB3对MDB的实现,不过在Spring中咱们把它称做消息驱动POJO,也就是message-driven POJO(MDP)
。
要想在消息出现时获得通知,那么就须要一个监听器监听queue或者topic,之因此称做消息驱动POJO,意识由于监听器是消息驱动的,而是由于这个监听器自己就是一个普通的POJO对象,不须要依赖任何接口:
public class MyMessageHandler { public void handleMessage(String message){ //具体的实现 } }
有了这个POJO对象,下面只须要作简单的配置便可。
赋予上面POJO接收消息能力的关键在于将其配置成一个Spring消息监听器,Spring的jms命名空间提供了全部相关配置。
首先,咱们现须要把上面的POJO对象声明成一个bean:
<bean id="myMessageHandler" class="com.heaven.springexamples.jms.MyMessageHandler" />
其次,把MessageHandler变成一个消息驱动POJO,即把这个bean声明成一个listener:
<jms:listener-container connection-factory="connectionFactory"> <jms:listener destination="biz1.queue" ref="myMessageHandler" method="handleMessage" /> </jms:listener-container>
经过上面配置,消息监听容器里面就多了一个消息监听器。消息监听容器是一个特殊的bean,它可以监听JMS的destination,监听消息的到达。一旦消息到达,消息监听容器会接受这个消息并将其发送给全部相关的listener。下面这幅图展现了整个内部处理过程:
为了配置监听容器和监听者,咱们用到了jms命名空间中的两个元素。<jms:listener-container>
是父元素,<jms:listener >
是子元素。<jms:listener-container>
依赖一个connectionFactory
,这样它的各个<jms:listener >
就能够监听消息了。<jms:listener >
用来定义具体接收消息的bean及方法。按照上面的配置,当消息到达queue时,MyMessageHandler
的handleMessage
方法便会被调用。
须要注意到是,咱们的MessageHandler
还能够实现一个MessageListener
接口,这样的话就不须要再单独指定消息处理的方法了,MyMessageHandler
的onMessage()
方法会自动被调用。MessageListener接口定义以下:
public interface MessageListener { void onMessage(Message var1); }
咱们写一个简单的实现类:
public class MyMessageListener implements MessageListener{ public void onMessage(Message message) { //具体的实现 } }
而后直接配置listener便可(不用再配置method方法属性):
<jms:listener-container connection-factory="connectionFactory"> <jms:listener destination="biz1.queue" ref="myMessageHandler" /> </jms:listener-container>
原文地址:http://blog.csdn.net/suifeng3051/article/details/51718675