activeMQ 使用总结(机制介绍)

1 基本处理方式

 

1.1 topic(publish-subscribe)

     发布订阅模式,发布者发布消息到broker,全部订阅者都会接收到相同消息的copy。javascript

1.2 queue(p2p)

    p2p是生产者生成消息,通过broker-queue,只能有一个消费者处理。在p2p的场景里,相互通讯的双方是经过一个相似于队列的方式来进行交流。和前面pub-sub的区别在于一个topic有一个发送者和多个接收者,而在p2p里一个queue只有一个发送者和一个接收者。html

下图为queue处理过程,以及各类ack_type回复的处理流程,这些ack_type配合ack_mode进行,由jms内部控制。java

   

1. 得到JMS connection factory. 经过咱们提供特定环境的链接信息来构造factory。mysql

2. 利用factory构造JMS connectionspring

3. 启动connectionsql

4. 经过connection建立JMS session.docker

5. 指定JMS destination.数据库

6. 建立JMS producer或者建立JMS message并提供destination.apache

7. 建立JMS consumer或注册JMS message listener.缓存

8. 发送和接收JMS message.

9. 关闭全部JMS资源,包括connection, session, producer, consumer等。

1.3 request-response(p2p)

和前面两种方式比较起来,request-response的通讯方式很常见,可是不是默认提供的一种模式。在前面的两种模式中都是一方负责发送消息而另一方负责处理。而咱们实际中的不少应用至关于一种一应一答的过程,须要双方都能给对方发送消息。因而请求-应答的这种通讯方式也很重要。它也应用的很广泛。 

     请求-应答方式并非JMS规范系统默认提供的一种通讯方式,而是经过在现有通讯方式的基础上稍微运用一点技巧实现的。下图是典型的请求-应答方式的交互过程:

在JMS里面,若是要实现请求/应答的方式,能够利用JMSReplyTo和JMSCorrelationID消息头来将通讯的双方关联起来。另外,QueueRequestor和TopicRequestor可以支持简单的请求/应答过程。

b031376e-2af7-3502-b235-8c4db1ad890f.jpg (775×330)

2 生产模式

经过建立producer,而后调用send接口实现。

3 消费模式

spring框架中使用JMS传递消息有两种方式:JMS template和message listener Container,前者用于同步收发消息,后者用于异步收发消息。

3.1 同步方式

同步方式是建立consumer,经过session进行receive()方式接收消息。

3.2 异步方式

异步方式是经过实现messageListener的onMessage()接口实现,在该接口中完成消息处理。

 

4 可靠性机制

4.1 AUTO_ACKNOWLEDGE 

        Session.AUTO_ACKNOWLEDGE(自动确认模式)

  当消息成功的从receive方法返回时,或者从MessageListener接口的onMessage方法成功返回时,会话自动确认客户端的消息接收。

    自动确认,这就意味着消息的确认时机将有consumer择机确认."择机确认"彷佛充满了不肯定性,这也意味着,开发者必须明确知道"择机确认"的具体时机,不然将有可能致使消息的丢失,或者消息的重复接受.那么在ActiveMQ中,AUTO_ACKNOWLEDGE是如何运做的呢?

    1) 对于consumer而言,optimizeAcknowledge属性只会在AUTO_ACK模式下有效。

    2) 其中DUPS_ACKNOWLEGE也是一种潜在的AUTO_ACK,只是确认消息的条数和时间上有所不一样。

    3) 在“同步”(receive)方法返回message以前,会检测optimizeACK选项是否开启,若是没有开启,此单条消息将当即确认,因此在这种状况下,message返回以后,若是开发者在处理message过程当中出现异常,会致使此消息也不会redelivery,即"潜在的消息丢失";若是开启了optimizeACK,则会在unAck数量达到prefetch * 0.65时确认,固然咱们能够指定prefetchSize = 1来实现逐条消息确认。

    4) 在"异步"(messageListener)方式中,将会首先调用listener.onMessage(message),此后再ACK,若是onMessage方法异常,将致使client端补充发送一个ACK_TYPE为REDELIVERED_ACK_TYPE确认指令;若是onMessage方法正常,消息将会正常确认(STANDARD_ACK_TYPE)。此外须要注意,消息的重发次数是有限制的,每条消息中都会包含“redeliveryCounter”计数器,用来表示此消息已经被重发的次数,若是重发次数达到阀值,将会致使发送一个ACK_TYPE为POSION_ACK_TYPE确认指令,这就致使broker端认为此消息没法消费,此消息将会被删除或者迁移到"dead letter"通道中。

        所以当咱们使用messageListener方式消费消息时,一般建议在onMessage方法中使用try-catch,这样能够在处理消息出错时记录一些信息,而不是让consumer不断去重发消息;若是你没有使用try-catch,就有可能会由于异常而致使消息重复接收的问题,须要注意你的onMessage方法中逻辑是否可以兼容对重复消息的判断。

 

4.2 CLIENT_ACKNOWLEDGE 

        Session.CLIENT_ACKNOWLEDGE(客户端确认模式)

  客户端经过调用消息的acknowledge方法签收消息。在这种模式中,签收是在会话层上进行:签收一个已消费的消息会自动地签收这个Session全部已消费消息的收条。

客户端手动确认,这就意味着AcitveMQ将不会“自做主张”的为你ACK任何消息,开发者须要本身择机确认。在此模式下,开发者须要须要关注几个方法:

1) message.acknowledge(),

2) ActiveMQMessageConsumer.acknowledege(),

3) ActiveMQSession.acknowledge();

其1)和3)是等效的,将当前session中全部consumer中还没有ACK的消息都一块儿确认,2)只会对当前consumer中那些还没有确认的消息进行确认。开发者能够在合适的时机必须调用一次上述方法。

    咱们一般会在基于Group(消息分组)状况下会使用CLIENT_ACKNOWLEDGE,咱们将在一个group的消息序列接受完毕以后确认消息(组);不过当你认为消息很重要,只有当消息被正确处理以后才能确认时,也很可使用此ACK_MODE。

    若是开发者忘记调用acknowledge方法,将会致使当consumer重启后,会接受到重复消息,由于对于broker而言,那些还没有真正ACK的消息被视为“未消费”。

    开发者能够在当前消息处理成功以后,当即调用message.acknowledge()方法来"逐个"确认消息,这样能够尽量的减小因网络故障而致使消息重发的个数;固然也能够处理多条消息以后,间歇性的调用acknowledge方法来一次确认多条消息,减小ack的次数来提高consumer的效率,不过这仍然是一个利弊权衡的问题。

    除了message.acknowledge()方法以外,ActiveMQMessageConumser.acknowledge()和ActiveMQSession.acknowledge()也能够确认消息,只不过前者只会确认当前consumer中的消息。其中sesson.acknowledge()和message.acknowledge()是等效的。

    不管是“同步”/“异步”,ActiveMQ都不会发送STANDARD_ACK_TYPE,直到message.acknowledge()调用。若是在client端未确认的消息个数达到prefetchSize * 0.5时,会补充发送一个ACK_TYPE为DELIVERED_ACK_TYPE的确认指令,这会触发broker端能够继续push消息到client端。(参看PrefetchSubscription.acknwoledge方法)

    在broker端,针对每一个Consumer,都会保存一个由于"DELIVERED_ACK_TYPE"而“拖延”的消息个数,这个参数为prefetchExtension,事实上这个值不会大于prefetchSize * 0.5,由于Consumer端会严格控制DELIVERED_ACK_TYPE指令发送的时机(参见ActiveMQMessageConsumer.ackLater方法),broker端经过“prefetchExtension”与prefetchSize互相配合,来决定即将push给client端的消息个数,count = prefetchExtension + prefetchSize - dispatched.size(),其中dispatched表示已经发送给client端可是尚未“STANDARD_ACK_TYPE”的消息总量;因而可知,在CLIENT_ACK模式下,足够快速的调用acknowledge()方法是决定consumer端消费消息的速率;若是client端由于某种缘由致使acknowledge方法未被执行,将致使大量消息不能被确认,broker端将不会push消息,事实上client端将处于“假死”状态,而没法继续消费消息。咱们要求client端在消费1.5*prefetchSize个消息以前,必须acknowledge()一次;一般咱们老是每消费一个消息调用一次,这是一种良好的设计。

    此外须要额外的补充一下:全部ACK指令都是依次发送给broker端,在CLIET_ACK模式下,消息在交付给listener以前,都会首先建立一个DELIVERED_ACK_TYPE的ACK指令,直到client端未确认的消息达到"prefetchSize * 0.5"时才会发送此ACK指令,若是在此以前,开发者调用了acknowledge()方法,会致使消息直接被确认(STANDARD_ACK_TYPE)。broker端一般会认为“DELIVERED_ACK_TYPE”确认指令是一种“slow consumer”信号,若是consumer不能及时的对消息进行acknowledge而致使broker端阻塞,那么此consumer将会被标记为“slow”,此后queue中的消息将会转发给其余Consumer。

4.3 DUPS_OK_ACKNOWLEDGE

        Session.DUPS_OK_ACKNOWLEDGE(延时/批量确认模式)

  这种确认方式容许JMS没必要急于确认收到的消息,容许在收到多个消息以后一次完成确认,与Auto_AcKnowledge相比,这种确认方式在某些状况下可能更有效,由于没有确认,当系统崩溃或者网络出现故障的时候,消息能够被从新传递. 

  这种方式会引发消息的重复,可是下降了Session的开销,因此只有客户端能容忍重复的消息,才可以使用。(若是ActiveMQ再次传送同一消息,那么消息头中的JMSRedelivered将被设置为true)

 "消息可重复"确认,意思是此模式下,可能会出现重复消息,并非一条消息须要发送屡次ACK才行。它是一种潜在的"AUTO_ACK"确认机制,为批量确认而生,并且具备“延迟”确认的特色。对于开发者而言,这种模式下的代码结构和AUTO_ACKNOWLEDGE同样,不须要像CLIENT_ACKNOWLEDGE那样调用acknowledge()方法来确认消息。

    1) 在ActiveMQ中,若是在Destination是Queue通道,咱们真的能够认为DUPS_OK_ACK就是“AUTO_ACK + optimizeACK + (prefetch > 0)”这种状况,在确认时机上几乎彻底一致;此外在此模式下,若是prefetchSize =1 或者没有开启optimizeACK,也会致使消息逐条确认,从而失去批量确认的特性。

    2) 若是Destination为Topic,DUPS_OK_ACKNOWLEDGE才会产生JMS规范中诠释的意义,即不管optimizeACK是否开启,都会在消费的消息个数>=prefetch * 0.5时,批量确认(STANDARD_ACK_TYPE),在此过程当中,不会发送DELIVERED_ACK_TYPE的确认指令,这是1)和AUTO_ACK的最大的区别。

    这也意味着,当consumer故障重启后,那些还没有ACK的消息会从新发送过来。

4.4 SESSION_TRANSACTED

当session使用事务时,就是使用此模式。在事务开启以后,和session.commit()以前,全部消费的消息,要么所有正常确认,要么所有redelivery。这种严谨性,一般在基于GROUP(消息分组)或者其余场景下特别适合。在SESSION_TRANSACTED模式下,optimizeACK并不能发挥任何效果,由于在此模式下,optimizeACK会被强制设定为false,不过prefetch仍然能够决定DELIVERED_ACK_TYPE的发送时机。

    由于Session非线程安全,那么当前session下全部的consumer都会共享同一个transactionContext;同时建议,一个事务类型的Session中只有一个Consumer,已避免rollback()或者commit()方法被多个consumer调用而形成的消息混乱。

    当consumer接受到消息以后,首先检测TransactionContext是否已经开启,若是没有,就会开启并生成新的transactionId,并把信息发送给broker;此后将检测事务中已经消费的消息个数是否 >= prefetch * 0.5,若是大于则补充发送一个“DELIVERED_ACK_TYPE”的确认指令;这时就开始调用onMessage()方法,若是是同步(receive),那么即返回message。上述过程,和其余确认模式没有任何特殊的地方。

    当开发者决定事务能够提交时,必须调用session.commit()方法,commit方法将会致使当前session的事务中全部消息当即被确认;事务的确认过程当中,首先把本地的deliveredMessage队列中还没有确认的消息所有确认(STANDARD_ACK_TYPE);此后向broker发送transaction提交指令并等待broker反馈,若是broker端事务操做成功,那么将会把本地deliveredMessage队列清空,新的事务开始;若是broker端事务操做失败(此时broker已经rollback),那么对于session而言,将执行inner-rollback,这个rollback所作的事情,就是将当前事务中的消息清空并要求broker重发(REDELIVERED_ACK_TYPE),同时commit方法将抛出异常。

    当session.commit方法异常时,对于开发者而言一般是调用session.rollback()回滚事务(事实上开发者不调用也没有问题),固然你能够在事务开始以后的任什么时候机调用rollback(),rollback意味着当前事务的结束,事务中全部的消息都将被重发。须要注意,不管是inner-rollback仍是调用session.rollback()而致使消息重发,都会致使message.redeliveryCounter计数器增长,最终都会受限于brokerUrl中配置的"jms.redeliveryPolicy.maximumRedeliveries",若是rollback的次数过多,而达到重发次数的上限时,消息将会被DLQ(dead letter)。

4.5 INDIVIDUAL_ACKNOWLEDGE

单条消息确认,这种确认模式,咱们不多使用,它的确认时机和CLIENT_ACKNOWLEDGE几乎同样,当消息消费成功以后,须要调用message.acknowledege来确认此消息(单条),而CLIENT_ACKNOWLEDGE模式先message.acknowledge()方法将致使整个session中全部消息被确认(批量确认)。

5 消息重传机制

5.1 重传触发机制

消息重传主要是针对事务性消息以及client_ack方式的进行,重传触发时机以下,

1)事务session,而且调用了rollback()方法;
2)事务session,关闭以前调用了commit;
3)非事务session中使用CLIENT_ACKNOWLEDGE签收模式,而且调用了Session.recover()方法。

若是不是以上状况,(未成功确认的消息)不会当即触发消息重传,会在下次客户端启动时,重传。

一旦消息重发尝试超太重发策略中配置的maximumRedeliveries(缺省为6次)时,会给broker发送一个"Poison ack",通知它,这个消息被认为是一个毒丸(a poison pill),接着broker会将这个消息发送到DLQ(Dead Letter Queue),以便后续分析处理。

经过recover重传方式,当前线程会和消息绑定,当前线程挂起,消息不转发给其余线程,只有当该线程关闭,才会释放对该消息的持有。

经过抛异常的方式,也会重发从新投递,但此时,只是将该消息看成一个新消息,从新负载分配到全部监听。

5.2 消息重传配置

具体参数以下,

 属性 默认值 说明 
 collisionAvoidanceFactor  0.15  设置防止冲突范围的正负百分比,只有启用useCollisionAvoidance参数时才生效。
 maximumRedeliveries  6  最大重传次数,达到最大重连次数后抛出异常。为-1时不限制次数,为0时表示不进行重传。
 maximumRedeliveryDelay  -1  最大传送延迟,只在useExponentialBackOff为true时有效(V5.5),假设首次重连间隔为10ms,倍数为2,那么第二次重连时间间隔为 20ms,第三次重连时间间隔为40ms,当重连时间间隔大的最大重连时间间隔时,之后每次重连时间间隔都为最大重连时间间隔。
 initialRedeliveryDelay  1000L  初始重发延迟时间
 redeliveryDelay  1000L  重发延迟时间,当initialRedeliveryDelay=0时生效(v5.4)
 useCollisionAvoidance  false  启用防止冲突功能,由于消息接收时是可使用多线程并发处理的,应该是为了重发的安全性,避开全部并发线程都在同一个时间点进行消息接收处理。全部线程在同一个时间点处理时会发生什么问题呢?应该没有问题,只是为了平衡broker处理性能,不会有时很忙,有时很空闲。
 useExponentialBackOff  false  启用指数倍数递增的方式增长延迟时间。
 backOffMultiplier  5  重连时间间隔递增倍数,只有值大于1和启用useExponentialBackOff参数时才生效。

6  消息持久化

 

为了不意外宕机之后丢失信息,须要作到重启后能够恢复消息队列,消息系统通常都会采用持久化机制。

ActiveMQ的消息持久化机制有JDBC,AMQ,KahaDB和LevelDB,不管使用哪一种持久化方式,消息的存储逻辑都是一致的。

就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等,而后试图将消息发送给接收者,发送成功则将消息从存储中删除,失败则继续尝试。

消息中心启动之后首先要检查指定的存储位置,若是有未发送成功的消息,则须要把消息发送出去。

6.1 JDBC持久化方式

使用JDBC持久化方式,数据库会建立3个表:activemq_msgs,activemq_acks和activemq_lock。
activemq_msgs用于存储消息,Queue和Topic都存储在这个表中。

(1)配置方式

配置持久化的方式,都是修改安装目录下conf/acticvemq.xml文件,

首先定义一个mysql-ds的MySQL数据源,而后在persistenceAdapter节点中配置jdbcPersistenceAdapter而且引用刚才定义的数据源。

<persistenceAdapter> 

    <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false" /> 

</persistenceAdapter>

dataSource指定持久化数据库的bean,createTablesOnStartup是否在启动的时候建立数据表,默认值是true,这样每次启动都会去建立数据表了,通常是第一次启动的时候设置为true,以后改为false。
使用MySQL配置JDBC持久化:

<beans>

    <broker brokerName="test-broker" persistent="true" xmlns="http://activemq.apache.org/schema/core">

        <persistenceAdapter>

            <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false"/>

        </persistenceAdapter>

    </broker>

    <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">

        <property name="driverClassName" value="com.mysql.jdbc.Driver"/>

        <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>

        <property name="username" value="activemq"/>

        <property name="password" value="activemq"/>

        <property name="maxActive" value="200"/>

        <property name="poolPreparedStatements" value="true"/>

    </bean>

</beans>

(2)数据库表信息 

activemq_msgs用于存储消息,Queue和Topic都存储在这个表中:
ID:自增的数据库主键
CONTAINER:消息的Destination
MSGID_PROD:消息发送者客户端的主键
MSG_SEQ:是发送消息的顺序,MSGID_PROD+MSG_SEQ能够组成JMS的MessageID
EXPIRATION:消息的过时时间,存储的是从1970-01-01到如今的毫秒数
MSG:消息本体的Java序列化对象的二进制数据
PRIORITY:优先级,从0-9,数值越大优先级越高

activemq_acks用于存储订阅关系。若是是持久化Topic,订阅者和服务器的订阅关系在这个表保存:
主要的数据库字段以下:
CONTAINER:消息的Destination
SUB_DEST:若是是使用Static集群,这个字段会有集群其余系统的信息
CLIENT_ID:每一个订阅者都必须有一个惟一的客户端ID用以区分
SUB_NAME:订阅者名称
SELECTOR:选择器,能够选择只消费知足条件的消息。条件能够用自定义属性实现,可支持多属性AND和OR操做
LAST_ACKED_ID:记录消费过的消息的ID。

表activemq_lock在集群环境中才有用,只有一个Broker能够得到消息,称为Master Broker,
其余的只能做为备份等待Master Broker不可用,才可能成为下一个Master Broker。
这个表用于记录哪一个Broker是当前的Master Broker。

6.2 AMQ方式

性能高于JDBC,写入消息时,会将消息写入日志文件,因为是顺序追加写,性能很高。为了提高性能,建立消息主键索引,而且提供缓存机制,进一步提高性能。每一个日志文件的大小都是有限制的(默认32m,可自行配置)。
当超过这个大小,系统会从新创建一个文件。当全部的消息都消费完成,系统会删除这个文件或者归档(取决于配置)。
主要的缺点是AMQ Message会为每个Destination建立一个索引,若是使用了大量的Queue,索引文件的大小会占用不少磁盘空间。
并且因为索引巨大,一旦Broker崩溃,重建索引的速度会很是慢。

配置片断以下:

<persistenceAdapter>

     <amqPersistenceAdapter directory="${activemq.data}/activemq-data" maxFileLength="32mb"/>

</persistenceAdapter>

然AMQ性能略高于下面的Kaha DB方式,可是因为其重建索引时间过长,并且索引文件占用磁盘空间过大,因此已经不推荐使用。

6.3  KahaDB方式

KahaDB是从ActiveMQ 5.4开始默认的持久化插件,也是咱们项目如今使用的持久化方式。

KahaDb恢复时间远远小于其前身AMQ而且使用更少的数据文件,因此能够彻底代替AMQ。
kahaDB的持久化机制一样是基于日志文件,索引和缓存。

配置方式:

<persistenceAdapter>

    <kahaDB directory="${activemq.data}/activemq-data" journalMaxFileLength="16mb"/>

</persistenceAdapter>

directory : 指定持久化消息的存储目录

journalMaxFileLength : 指定保存消息的日志文件大小,具体根据你的实际应用配置

(1)KahaDB主要特性
一、日志形式存储消息;
二、消息索引以B-Tree结构存储,能够快速更新;
三、彻底支持JMS事务;
四、支持多种恢复机制;

(2)KahaDB的结构

消息存储在基于文件的数据日志中。若是消息发送成功,变标记为可删除的。系统会周期性的清除或者归档日志文件。
消息文件的位置索引存储在内存中,这样能快速定位到。按期将内存中的消息索引保存到metadata store中,避免大量消息未发送时,消息索引占用过多内存空间。

Data logs:
Data logs用于存储消息日志,消息的所有内容都在Data logs中。
同AMQ同样,一个Data logs文件大小超过规定的最大值,会新建一个文件。一样是文件尾部追加,写入性能很快。
每一个消息在Data logs中有计数引用,因此当一个文件里全部的消息都不须要了,系统会自动删除文件或放入归档文件夹。

Metadata cache :
缓存用于存放在线消费者的消息。若是消费者已经快速的消费完成,那么这些消息就不须要再写入磁盘了。
Btree索引会根据MessageID建立索引,用于快速的查找消息。这个索引一样维护持久化订阅者与Destination的关系,以及每一个消费者消费消息的指针。

Metadata store 
在db.data文件中保存消息日志中消息的元数据,也是以B-Tree结构存储的,定时从Metadata cache更新数据。Metadata store中也会备份一些在消息日志中存在的信息,这样可让Broker实例快速启动。
即使metadata store文件被破坏或者误删除了。broker能够读取Data logs恢复过来,只是速度会相对较慢些。

6.4 LevelDB方式

从ActiveMQ 5.6版本以后,又推出了LevelDB的持久化引擎。
目前默认的持久化方式仍然是KahaDB,不过LevelDB持久化性能高于KahaDB,多是之后的趋势。
在ActiveMQ 5.9版本提供了基于LevelDB和Zookeeper的数据复制方式,用于Master-slave方式的首选数据复制方案。

  结语:到目前为止,咱们已经已经简单的了解了ActiveMQ中消息传送机制,还有JMS中ACK策略,重点分析了optimizeACK的策略,但愿开发者可以在使用activeMQ中避免一些没必要要的错误。本文若有疏漏和错误之处,请各位不吝赐教,特此感谢。在后续的文章中,会详细介绍具体配置。

7 延迟消息

 

有时候咱们不但愿消息立刻被broker投递出去,而是想要消息60秒之后发给消费者,或者咱们想让消息没隔必定时间投递一次,一共投递指定的次数。。。

相似这种需求,ActiveMQ提供了一种broker端消息定时调度机制。

咱们只须要把几个描述消息定时调度方式的参数做为属性添加到消息,broker端的调度器就会按照咱们想要的行为去处理消息。

首先开启schedulerSupport为true,在activemq.xml文件添加

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">

一共有四个属性:

 

Property name type description
AMQ_SCHEDULED_DELAY long 延迟投递的时间
AMQ_SCHEDULED_PERIOD long 重复投递的时间间隔
AMQ_SCHEDULED_REPEAT int 重复投递次数
AMQ_SCHEDULED_CRON String Cron表达式

 

固然ActiveMQ也提供了一个封装的消息类型:org.apache.activemq.ScheduledMessage.

使用示例,延迟60秒:

MessageProducer producer = session.createProducer(destination);
        TextMessage message = session.createTextMessage("test msg");
        long time = 60 * 1000;
        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
        producer.send(message);

延迟30秒,投递10次,间隔10秒:

MessageProducer producer = session.createProducer(destination);
        TextMessage message = session.createTextMessage("test msg");
        long delay = 30 * 1000;
        long period = 10 * 1000;
        int repeat = 9;
        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
        message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
        producer.send(message);

使用 CRON 表达式的例子:

MessageProducer producer = session.createProducer(destination);
        TextMessage message = session.createTextMessage("test msg");
        message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");
        producer.send(message);

CRON表达式的优先级高于另外三个参数,若是在设置了CRON的同时,也有repeat和period参数,则会在每次CRON执行的时候,重复投递repeat次,每次间隔为period。就是说设置是叠加的效果。例如每小时都会发生消息被投递10次,延迟1秒开始,每次间隔1秒:

MessageProducer producer = session.createProducer(destination);
        TextMessage message = session.createTextMessage("test msg");
        message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");
        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 1000);
        message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 1000);
        message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 9);
        producer.send(message);

stomp方式,只须要在head中添加属性便可,

activeMQ.publish(exports.signQueue,msg,{'persistent':true,'AMQ_SCHEDULED_DELAY':60000});

 

参考文章:

http://blog.csdn.net/czp11210/article/details/47022639

http://activemq.apache.org/redelivery-policy.html

http://www.myexception.cn/internet/1252460.html