ActiveMQ消息传送机制以及ACK机制详解

AcitveMQ是做为一种消息存储和分发组件,涉及到client与broker端数据交互的方方面面,它不只要担保消息的存储安全性,还要提供额外的手段来确保消息的分发是可靠的。java

一. ActiveMQ消息传送机制缓存

    Producer客户端使用来发送消息的, Consumer客户端用来消费消息;它们的协同中心就是ActiveMQ broker,broker也是让producer和consumer调用过程解耦的工具,最终实现了异步RPC/数据交换的功能。随着ActiveMQ的不断发展,支持了愈来愈多的特性,也解决开发者在各类场景下使用ActiveMQ的需求。好比producer支持异步调用;使用flow control机制让broker协同consumer的消费速率;consumer端可使用prefetchACK来最大化消息消费的速率;提供"重发策略"等来提升消息的安全性等。在此咱们不详细介绍。安全

    一条消息的生命周期以下:网络


  

     图片中简单的描述了一条消息的生命周期,不过在不一样的架构环境中,message的流动行可能更加复杂.将在稍后有关broker的架构中详解..一条消息从producer端发出以后,一旦被broker正确保存,那么它将会被consumer消费,而后ACK,broker端才会删除;不过当消息过时或者存储设备溢出时,也会终结它。session


 

     这是一张很复杂,并且有些凌乱的图片;这张图片中简单的描述了:1)producer端如何发送消息 2) consumer端如何消费消息 3) broker端如何调度。若是用文字来描述图示中的概念,恐怕一言难尽。图示中,说起到prefetchAck,以及消息同步、异步发送的基本逻辑;这对你了解下文中的ACK机制将有很大的帮助。架构

二. optimizeACK负载均衡

    "可优化的ACK",这是ActiveMQ对于consumer在消息消费时,对消息ACK的优化选项,也是consumer端最重要的优化参数之一,你能够经过以下方式开启:异步

    1) 在brokerUrl中增长以下查询字符串: tcp

Java代码 工具

  1. String brokerUrl = "tcp://localhost:61616?" +   
  2.                    "jms.optimizeAcknowledge=true" +   
  3.                    "&jms.optimizeAcknowledgeTimeOut=30000" +   
  4.                    "&jms.redeliveryPolicy.maximumRedeliveries=6";  
  5. ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);  

    2) 在destinationUri中,增长以下查询字符串:

Java代码 

  1. String queueName = "test-queue?customer.prefetchSize=100";  
  2. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
  3. Destination queue = session.createQueue(queueName);  

    咱们须要在brokerUrl指定optimizeACK选项,在destinationUri中指定prefetchSize(预获取)选项,其中brokerUrl参数选项是全局的,即当前factory下全部的connection/session/consumer都会默认使用这些值;而destinationUri中的选项,只会在使用此destination的consumer实例中有效;若是同时指定,brokerUrl中的参数选项值将会被覆盖。optimizeAck表示是否开启“优化ACK”,只有在为true的状况下,prefetchSize(下文中将会简写成prefetch)以及optimizeAcknowledgeTimeout参数才会有意义。此处须要注意"optimizeAcknowledgeTimeout"选项只能在brokerUrl中配置。

    prefetch值建议在destinationUri中指定,由于在brokerUrl中指定比较繁琐;在brokerUrl中,queuePrefetchSize和topicPrefetchSize都须要单独设定:"&jms.prefetchPolicy.queuePrefetch=12&jms.prefetchPolicy.topicPrefetch=12"等来逐个指定。

    若是prefetchACK为true,那么prefetch必须大于0;当prefetchACK为false时,你能够指定prefetch为0以及任意大小的正数。不过,当prefetch=0是,表示consumer将使用PULL(拉取)的方式从broker端获取消息,broker端将不会主动push消息给client端,直到client端发送PullCommand时;当prefetch>0时,就开启了broker push模式,此后只要当client端消费且ACK了必定的消息以后,会当即push给client端多条消息。

    当consumer端使用receive()方法同步获取消息时,prefetch能够为0和任意正值;当prefetch=0时,那么receive()方法将会首先发送一个PULL指令并阻塞,直到broker端返回消息为止,这也意味着消息只能逐个获取(相似于Request<->Response),这也是Activemq中PULL消息模式;当prefetch > 0时,broker端将会批量push给client 必定数量的消息(<= prefetch),client端会把这些消息(unconsumedMessage)放入到本地的队列中,只要此队列有消息,那么receive方法将会当即返回,当必定量的消息ACK以后,broker端会继续批量push消息给client端。

    当consumer端使用MessageListener异步获取消息时,这就须要开发设定的prefetch值必须 >=1,即至少为1;在异步消费消息模式中,设定prefetch=0,是相悖的,也将得到一个Exception。

    此外,咱们还能够brokerUrl中配置“redelivery”策略,好比当一条消息处理异常时,broker端能够重发的最大次数;和下文中提到REDELIVERED_ACK_TYPE互相协同。当消息须要broker端重发时,consumer会首先在本地的“deliveredMessage队列”(Consumer已经接收但还未确认的消息队列)删除它,而后向broker发送“REDELIVERED_ACK_TYPE”类型的确认指令,broker将会把指令中指定的消息从新添加到pendingQueue(亟待发送给consumer的消息队列)中,直到合适的时机,再次push给client。

    到目前为止,或许你知道了optimizeACK和prefeth的大概意义,不过咱们可能还会有些疑惑!!optimizeACK和prefetch配合,将会达成一个高效的消息消费模型:批量获取消息,并“延迟”确认(ACK)prefetch表达了“批量获取”消息的语义,broker端主动的批量push多条消息给client端,总比client屡次发送PULL指令而后broker返回一条消息的方式要优秀不少,它不只减小了client端在获取消息时阻塞的次数和阻塞的时间,还可以大大的减小网络开支。optimizeACK表达了“延迟确认”的语义(ACK时机),client端在消费消息后暂且不发送ACK,而是把它缓存下来(pendingACK),等到这些消息的条数达到必定阀值时,只须要经过一个ACK指令把它们所有确认;这比对每条消息都逐个确认,在性能上要提升不少。因而可知,prefetch优化了消息传送的性能,optimizeACK优化了消息确认的性能。

    当consumer端消息消费的速率很高(相对于producer生产消息),并且消息的数量也很大时(好比消息源源不断的生产),咱们使用optimizeACK + prefetch将会极大的提高consumer的性能。不过反过来:

    1) 若是consumer端消费速度很慢(对消息的处理是耗时的),过大的prefetchSize,并不能有效的提高性能,反而不利于consumer端的负载均衡(只针对queue);按照良好的设计准则,当consumer消费速度很慢时,咱们一般会部署多个consumer客户端,并使用较小的prefetch,同时关闭optimizeACK,可让消息在多个consumer间“负载均衡”(即均匀的发送给每一个consumer);若是较大的prefetchSize,将会致使broker一次性push给client大量的消息,可是这些消息须要好久才能ACK(消息积压),并且在client故障时,还会致使这些消息的重发。

    2) 若是consumer端消费速度很快,可是producer端生成消息的速率较慢,好比生产者10秒钟生成10条消息,可是consumer一秒就能消费完毕,并且咱们还部署了多个consumer!!这种场景下,建议开启optimizeACK,可是须要设置的prefetchSize不能过大;这样能够保证每一个consumer都能有"活干",不然将会出现一个consumer很是忙碌,可是其余consumer几乎收不到消息。

    3) 若是消息很重要,特别是不肯意接收到”redelivery“的消息,那么咱们须要将optimizeACK=false,prefetchSize=1

    既然optimizeACK是”延迟“确认,那么就引入一种潜在的风险:在消息被消费以后尚未来得及确认时,client端发生故障,那么这些消息就有可能会被从新发送给其余consumer,那么这种风险就须要client端可以容忍“重复”消息。

    prefetch值默认为1000,固然这个值可能在不少场景下是偏大的;咱们暂且不考虑ACK模式(参见下文),一般状况下,咱们只须要简单的统计出单个consumer每秒的最大消费消息数便可,好比一个consumer每秒能够处理100个消息,咱们指望consumer端每2秒确认一次,那么咱们的prefetchSize能够设置为100 * 2 /0.65大概为300。不管如何设定此值,client持有的消息条数最大为:prefetch + “DELIVERED_ACK_TYPE消息条数”(DELIVERED_ACK_TYPE参见下文)

**     即便当optimizeACK为true,也只会当session的ACK模式为AUTO_ACKNOWLEDGE时才会生效**,即在其余类型的ACK模式时consumer端仍然不会“延迟确认”,即:

Java代码 

  1. consumer.optimizeAck = connection.optimizeACK && session.isAutoAcknowledge()  

    当consumer.optimizeACK有效时,若是客户端已经消费但还没有确认的消息(deliveredMessage)达到prefetch * 0.65,consumer端将会自动进行ACK;同时若是离上一次ACK的时间间隔,已经超过"optimizeAcknowledgeTimout"毫秒,也会致使自动进行ACK。

    此外简单的补充一下,批量确认消息时,只须要在ACK指令中指明“firstMessageId”和“lastMessageId”便可,即消息区间,那么broker端就知道此consumer(根据consumerId识别)须要确认哪些消息。

 
三. ACK模式与类型介绍

    JMS API中约定了Client端可使用四种ACK模式,在javax.jms.Session接口中:

  • AUTO_ACKNOWLEDGE = 1    自动确认
  • CLIENT_ACKNOWLEDGE = 2    客户端手动确认   
  • DUPS_OK_ACKNOWLEDGE = 3    自动批量确认
  • SESSION_TRANSACTED = 0    事务提交并确认

    此外AcitveMQ补充了一个自定义的ACK模式:

  • INDIVIDUAL_ACKNOWLEDGE = 4    单条消息确认

    咱们在开发JMS应用程序的时候,会常用到上述ACK模式,其中"INDIVIDUAL_ACKNOWLEDGE "只有ActiveMQ支持,固然开发者也可使用它. ACK模式描述了Consumer与broker确认消息的方式(时机),好比当消息被Consumer接收以后,Consumer将在什么时候确认消息。对于broker而言,只有接收到ACK指令,才会认为消息被正确的接收或者处理成功了,经过ACK,能够在consumer(/producer)与Broker之间创建一种简单的“担保”机制. 

    Client端指定了ACK模式,可是在Client与broker在交换ACK指令的时候,还须要告知ACK_TYPE,ACK_TYPE表示此确认指令的类型,不一样的ACK_TYPE将传递着消息的状态,broker能够根据不一样的ACK_TYPE对消息进行不一样的操做。

    好比Consumer消费消息时出现异常,就须要向broker发送ACK指令,ACK_TYPE为"REDELIVERED_ACK_TYPE",那么broker就会从新发送此消息。在JMS API中并无定义ACT_TYPE,由于它一般是一种内部机制,并不会面向开发者。ActiveMQ中定义了以下几种ACK_TYPE(参看MessageAck类):

  • DELIVERED_ACK_TYPE = 0    消息"已接收",但还没有处理结束
  • STANDARD_ACK_TYPE = 2    "标准"类型,一般表示为消息"处理成功",broker端能够删除消息了
  • POSION_ACK_TYPE = 1    消息"错误",一般表示"抛弃"此消息,好比消息重发屡次后,都没法正确处理时,消息将会被删除或者DLQ(死信队列)
  • REDELIVERED_ACK_TYPE = 3    消息需"重发",好比consumer处理消息时抛出了异常,broker稍后会从新发送此消息
  • INDIVIDUAL_ACK_TYPE = 4    表示只确认"单条消息",不管在任何ACK_MODE下    
  • UNMATCHED_ACK_TYPE = 5    在Topic中,若是一条消息在转发给“订阅者”时,发现此消息不符合Selector过滤条件,那么此消息将 不会转发给订阅者,消息将会被存储引擎删除(至关于在Broker上确认了消息)。

    到目前为止,咱们已经清楚了大概的原理: Client端在不一样的ACK模式时,将意味着在不一样的时机发送ACK指令,每一个ACK Command中会包含ACK_TYPE,那么broker端就能够根据ACK_TYPE来决定此消息的后续操做. 接下来,咱们详细的分析ACK模式与ACK_TYPE.

Java代码 

  1. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);   

    咱们须要在建立Session时指定ACK模式,因而可知,ACK模式将是session共享的,意味着一个session下全部的 consumer都使用同一种ACK模式。在建立Session时,开发者不能指定除ACK模式列表以外的其余值.若是此session为事务类型,用户指定的ACK模式将被忽略,而强制使用"SESSION_TRANSACTED"类型;若是session非事务类型时,也将不能将 ACK模式设定为"SESSION_TRANSACTED",毕竟这是相悖的.   


 

    Consumer消费消息的风格有2种: 同步/异步..使用consumer.receive()就是同步,使用messageListener就是异步;在同一个consumer中,咱们不能同时使用这2种风格,好比在使用listener的状况下,当调用receive()方法将会得到一个Exception。两种风格下,消息确认时机有所不一样。

    "同步"伪代码:

Java代码 

  1. //receive伪代码---过程  
  2. Message message = sessionMessageQueue.dequeue();  
  3. if(message != null){  
  4.     ack(message);  
  5. }  
  6. return message  

    同步调用时,在消息从receive方法返回以前,就已经调用了ACK;所以若是Client端没有处理成功,此消息将丢失(可能重发,与ACK模式有关)。

    "异步"伪代码:

Java代码 

  1. //基于listener  
  2. Session session = connection.getSession(consumerId);  
  3. sessionQueueBuffer.enqueue(message);  
  4. Runnable runnable = new Ruannale(){  
  5.     run(){  
  6.         Consumer consumer = session.getConsumer(consumerId);  
  7.         Message md = sessionQueueBuffer.dequeue();  
  8.         try{  
  9.             consumer.messageListener.onMessage(md);  
  10.             ack(md);//  
  11.         }catch(Exception e){  
  12.             redelivery();//sometime,not all the time;  
  13.     }  
  14. }  
  15. //session中将采起线程池的方式,分发异步消息  
  16. //所以同一个session中多个consumer能够并行消费  
  17. threadPool.execute(runnable);  

    基于异步调用时,消息的确认是在onMessage方法返回以后,若是onMessage方法异常,会致使消息不能被ACK,会触发重发。

四. ACK模式详解

    AUTO_ACKNOWLEDGE : 自动确认,这就意味着消息的确认时机将有consumer择机确认."择机确认"彷佛充满了不肯定性,这也意味着,开发者必须明确知道"择机确认"的具体时机,不然将有可能致使消息的丢失,或者消息的重复接收.那么在ActiveMQ中,AUTO_ACKNOWLEDGE是如何运做的呢?

    1) 对于consumer而言,optimizeAcknowledge属性只会在AUTO_ACK模式下有效。

    2) 其中DUPS_ACKNOWLEGE也是一种潜在的AUTO_ACK,只是确认消息的条数和时间上有所不一样。

    3) 在“同步”(receive)方法返回message以前,会检测optimizeACK选项是否开启,若是没有开启,此单条消息将当即确认,因此在这种状况下,message返回以后,若是开发者在处理message过程当中出现异常,会致使此消息也不会redelivery,即"潜在的消息丢失";若是开启了optimizeACK,则会在unAck数量达到prefetch * 0.65时确认,固然咱们能够指定prefetchSize = 1来实现逐条消息确认。

    4) 在"异步"(messageListener)方式中,将会首先调用listener.onMessage(message),此后再ACK,若是onMessage方法异常,将致使client端补充发送一个ACK_TYPE为REDELIVERED_ACK_TYPE确认指令;若是onMessage方法正常,消息将会正常确认(STANDARD_ACK_TYPE)。此外须要注意,消息的重发次数是有限制的,每条消息中都会包含“redeliveryCounter”计数器,用来表示此消息已经被重发的次数,若是重发次数达到阀值,将会致使发送一个ACK_TYPE为POSION_ACK_TYPE确认指令,这就致使broker端认为此消息没法消费,此消息将会被删除或者迁移到"dead letter"通道中。

    所以当咱们使用messageListener方式消费消息时,一般建议在onMessage方法中使用try-catch,这样能够在处理消息出错时记录一些信息,而不是让consumer不断去重发消息;若是你没有使用try-catch,就有可能会由于异常而致使消息重复接收的问题,须要注意你的onMessage方法中逻辑是否可以兼容对重复消息的判断。


 
 
 
 

    CLIENT_ACKNOWLEDGE : 客户端手动确认,这就意味着AcitveMQ将不会“自做主张”的为你ACK任何消息,开发者须要本身择机确认。在此模式下,开发者须要须要关注几个方法:1) message.acknowledge(),2) ActiveMQMessageConsumer.acknowledege(),3) ActiveMQSession.acknowledge();其1)和3)是等效的,将当前session中全部consumer中还没有ACK的消息都一块儿确认,2)只会对当前consumer中那些还没有确认的消息进行确认。开发者能够在合适的时机必须调用一次上述方法。为了不混乱,对于这种ACK模式下,建议一个session下只有一个consumer。

    咱们一般会在基于Group(消息分组)状况下会使用CLIENT_ACKNOWLEDGE,咱们将在一个group的消息序列接受完毕以后确认消息(组);不过当你认为消息很重要,只有当消息被正确处理以后才能确认时,也可使用此模式  。

    若是开发者忘记调用acknowledge方法,将会致使当consumer重启后,会接受到重复消息,由于对于broker而言,那些还没有真正ACK的消息被视为“未消费”。

    开发者能够在当前消息处理成功以后,当即调用message.acknowledge()方法来"逐个"确认消息,这样能够尽量的减小因网络故障而致使消息重发的个数;固然也能够处理多条消息以后,间歇性的调用acknowledge方法来一次确认多条消息,减小ack的次数来提高consumer的效率,不过这仍然是一个利弊权衡的问题。

    除了message.acknowledge()方法以外,ActiveMQMessageConumser.acknowledge()和ActiveMQSession.acknowledge()也能够确认消息,只不过前者只会确认当前consumer中的消息。其中sesson.acknowledge()和message.acknowledge()是等效的。

    不管是“同步”/“异步”,ActiveMQ都不会发送STANDARD_ACK_TYPE,直到message.acknowledge()调用。若是在client端未确认的消息个数达到prefetchSize * 0.5时,会补充发送一个ACK_TYPE为DELIVERED_ACK_TYPE的确认指令,这会触发broker端能够继续push消息到client端。(参看PrefetchSubscription.acknwoledge方法)

    在broker端,针对每一个Consumer,都会保存一个由于"DELIVERED_ACK_TYPE"而“拖延”的消息个数,这个参数为prefetchExtension,事实上这个值不会大于prefetchSize * 0.5,由于Consumer端会严格控制DELIVERED_ACK_TYPE指令发送的时机(参见ActiveMQMessageConsumer.ackLater方法),broker端经过“prefetchExtension”与prefetchSize互相配合,来决定即将push给client端的消息个数,count = prefetchExtension + prefetchSize - dispatched.size(),其中dispatched表示已经发送给client端可是尚未“STANDARD_ACK_TYPE”的消息总量;因而可知,在CLIENT_ACK模式下,足够快速的调用acknowledge()方法是决定consumer端消费消息的速率;若是client端由于某种缘由致使acknowledge方法未被执行,将致使大量消息不能被确认,broker端将不会push消息,事实上client端将处于“假死”状态,而没法继续消费消息。咱们要求client端在消费1.5*prefetchSize个消息以前,必须acknowledge()一次;一般咱们老是每消费一个消息调用一次,这是一种良好的设计。

    此外须要额外的补充一下:全部ACK指令都是依次发送给broker端,在CLIET_ACK模式下,消息在交付给listener以前,都会首先建立一个DELIVERED_ACK_TYPE的ACK指令,直到client端未确认的消息达到"prefetchSize * 0.5"时才会发送此ACK指令,若是在此以前,开发者调用了acknowledge()方法,会致使消息直接被确认(STANDARD_ACK_TYPE)。broker端一般会认为“DELIVERED_ACK_TYPE”确认指令是一种“slow consumer”信号,若是consumer不能及时的对消息进行acknowledge而致使broker端阻塞,那么此consumer将会被标记为“slow”,此后queue中的消息将会转发给其余Consumer。

    DUPS_OK_ACKNOWLEDGE : "消息可重复"确认,意思是此模式下,可能会出现重复消息,并非一条消息须要发送屡次ACK才行。它是一种潜在的"AUTO_ACK"确认机制,为批量确认而生,并且具备“延迟”确认的特色。对于开发者而言,这种模式下的代码结构和AUTO_ACKNOWLEDGE同样,不须要像CLIENT_ACKNOWLEDGE那样调用acknowledge()方法来确认消息。

    1) 在ActiveMQ中,若是在Destination是Queue通道,咱们真的能够认为DUPS_OK_ACK就是“AUTO_ACK + optimizeACK + (prefetch > 0)”这种状况,在确认时机上几乎彻底一致;此外在此模式下,若是prefetchSize =1 或者没有开启optimizeACK,也会致使消息逐条确认,从而失去批量确认的特性。

    2) 若是Destination为Topic,DUPS_OK_ACKNOWLEDGE才会产生JMS规范中诠释的意义,即不管optimizeACK是否开启,都会在消费的消息个数>=prefetch * 0.5时,批量确认(STANDARD_ACK_TYPE),在此过程当中,不会发送DELIVERED_ACK_TYPE的确认指令,这是1)和AUTO_ACK的最大的区别。

    这也意味着,当consumer故障重启后,那些还没有ACK的消息会从新发送过来。

    SESSION_TRANSACTED : 当session使用事务时,就是使用此模式。在事务开启以后,和session.commit()以前,全部消费的消息,要么所有正常确认,要么所有redelivery。这种严谨性,一般在基于GROUP(消息分组)或者其余场景下特别适合。在SESSION_TRANSACTED模式下,optimizeACK并不能发挥任何效果,由于在此模式下,optimizeACK会被强制设定为false,不过prefetch仍然能够决定DELIVERED_ACK_TYPE的发送时机。

    由于Session非线程安全,那么当前session下全部的consumer都会共享同一个transactionContext;同时建议,一个事务类型的Session中只有一个Consumer,以免rollback()或者commit()方法被多个consumer调用而形成的消息混乱。

    当consumer接受到消息以后,首先检测TransactionContext是否已经开启,若是没有,就会开启并生成新的transactionId,并把信息发送给broker;此后将检测事务中已经消费的消息个数是否 >= prefetch * 0.5,若是大于则补充发送一个“DELIVERED_ACK_TYPE”的确认指令;这时就开始调用onMessage()方法,若是是同步(receive),那么即返回message。上述过程,和其余确认模式没有任何特殊的地方。

    当开发者决定事务能够提交时,必须调用session.commit()方法,commit方法将会致使当前session的事务中全部消息当即被确认;事务的确认过程当中,首先把本地的deliveredMessage队列中还没有确认的消息所有确认(STANDARD_ACK_TYPE);此后向broker发送transaction提交指令并等待broker反馈,若是broker端事务操做成功,那么将会把本地deliveredMessage队列清空,新的事务开始;若是broker端事务操做失败(此时broker已经rollback),那么对于session而言,将执行inner-rollback,这个rollback所作的事情,就是将当前事务中的消息清空并要求broker重发(REDELIVERED_ACK_TYPE),同时commit方法将抛出异常。

    当session.commit方法异常时,对于开发者而言一般是调用session.rollback()回滚事务(事实上开发者不调用也没有问题),固然你能够在事务开始以后的任什么时候机调用rollback(),rollback意味着当前事务的结束,事务中全部的消息都将被重发。须要注意,不管是inner-rollback仍是调用session.rollback()而致使消息重发,都会致使message.redeliveryCounter计数器增长,最终都会受限于brokerUrl中配置的"jms.redeliveryPolicy.maximumRedeliveries",若是rollback的次数过多,而达到重发次数的上限时,消息将会被DLQ(dead letter)。

    INDIVIDUAL_ACKNOWLEDGE : 单条消息确认,这种确认模式,咱们不多使用,它的确认时机和CLIENT_ACKNOWLEDGE几乎同样,当消息消费成功以后,须要调用message.acknowledege来确认此消息(单条),而CLIENT_ACKNOWLEDGE模式先message.acknowledge()方法将致使整个session中全部消息被确认(批量确认)。

    **结语:**到目前为止,咱们已经已经简单的了解了ActiveMQ中消息传送机制,还有JMS中ACK策略,重点分析了optimizeACK的策略,但愿开发者可以在使用activeMQ中避免一些没必要要的错误。本文若有疏漏和错误之处,请各位不吝赐教,特此感谢。