JMS和ActiveMQ介绍(2)_JMS

181144337.jpg
181211455.jpg

JMSAPI能够分为3个主要部分:公共API、队列API和主题API。spring

JMSAPI中,ConnectionFactory和Destination既能够做为受管对象,由JMS提供者建立,并使用JNDI从提供者得到,也能够直接动态建立;服务器

其余接口经过工厂方法建立,好比Session能够经过Connection建立;网络

消息生产者和消费者通常仅建立一个链接(Connection),而能够建立多个会话(Session);session

由Session保存用于消息发送的事务性工做单元。异步

经过Session能够建立Message,MessageProducer,MessageConsumer。ide

181244742.jpg

消息可分为3部分:消息头、属性和有效负载。spa

其中消息又分为自动分配消息头和开发者分配消息头。线程

自动分配的消息头:3d

1)JMSDestination,消息目的地。对象

2)JMSDeliveryMode,消息传送模式,默认设置为持久性模式。

3)JMSMessageID,消息ID。

4)JMSTimestamp,JMS提供者接收消息的时间。

5)JMSExpiration,消息过时时间,默认值为0,即永不过时。

6)JMSRedelivered,是不是重发的消息。

7)JMSPriority,0~4级是普通优先级,而5~9级则是加急优先级。

开发者分配的消息头:

1)JMSReplyTo,接收消息后发送响应消息的目的地。

2)JMSCorrelationID,与消息相关联的特定ID,在大多数状况下,用于接收消息后发送响应消息时,记录上一个消息的ID。

3)JMSType,消息类型。

属性:

与消息头相似,由开发者添加,支持String、Int、Boolean、Double、Float、Byte、Long、Short、Object等类型,可用于消息选择;

有效负载,即实际传输的消息内容。

根据要携带的有效负载类型,JMS定义了6种消息接口:

Message,不含有效负载;

TextMessage,携带一个String做为有效负载;

ObjectMessage,携带一个可序列化Java对象做为有效负载;

BytesMessage,携带一组原始类型字节流;

StreamMessage,携带一个Java原始数据类型流;

MapMessage,携带一组键值对做为有效负载。

消息持久化,经过JMSDeliveryMode设定。在持久性模式下,消息在服务器端被持久化保存,直至消费者接收到该消息,所以能够保证消息成功发送有且仅有一次,而非持久性模式只能保证消息最多发送一次,而不能保证消息被成功接收。

181317579.jpg

对于消息生产者相关API的使用,咱们分队列和主题分别介绍。

在队列下:

经过ActiveMQ提供的链接工厂类建立queueConnection。

queueConnection调用start方法真正创建与消息服务器的长链接。

调用queueConnectoin的createQueueSession方法建立负责消息相关操做的会话单元。

调用queueSession的createQueue、createSender、createMessage方法分别建立队列、发送者和消息。

最后调用queueSender的send方法发送消息,消息发送是一个同步操做,在send方法中线程会被阻塞,直到接收到来自消息服务器的确认。

在主题下,消息生产过程当中各接口的调用方法与队列下的基本相似,只是使用了与主题相关的一些接口。所以,对于消息生产,可使用公共API中的接口来完成。

181346160.jpg

使用公共API中的接口实现消息生产者,其中使用了公共API中的接口Connection、Session、Destination和MessageProducer。

在建立destination时,调用createTopic方法和createQueue方法来分别建立主题和队列。

181411646.jpg

对于消息消费者相关API的使用,咱们也分队列和主题分别介绍。

在队列下,调用queueSession的createReceiver方法建立接收者。

调用queueReceiver的receive方法接收消息,在receive方法中,线程将被阻塞直至接收到消息。也能够在调用receive方法时,设置最长延时,若该时间内未收到消息,receive方法将直接返回。

在主题下,调用topicSession的createSubscriber方法建立订阅者。

另外须要经过实现MessageListener接口,实现消息侦听器,并重写侦听器的onMessage方法。经过topicSubscriber的setMessageListener将侦听器注册到订阅者中。当消息服务器将一条消息推向topicSubscriber时,topicSubscriber将调用侦听器的onMessage方法对消息进行具体处理。

181440686.jpg

使用公共API中的接口实现消息消费者。

对于队列和主题,除了在建立destination时的方法不一样外,在消息接收时,队列下的consumer是经过receive方法阻塞线程主动接收消息,主题下的comsumer是经过设置侦听器侦听推送过来的消息。

181517760.jpg

对于持久订阅者,它与普通订阅者的区别是:对于普通订阅者,在不链接时,发送的消息将被丢失,链接后这些消息不会被接收,而对于持久订阅者,在不链接时,发送的消息将被保存,链接后这些消息会被正常接收。

持久订阅者的建立与普通订阅者的建立有两点不一样:一是在链接中须要设置ClientID,二是在建立订阅者时调用createDurableSubscriber方法建立持久订阅者,并设置订阅者名称。消息服务器将ClientID和订阅者名称做为持久订阅者的惟一标识符,在不链接时,为其保留一份消息副本,在链接时,将消息发送至该持久订阅者。

若要完全关闭持久订阅者,使消息服务器再也不为其保留消息副本,则须要调用unsubscribe方法。

持久订阅者的优势是无论订阅者是否链接,消息都能被正常接收。

持久订阅者的缺点是若订阅者一直不链接,消息将被一直保存,形成存储压力。

所以是否选用持久订阅者要根据具体应用场景来决定,若须要保证订阅者在有链接丢失的状况下仍能接收到全部消息,则采用持久订阅者,若只须要订阅者在链接时接收消息,且容许有必定的消息丢失,则采用普通订阅者。

181543737.jpg

队列中的消息只能被一个消费者消费,若查看队列中的消息,但不消费消息,可使用QueueBrowser。

经过调用getEnumeration方法能够按照消息接收顺序获取到当前队列中的全部消息。

181611827.jpg

对于消费者,有时候须要处理知足特定条件的消息,一个可行的解决方案是在处理消息前,增长条件判断,对于不知足条件的消息不做处理,但这个方案存在的一个问题是因为消息已经被该消费者接收,但不会被处理,从而形成网络带宽的浪费,另外不做处理的消息也不能再被其余可能会处理这些消息的消费者接收。

咱们能够经过消息选择器解决上述问题。使用消息选择器能够实现消息的过滤,只接收知足特定要求的消息。

消息选择器能够增长的条件包括消息头和消息属性。消息头中能够用做选择条件的有JMSDeliveryMode,JMSPriority,JMSMessageID,JMSTimestamp,JMSCorrelationID,JMSType,例如能够选择权重值较高的消息。在消息属性中能够设置业务属性,这样就能够选择知足特定业务需求的消息。

消息选择器的基本表达式由标识符、比较运算符和常量组成,例如,JMSPriority> 5,即选择权重大于5的消息。

多个基本表达式能够由AND、OR组成复杂表达式,例如,JMSPriority> 5 AND Oper= ‘Delete’,即选择权重大于5且业务上操做类型为“Delete”的消息。

消息选择器在建立消费者时设置,以下所示。

181634269.jpg

对于持久化消息,消息服务器会对消息进行确认,以保证消息成功发送。

消息确认机制有以下几种:AUTO_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE,CLIENT_ACKNOWLEDGE,

消息确认机制能够在建立session时指定。

AUTO_ACKNOWLEDGE是由消息服务器自动完成消息确认。

消息的发送和接收是两个异步过程,所以分别讨论这两个过程当中AUTO机制的消息确认。

消息发送时:

1)生产者发送消息,生产者阻塞线程等待消息服务器接收到消息的确认。

2)消息服务器接收到消息后,对消息进行持久化。

3)持久化成功后,消息服务器向生成者发送确认通知。

4)生产者接收到确认通知后,从发送方法返回。

消息接收时:

5)消费者接收消息。

6)消费者发送确认通知。

7)消息服务器从存储中删除消息。

DUPS_OK_ACKNOWLEDGE不一样于AUTO对单条消息进行确认,是一种延时、批量确认机制,这样作的一个好处是能够减小因单条消息确认而带来的系统开销,但带来的一个问题是可能将一条消息向同一目的地发送两次以上,所以,适用于能够重复接收消息的应用场景。

CLIENT_ACKNOWLEDGE是由消费者经过调用消息的acknowledge方法进行确认,若是消费者使用CLIENT机制,且调用acknowledge方法,那么在上次确认之间所发的未被确认的消息都将被确认。

INDIVIDUAL_ACKNOWLEDGE也是由消费者经过调用消息的acknowledge方法进行确认,但不一样于CLIENT机制,每条消息都须要调用acknowledge方法进行确认。

181702304.jpg

消息事务能够保证:

若是在一个会话中,只有消息生产者或只有消息消费者,则对于消息生产者,事务将保证其全部消息都发送至服务器或都不发送,而对于消息消费者,事务将保证其接收全部消息或都不接收。

若是在一个会话中,既有消息生产者,又有消息消费者,则事务将保证全部消息都发送并接收或都不发送和接收。


建立会话时能够设置是否使用事务。

屡次操做后经过调用commit()提交。在出现异常时,经过调用rollback()回滚。

181744622.jpg

Spring对JMS提供支持,在客户端能够经过Spring配置链接、生产者和消费者。

经过spring提供的DefaultMessageListenerContainer能够设置消息的侦听,经过spring提供的JmsTemplate能够进行消息的发送和接收。