Session Transactions and Message Expiration in ActiveMQ

ActiveMQ supports two kinds of transactions:

  • JMS transactions - the commit() / rollback() methods on a Session (which is like doing commit() / rollback() on a JDBC connection)
  • XA Transactions - where the XASession acts as an XAResource by communicating with the Message Broker, rather like a JDBC Connection takes place in an XA transaction by communicating with the database.

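In a transacted session, every message the producer sends carries a transaction ID. When the broker receives a message, it checks for a transaction ID; if one is present, the broker keeps the message in the transaction store and waits for a commit or rollback. ActiveMQ transactions therefore take effect at the broker, not at the producer: the broker receives the message whether or not the session has committed.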
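The broker-side behaviour described above can be sketched with a small toy model. This is only an illustration under simplifying assumptions: `ToyBroker`, its methods, and the string transaction IDs are hypothetical names invented for this sketch and are not ActiveMQ classes or APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: hypothetical class, not part of ActiveMQ.
class ToyBroker {
    // Messages received inside a transaction, keyed by transaction ID,
    // held here until the matching commit or rollback arrives.
    private final Map<String, List<String>> transactionStore = new HashMap<>();
    // Messages that are visible to consumers.
    private final List<String> queue = new ArrayList<>();

    // The broker receives every message; the transaction ID only decides where it is kept.
    void send(String transactionId, String body) {
        if (transactionId != null) {
            transactionStore.computeIfAbsent(transactionId, id -> new ArrayList<>()).add(body);
        } else {
            queue.add(body);
        }
    }

    // Commit moves the pending messages of one transaction onto the queue.
    void commit(String transactionId) {
        List<String> pending = transactionStore.remove(transactionId);
        if (pending != null) {
            queue.addAll(pending);
        }
    }

    // Rollback discards the pending messages of one transaction.
    void rollback(String transactionId) {
        transactionStore.remove(transactionId);
    }

    int pendingCount(String transactionId) {
        List<String> pending = transactionStore.get(transactionId);
        return pending == null ? 0 : pending.size();
    }

    List<String> queue() {
        return new ArrayList<>(queue);
    }
}

class ToyBrokerDemo {
    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.send("tx-1", "order-1");
        broker.send("tx-1", "order-2");
        // Received by the broker but not yet visible: prints "2 []"
        System.out.println(broker.pendingCount("tx-1") + " " + broker.queue());
        broker.commit("tx-1");
        // Prints "[order-1, order-2]"
        System.out.println(broker.queue());
        broker.send("tx-2", "order-3");
        broker.rollback("tx-2");
        // The rolled-back message never reaches the queue: prints "[order-1, order-2]"
        System.out.println(broker.queue());
    }
}
```

The point of the model is that `send` always reaches the broker; commit and rollback only decide what happens to messages the broker already holds.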

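If the producer uses the persistent delivery mode, a message that expires is moved to the dead letter queue. Before moving it there, ActiveMQ removes the transaction ID from the message, so the expired message no longer belongs to a transaction: it is not kept in the transaction store and goes straight to the dead letter queue.

The transaction ID is removed in the doResend method of org.apache.activemq.util.BrokerSupport, which first saves it in originalTransactionId and then clears the transaction ID itself: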

public static void doResend(final ConnectionContext context, Message originalMessage, ActiveMQDestination deadLetterDestination, boolean copy) throws Exception {
        Message message = copy ? originalMessage.copy() : originalMessage;
        message.setOriginalDestination(message.getDestination());
        // Save the transaction ID before it is cleared below.
        message.setOriginalTransactionId(message.getTransactionId());
        message.setDestination(deadLetterDestination);
        // Detach the message from its transaction so it bypasses the transaction store.
        message.setTransactionId(null);
        message.setMemoryUsage(null);
        message.setRedeliveryCounter(0);
        boolean originalFlowControl = context.isProducerFlowControl();
        try {
            context.setProducerFlowControl(false);
            ProducerInfo info = new ProducerInfo();
            ProducerState state = new ProducerState(info);
            ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
            producerExchange.setProducerState(state);
            producerExchange.setMutable(true);
            producerExchange.setConnectionContext(context);
            context.getBroker().send(producerExchange, message);
        } finally {
            context.setProducerFlowControl(originalFlowControl);
        }
}
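To make the effect of clearing the transaction ID concrete, here is a minimal toy sketch of the same field handling. `ToyMessage` and `ToyDeadLetterResender` are hypothetical names invented for this illustration; only the order of the field updates is taken from doResend, and the routing rule is a simplification of what the broker does.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: hypothetical classes, not ActiveMQ's Message or Broker.
class ToyMessage {
    String destination;
    String originalDestination;
    String transactionId;
    String originalTransactionId;

    ToyMessage(String destination, String transactionId) {
        this.destination = destination;
        this.transactionId = transactionId;
    }
}

class ToyDeadLetterResender {
    final List<ToyMessage> transactionStore = new ArrayList<>();
    final List<ToyMessage> deadLetterQueue = new ArrayList<>();

    // Follows the same order of field updates as doResend.
    void resendToDeadLetterQueue(ToyMessage message, String deadLetterDestination) {
        message.originalDestination = message.destination;
        message.originalTransactionId = message.transactionId;
        message.destination = deadLetterDestination;
        message.transactionId = null;
        route(message);
    }

    // Simplified routing rule: a message with a transaction ID waits in the
    // transaction store, a message without one is delivered immediately.
    private void route(ToyMessage message) {
        if (message.transactionId != null) {
            transactionStore.add(message);
        } else {
            deadLetterQueue.add(message);
        }
    }
}

class ToyDeadLetterDemo {
    public static void main(String[] args) {
        ToyDeadLetterResender resender = new ToyDeadLetterResender();
        ToyMessage expired = new ToyMessage("orders", "tx-1");
        resender.resendToDeadLetterQueue(expired, "ActiveMQ.DLQ");
        // Prints "tx-1 null 1": the ID is preserved for reference, cleared on the
        // message, and the message lands directly in the dead letter queue.
        System.out.println(expired.originalTransactionId + " " + expired.transactionId
                + " " + resender.deadLetterQueue.size());
    }
}
```

If the transaction ID were left in place, the toy routing rule would park the expired message in the transaction store, where it would depend on a commit or rollback of the original session.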