确认的机制是:消费者从目的地接收Message以后,发送接收确认给JMS Provider,JMS Provider收到确认后,将从目的地中删除该消息,发送处理确认给消费者。
- AUTO_ACKNOWLEDGE
自动确认:会话对收到的每条Message,自动发送接收确认;而后会话线程阻塞,直到收到了JMS Provider发来的处理确认。
接收确认的发送时机:同步接收中,调用receive()方法成功返回;异步接收中,MessageListener的onMessage(Message)方法被调用,并成功返回。
- CLIENT_ACKNOWLEDGE
客户端确认:客户端调用Message#acknowledge()发送接收确认[1];而后会话线程阻塞,直到收到了JMS Provider发来的处理确认。
接收确认的发送时机:显示调用Message#acknowledge()方法。
注:
[1] 每次确认不是只对当前的Message进行确认,而是对自上次确认以来的全部Message进行确认.
- DUPS_OK_ACKNOWLEDGE
消息可重复确认:这个机制行为上表现的和AUTO_ACKNOWLEDGE同样,具备延迟确认的特色。这个机制可能会致使收到重复的消息。
接收确认的发送时机:测试中,PTP Mode每一条消息都会即时确认;Pub/Sub Mode在接收的消息数,每超过prefetch size阀值[1]一半的时候确认一次。
注:
[1] 在brokerURL中能够指定参数jms.prefetchPolicy.topicPrefetch做为prefetch size阀值。你可能会猜想:应该也有参数jms.prefetchPolicy.queuePrefetch,可让DUPS_OK_ACKNOWLEDGE在PTP Mode中也批量确认。可是实验的结果是每一条消息都会即时确认,就好像DUPS_OK_ACKNOWLEDGE只是针对Pub/Sub设定的。
1.3.确认机制的设置
javax.jms.Session针对三种确认机制,分别定义了整型常量:
- AUTO_ACKNOWLEDGE - 自动确认
- CLIENT_ACKNOWLEDGE - 客户端确认
- DUPS_OK_ACKNOWLEDGE - 可重复确认
javax.jms.Connection提供了设置的方法:
- createSession(boolean transacted, int acknowledgeMode):Session
transacted - 设置事务,后面才会谈到事务,这里设置false就行了
acknowledgeMode - 设置确认机制
确认机制的设置是对于有接收须要的Session来说的,若是客户端是纯粹的Producer,确认机制的设置应该是被忽略的。鉴于JMS API只提供了上面一个方法来获取Session实例,建议在Producer-Client端获取Session时,使用Connection#createSession(false, Session.AUTO_ACKNOWLEDGE)。对于纯粹的消费者,或者同时做为生产者和消费者,要针对消费的须要来考虑选择哪个确认机制。
1.4.确认机制的获取
javax.jms.Session提供获取的方法:
- getAcknowledgeMode():int
返回结果有4种,除了上述的3种确认机制以外,还有一个Session.SESSION_TRANSACTED表示Session启用了事务。
1.5.测试
2.Transaction - 事务
事务用于将多个操做组成一个原子操做,要么所有成功,要么所有失败。与事务相关的两个指令:提交、回滚。当确认全部的操做都是正确的时候,执行提交,这些操做就会生效;当其中有一个操做失败的时候,执行回滚,全部已执行的操做就会撤销。
事务的原理:类比于数据库中的事务,原理相同。事务都是用于对于多个修改性质的操做进行绑定,以这些个修改操做要么所有成功,要么所有失败的保障,来确保数据的一致性。数据库中的事务是对修改数据的行为进行绑定,JMS中的事务是对修改目的地中消息的行为(发送消息、接收消息)进行绑定。
若是你对JDBC中的事务有所了解,对比其和JMS中的事务,兴许能帮助你记忆JMS中的事务:
对比内容\事务 |
JDBC事务 |
JMS事务 |
范围 |
java.sql.Connection实例的生命周期以内 |
javax.jms.Session实例的生命周期以内 |
设置和获取 |
java.sql.Connection:
- setAutoCommit(false)
设置事务
- getAutoCommit():boolean
获取事务
|
javax.jms.Connection:
- createSession(true, *)
设置事务。 第二个参数能够设置确认模式,能够是任意值:启用了事务就会忽略确认模式。 不过建议使用Session.SESSION_TRANSACTED
javax.jms.Session:
- getTransacted():boolean
获取事务
|
操做 |
java.sql.Connection:
- commit():void
提交
- rollback():void
回滚
|
javax.jms.Session:
- commit():void
提交
- rollback():void
回滚
|
事务的设置和确认机制的设置使用同一个方法,那么有必要把确认机制和事务进行对比:
对比内容\机制 |
确认机制 |
事务机制 |
本质 |
确保消费者可以获取到消息,在收到消费者的确认以后,才会将消息从目的地移除。 |
将多个操做绑在一块儿,成为原子操做,从而有相同的操做结果(成功或失败)。 |
客户端编程 |
在Producer端是透明的,只能在Consumer端进行控制。 |
能够在Producer和Consumer端实施控制。 |
功能性 |
确认消息已收到。 |
事务的commit,提供相似于批量确认的功能。 不要单纯为了批量确认而使用事务,要从业务的须要上考虑使用事务。 |
由于事务机制提供了确认的功能,因此确认机制和事务机制是互斥的。Connection#createSession(boolean transacted, int acknowledgeMode)方法用于设置事务或确认:
- 若是第一个参数为true,表示启用事务,第二个参数就会被忽略(建议传入Session.SESSION_TRANSACTED)
- 若是第一个参数为false,表示不启用事务,第二个参数用于设置确认模式,就有效了。
2.1.使用模型
1 try {
2 // ...
3 javax.jms.Connection connection = ...;
4 javax.jms.Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
5
6 // send or receive operates
7
8 session.commit(); // commit at lase
9 } catch(JMSException e) {
10 session.rollback(); // rollback when exception happens
11 } finally {
12 // close resources
13 }
2.2.测试
3.持久化模式
确认模式和事务模式足以
[1]处理发送阶段、接收阶段的安全隐患,持久化的机制则用来处理消息在目的地阶段的安全隐患。
若是Producer选择不使用持久化,则消息缓存在内存中,虽然能够得到高吞吐率,可是一旦JMS Provider宕掉,就会致使消息的丢失。非持久化具备高吞吐量和低可靠性的特色。
若是Producer选择持久化,则JMS Provider会将消息存到物理媒介上(文件、数据库),Consumer获取消息,也是从物理媒介读取,吞吐量受到影响,可是即便JMS Provider宕掉,消息也不会丢失。持久化具备低吞吐量和高可靠性的特色。
针对持久化,JMS Provider能够提供多种持久化方案,好比持久化到本地文件、不一样的数据库等,不过这些不是Producer须要关心的,Producer只须要告诉JMS Provider要不要持久化消息就行了。
持久化方案的选择,在%ActiveMQ_HOME%\conf\activemq.xml文件,<persistenceAdapter />元素下配置。能够参考
ActiveMQ持久化,这里不介绍这块内容。
补充:
[1] 实际上,在发送阶段须要咱们本身提供容错方案 - 好比对发送失败的消息,缓存到本地文件等。
3.1.持久化设置
javax.jms.MessageProducer:
- setDeliveryMode(int):void
- 设置此producer实例的默认递送模式
- send(Message message):void
- 以默认的deliveryMode发送消息
- send(Message message, int deliveryMode, int priority, long timeToLive):void
- 以指定的deliveryMode发送消息
javax.jms.DeliveryMode
- static int NON_PERSISTENT
非持久化模式
- static int PERSISTENT
持久化模式
设置递送模式的代码:
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 非持久化
producer.setDelicertMode(DeliveryMode.PERSISTENT); // 持久化
3.2.测试
参考