MQ入门总结(三)ActiveMQ的用法和实现

转载:架构设计:系统间通讯(21)——ActiveMQ的安装与使用html

转载:成小胖学习ActiveMQ·基础篇java

转载:ActiveMQ学习心得之ActiveMQ四种存储器分析mysql

转载:ActiveMQ(一)简介与架构sql

转载:ActiveMQ消息传送机制以及ACK机制详解数据库

转载:架构设计:系统间通讯(22)——提升ActiveMQ工做性能(上)apache

转载:架构设计:系统间通讯(23)——提升ActiveMQ工做性能(中)缓存

转载:架构设计:系统间通讯(24)——提升ActiveMQ工做性能(下)安全

1、ActiveMQ

ActiveMQ是Apache软件基金会的开源产品,支持AMQP协议、MQTT协议(和XMPP协议做用相似)、Openwire协议和Stomp协议等多种消息协议。而且ActiveMQ完整支持JMS API接口规范,Apache也提供多种其余语言的客户端,例如:C、C++、C#、Ruby、Perl。服务器

2、ActiveMQ的简单使用

1. 安装和启动ActiveMQ网络

2. 消息生产者代码以下:

package com.ljq.durian.test.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息的生产者(发送者)
 * 
 * @author Administrator
 * 
 */
public class JMSProducer {

    
    public static void main(String[] args) {
        try {
            //第一步:创建ConnectionFactory工厂对象,须要填入用户名、密码、以及要链接的地址,均使用默认便可,默认端口为"tcp://localhost:61616"
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    ActiveMQConnectionFactory.DEFAULT_USER, 
                    ActiveMQConnectionFactory.DEFAULT_PASSWORD, 
                    "failover:(tcp://localhost:61616)?Randomize=false");
            
            //第二步:经过ConnectionFactory工厂对象咱们建立一个Connection链接,而且调用Connection的start方法开启链接,Connection默认是关闭的。
            Connection connection = connectionFactory.createConnection();
            connection.start();
            
            //第三步:经过Connection对象建立Session会话(上下文环境对象),用于接收消息,参数配置1为是否启用是事务,参数配置2为签收模式,通常咱们设置自动签收。
            Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            
            //第四步:经过Session建立Destination对象,指的是一个客户端用来指定生产消息目标和消费消息来源的对象,在PTP模式中,Destination被称做Queue即队列;在Pub/Sub模式,Destination被称做Topic即主题。在程序中可使用多个Queue和Topic。
            Destination destination = session.createQueue("HelloWorld");
            
            //第五步:咱们须要经过Session对象建立消息的发送和接收对象(生产者和消费者)MessageProducer/MessageConsumer。
            MessageProducer producer = session.createProducer(null);
            
            //第六步:咱们可使用MessageProducer的setDeliveryMode方法为其设置持久化特性和非持久化特性(DeliveryMode),咱们稍后详细介绍。
            //producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            
            //第七步:最后咱们使用JMS规范的TextMessage形式建立数据(经过Session对象),并用MessageProducer的send方法发送数据。同理客户端使用receive方法进行接收数据。最后不要忘记关闭Connection链接。
            
            for(int i = 0 ; i < 10 ; i ++){
                TextMessage msg = session.createTextMessage("我是消息内容" + i);
                // 第一个参数目标地址
                // 第二个参数 具体的数据信息
                // 第三个参数 传送数据的模式
                // 第四个参数 优先级
                // 第五个参数 消息的过时时间
                producer.send(destination, msg, DeliveryMode.NON_PERSISTENT, 0 , 1000L);
                System.out.println("发送消息:" + msg.getText());
                session.commit(); //启用事务时记得提交事务,否则消费端接收不到消息
                Thread.sleep(1000);
            }

            if(connection != null){
                connection.close();
            }            
        } catch (Exception e) {
            e.printStackTrace();
        }
        
    }

}

3. 消息消费者代码以下:

package com.ljq.durian.test.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息的消费者(接受者)
 * 
 * @author Administrator
 * 
 */
public class JMSConsumer {


    public static void main(String[] args)  {
        try {
            //第一步:创建ConnectionFactory工厂对象,须要填入用户名、密码、以及要链接的地址,均使用默认便可,默认端口为"tcp://localhost:61616"
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    ActiveMQConnectionFactory.DEFAULT_USER, 
                    ActiveMQConnectionFactory.DEFAULT_PASSWORD, 
                    "failover:(tcp://localhost:61616)?Randomize=false");
            
            //第二步:经过ConnectionFactory工厂对象咱们建立一个Connection链接,而且调用Connection的start方法开启链接,Connection默认是关闭的。
            Connection connection = connectionFactory.createConnection();
            connection.start();
            
            //第三步:经过Connection对象建立Session会话(上下文环境对象),用于接收消息,参数配置1为是否启用是事务,参数配置2为签收模式,通常咱们设置自动签收。
            Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            
            //第四步:经过Session建立Destination对象,指的是一个客户端用来指定生产消息目标和消费消息来源的对象,在PTP模式中,Destination被称做Queue即队列;在Pub/Sub模式,Destination被称做Topic即主题。在程序中可使用多个Queue和Topic。
            Destination destination = session.createQueue("HelloWorld");
            //第五步:经过Session建立MessageConsumer
            MessageConsumer consumer = session.createConsumer(destination);
            
            while(true){
                TextMessage msg = (TextMessage)consumer.receive();
                if(msg == null) {
                    break;
                }
                System.out.println("收到的内容:" + msg.getText());
            }            
        } catch (Exception e) {
            e.printStackTrace();
        }
        
    }

}


4. 启动消息生产者产生消息,可在ActiveMQ的网页管理中看到消息的状态。


5. 启动消息消费者消费消息,可在ActiveMQ的网页管理中看到消息的状态。


网上例子较多,公司不能传图,留待后补。

3、ActiveMQ的架构

120043_IEeh_1767531.png
ActiveMQ主要涉及到5个方面:
1. 传输协议:消息之间的传递,无疑须要协议进行沟通,启动一个ActiveMQ打开了一个监听端口, ActiveMQ提供了普遍的链接模式,其中主要包括SSL、STOMP、XMPP;ActiveMQ默认的使用的协议是openWire,端口号:61616;
2. 消息域:ActiveMQ主要包含Point-to-Point (点对点),Publish/Subscribe Model (发布/订阅者),其中在Publich/Subscribe 模式下又有Nondurable subscription和durable subscription (持久化订阅)2种消息处理方式
3. 消息存储:在消息传递过程当中,部分重要的消息可能须要存储到数据库或文件系统中,当中介崩溃时,信息不回丢失
4. Cluster  (集群): 最多见到 集群方式包括network of brokers和Master Slave;
5. Monitor (监控) :ActiveMQ通常由jmx来进行监控

默认配置下的ActiveMQ只适合学习代码而不适用于实际生产环境,ActiveMQ的性能须要经过配置挖掘,其性能提升包括代码级性能、规则性能、存储性能、网络性能以及多节点协同方法(集群方案),因此咱们优化ActiveMQ的中心思路也是这样的:

1. 优化ActiveMQ单个节点的性能,包括NIO模型选择和存储选择。

2. 配置ActiveMQ的集群(ActiveMQ的高性能和高可用须要经过集群表现出来)。

4、ActiveMQ的通讯方式

1. 点对点(p2p

点对点模式下一条消息将会发送给一个消息消费者,若是当前Queue没有消息消费者,消息将进行存储。


点对点方式使用生产者-消费者模式,生产者代码以下:

public Producer() throws JMSException {  
    factory = new ActiveMQConnectionFactory(brokerURL);  
    connection = factory.createConnection();  
    connection.start();  
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
    producer = session.createProducer(null);  
}

public void sendMessage() throws JMSException {  
    for(int i = 0; i < jobs.length; i++)  
    {  
        String job = jobs[i];  
        Destination destination = session.createQueue("JOBS." + job);  
        Message message = session.createObjectMessage(i);  
        System.out.println("Sending: id: " + ((ObjectMessage)message).getObject() + " on queue: " + destination);  
        producer.send(destination, message);  
    }  
}  

public static void main(String[] args) throws JMSException {  
    Producer producer = new Producer();  
    for(int i = 0; i < 10; i++) {  
        producer.sendMessage();  
        System.out.println("Produced " + i + " job messages");  
    try {  
            Thread.sleep(1000);  
        } catch (InterruptedException x) {  
        e.printStackTrace();  
        }  
    }  
    producer.close();  
}

生产者将消息放入队列中,由消费者使用,消费者代码以下:

public Consumer() throws JMSException {  
    factory = new ActiveMQConnectionFactory(brokerURL);  
    connection = factory.createConnection();  
    connection.start();  
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
}  

public static void main(String[] args) throws JMSException {  
    Consumer consumer = new Consumer();  
    for (String job : consumer.jobs) {  
        Destination destination = consumer.getSession().createQueue("JOBS." + job);  
        MessageConsumer messageConsumer = consumer.getSession().createConsumer(destination);  
        messageConsumer.setMessageListener(new Listener(job));  
    }  
}  
      
public Session getSession() {  
    return session;  
}
具体注册的对象须要实现MessageListener接口:

import javax.jms.Message;  
import javax.jms.MessageListener;  
import javax.jms.ObjectMessage;  
  
public class Listener implements MessageListener {  
  
    private String job;  
      
    public Listener(String job) {  
        this.job = job;  
    }  
  
    public void onMessage(Message message) {  
        try {  
            //do something here  
            System.out.println(job + " id:" + ((ObjectMessage)message).getObject());  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  
  
}



2. 发布-订阅(publish-subscribe)

“发布-订阅”模式下,消息会被复制多份,分别发送给全部“订阅”者。


Publisher
publisher是属于发布信息的一方,它经过定义一个或者多个topic,而后给这些topic发送消息。

public Publisher() throws JMSException {  
        factory = new ActiveMQConnectionFactory(brokerURL);  
        connection = factory.createConnection();  
        try {  
        connection.start();  
        } catch (JMSException jmse) {  
            connection.close();  
            throw jmse;  
        }  
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
        producer = session.createProducer(null); 
}
咱们按照前面说的流程定义了基本的connectionFactory, connection, session, producer。这里代码就是主要实现初始化的效果。接着,咱们须要定义一系列的topic让全部的consumer来订阅,设置topic的代码以下:

protected void setTopics(String[] stocks) throws JMSException {  
    destinations = new Destination[stocks.length];  
    for(int i = 0; i < stocks.length; i++) {  
        destinations[i] = session.createTopic("STOCKS." + stocks[i]);  
    }  
}
这里destinations是一个内部定义的成员变量Destination[]。这里咱们总共定义了的topic数取决于给定的参数stocks。在定义好topic以后咱们要给这些指定的topic发消息,具体实现的代码以下:
protected void sendMessage(String[] stocks) throws JMSException {  
    for(int i = 0; i < stocks.length; i++) {  
        Message message = createStockMessage(stocks[i], session);  
        System.out.println("Sending: " + ((ActiveMQMapMessage)message).getContentMap() + " on destination: " + destinations[i]);  
        producer.send(destinations[i], message);  
    }  
}  
  
protected Message createStockMessage(String stock, Session session) throws JMSException {  
    MapMessage message = session.createMapMessage();  
    message.setString("stock", stock);  
    message.setDouble("price", 1.00);  
    message.setDouble("offer", 0.01);  
    message.setBoolean("up", true);
    return message;  
}
在sendMessage方法里咱们遍历每一个topic,而后给每一个topic发送定义的Message消息。在定义好前面发送消息的基础以后,咱们调用他们的代码就很简单了:

public static void main(String[] args) throws JMSException {  
    if(args.length < 1)  
        throw new IllegalArgumentException();  
      
        // Create publisher       
        Publisher publisher = new Publisher();  
          
        // Set topics  
    publisher.setTopics(args);  
          
    for(int i = 0; i < 10; i++) {  
        publisher.sendMessage(args);  
        System.out.println("Publisher '" + i + " price messages");  
        try {  
            Thread.sleep(1000);  
        } catch(InterruptedException e) {  
            e.printStackTrace();  
        }  
    }  
    // Close all resources  
    publisher.close();  
}
调用他们的代码就是咱们遍历全部topic,而后经过sendMessage发送消息。在发送一个消息以后先sleep1秒钟。要注意的一个地方就是咱们使用完资源以后必需要使用close方法将这些资源关闭释放。close方法关闭资源的具体实现以下:

public void close() throws JMSException {  
    if (connection != null) {  
        connection.close();  
     }  
}
Consumer
Consumer的代码也很相似,具体的步骤无非就是1.初始化资源。 2. 接收消息。 3. 必要的时候关闭资源。初始化资源能够放到构造函数里面:

public Consumer() throws JMSException {  
        factory = new ActiveMQConnectionFactory(brokerURL);  
        connection = factory.createConnection();  
        connection.start();  
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
}
接收和处理消息的方法有两种,分为同步和异步的,通常同步的方式咱们是经过MessageConsumer. receive()方法来处理接收到的消息。而异步的方法则是经过注册一个 MessageListener的方法,使用MessageConsumer.setMessageListener()。这里咱们采用异步的方式实现:
public static void main(String[] args) throws JMSException {  
    Consumer consumer = new Consumer();  
    for (String stock : args) {  
    Destination destination = consumer.getSession().createTopic("STOCKS." + stock);  
    MessageConsumer messageConsumer = consumer.getSession().createConsumer(destination);  
    messageConsumer.setMessageListener(new Listener());  
    }  
}  
      
public Session getSession() {  
    return session;  
}
在前面的代码里咱们先找到一样的topic,而后遍历全部的topic去得到消息。对于消息的处理咱们专门经过Listener对象来负责。Listener对象的职责很简单,主要就是处理接收到的消息:

public class Listener implements MessageListener {  
  
    public void onMessage(Message message) {  
        try {  
            MapMessage map = (MapMessage)message;  
            String stock = map.getString("stock");  
            double price = map.getDouble("price");  
            double offer = map.getDouble("offer");  
            boolean up = map.getBoolean("up");  
            DecimalFormat df = new DecimalFormat( "#,###,###,##0.00" );  
            System.out.println(stock + "\t" + df.format(price) + "\t" + df.format(offer) + "\t" + (up?"up":"down"));  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  
  
}
它实现了MessageListener接口,里面的onMessage方法就是在接收到消息以后会被调用的方法。

3. 请求-响应(request-response)

和前面两种方式比较起来,request-response的通讯方式很常见,可是不是默认提供的一种模式。在前面的两种模式中都是一方负责发送消息而另一方负责处理。而咱们实际中的不少应用至关于一种一应一答的过程,须要双方都能给对方发送消息。因而请求-应答的这种通讯方式也很重要。它也应用的很广泛。 
请求-应答方式并非JMS规范系统默认提供的一种通讯方式,而是经过在现有通讯方式的基础上稍微运用一点技巧实现的。下图是典型的请求-应答方式的交互过程:


在JMS里面,若是要实现请求/应答的方式,能够利用JMSReplyTo和JMSCorrelationID消息头来将通讯的双方关联起来。另外,QueueRequestor和TopicRequestor可以支持简单的请求/应答过程。如今,若是咱们要实现这么一个过程,在发送请求消息而且等待返回结果的client端的流程以下:

// client side  
Destination tempDest = session.createTemporaryQueue();  
MessageConsumer responseConsumer = session.createConsumer(tempDest);  
...  
  
// send a request..  
message.setJMSReplyTo(tempDest)  
message.setJMSCorrelationID(myCorrelationID);  
  
producer.send(message);
client端建立一个 临时队列并在发送的消息里 指定了发送返回消息的destination以及correlationID。那么在处理消息的server端获得这个消息后就知道该发送给谁了。Server端的大体流程以下:

public void onMessage(Message request) {  
  
  Message response = session.createMessage();  
  response.setJMSCorrelationID(request.getJMSCorrelationID())  
  
  producer.send(request.getJMSReplyTo(), response)  
}
这里咱们是用server端注册 MessageListener,经过设置返回信息的 CorrelationIDJMSReplyTo将信息返回。以上就是发送和接收消息的双方的大体程序结构。具体的实现代码以下:

Client侧实现

public Client() {  
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");  
        Connection connection;  
        try {  
            connection = connectionFactory.createConnection();  
            connection.start();  
            Session session = connection.createSession(transacted, ackMode);  
            Destination adminQueue = session.createQueue(clientQueueName);  
  
            //Setup a message producer to send message to the queue the server is consuming from  
            this.producer = session.createProducer(adminQueue);  
            this.producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);  
  
            //Create a temporary queue that this client will listen for responses on then create a consumer  
            //that consumes message from this temporary queue...for a real application a client should reuse  
            //the same temp queue for each message to the server...one temp queue per client  
            Destination tempDest = session.createTemporaryQueue();  
            MessageConsumer responseConsumer = session.createConsumer(tempDest);  
  
            //This class will handle the messages to the temp queue as well  
            responseConsumer.setMessageListener(this);  
  
            //Now create the actual message you want to send  
            TextMessage txtMessage = session.createTextMessage();  
            txtMessage.setText("MyProtocolMessage");  
  
            //Set the reply to field to the temp queue you created above, this is the queue the server  
            //will respond to  
            txtMessage.setJMSReplyTo(tempDest);  
  
            //Set a correlation ID so when you get a response you know which sent message the response is for  
            //If there is never more than one outstanding message to the server then the  
            //same correlation ID can be used for all the messages...if there is more than one outstanding  
            //message to the server you would presumably want to associate the correlation ID with this  
            //message somehow...a Map works good  
            String correlationId = this.createRandomString();  
            txtMessage.setJMSCorrelationID(correlationId);  
            this.producer.send(txtMessage);  
        } catch (JMSException e) {  
            //Handle the exception appropriately  
        }  
    }
这里的代码除了初始化构造函数里的参数还同时设置了 两个destination一个是本身要发送消息出去的destination,在这一句设置:

session.createProducer(adminQueue);
另一个是本身要接收的消息destination, 经过这两句指定了要接收消息的目的地:
Destination tempDest = session.createTemporaryQueue(); 
responseConsumer = session.createConsumer(tempDest);

这里是用的一个临时队列。在前面指定了返回消息的通讯队列以后,咱们须要通知server端知道发送返回消息给哪一个队列。因而

txtMessage.setJMSReplyTo(tempDest);

指定了这一部分,同时:

txtMessage.setJMSCorrelationID(correlationId);

方法主要是为了保证每次发送回来请求的server端可以知道对应的是哪一个请求。这里一个请求和一个应答是至关于对应一个相同的序列号同样。

由于client端在发送消息以后还要接收server端返回的消息,因此它也要实现一个消息receiver的功能。这里采用实现MessageListener接口的方式:

public void onMessage(Message message) {  
        String messageText = null;  
        try {  
            if (message instanceof TextMessage) {  
                TextMessage textMessage = (TextMessage) message;  
                messageText = textMessage.getText();  
                System.out.println("messageText = " + messageText);  
            }  
        } catch (JMSException e) {  
            //Handle the exception appropriately  
        }  
    }
Server侧实现
server端要执行的过程和client端相反,它是先接收消息,在接收到消息后根据提供的JMSCorelationID来发送返回的消息:

public void onMessage(Message message) {  
        try {  
            TextMessage response = this.session.createTextMessage();  
            if (message instanceof TextMessage) {  
                TextMessage txtMsg = (TextMessage) message;  
                String messageText = txtMsg.getText();  
                response.setText(this.messageProtocol.handleProtocolMessage(messageText));  
            }  
  
            //Set the correlation ID from the received message to be the correlation id of the response message  
            //this lets the client identify which message this is a response to if it has more than  
            //one outstanding message to the server  
            response.setJMSCorrelationID(message.getJMSCorrelationID());  
  
            //Send the response to the Destination specified by the JMSReplyTo field of the received message,  
            //this is presumably a temporary queue created by the client  
            this.replyProducer.send(message.getJMSReplyTo(), response);  
        } catch (JMSException e) {  
            //Handle the exception appropriately  
        }  
    }
在replyProducer.send()方法里,message.getJMSReplyTo()就获得了要发送消息回去的destination。另外,设置这些发送返回信息的replyProducer的信息主要在构造函数相关的方法里实现了:

public Server() {  
        try {  
            //This message broker is embedded  
            BrokerService broker = new BrokerService();  
            broker.setPersistent(false);  
            broker.setUseJmx(false);  
            broker.addConnector(messageBrokerUrl);  
            broker.start();  
        } catch (Exception e) {  
            //Handle the exception appropriately  
        }  
  
        //Delegating the handling of messages to another class, instantiate it before setting up JMS so it  
        //is ready to handle messages  
        this.messageProtocol = new MessageProtocol();  
        this.setupMessageQueueConsumer();  
    }  
  
    private void setupMessageQueueConsumer() {  
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(messageBrokerUrl);  
        Connection connection;  
        try {  
            connection = connectionFactory.createConnection();  
            connection.start();  
            this.session = connection.createSession(this.transacted, ackMode);  
            Destination adminQueue = this.session.createQueue(messageQueueName);  
  
            //Setup a message producer to respond to messages from clients, we will get the destination  
            //to send to from the JMSReplyTo header field from a Message  
            this.replyProducer = this.session.createProducer(null);  
            this.replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);  
  
            //Set up a consumer to consume messages off of the admin queue  
            MessageConsumer consumer = this.session.createConsumer(adminQueue);  
            consumer.setMessageListener(this);  
        } catch (JMSException e) {  
            //Handle the exception appropriately  
        }  
    }
整体来讲,整个的交互过程并不复杂,只是比较繁琐。

对于请求/应答的方式来讲,这种典型交互的过程就是Client端在设定正常发送请求的Queue同时也设定一个临时的Queue。同时在要发送的message里头指定要返回消息的destination以及CorelationID,这些就比如是一封信里面所带的回执。根据这个信息服务器才知道怎么给客户端回信。

对于Server端来讲则要额外建立一个producer,在处理接收到消息的方法里再利用producer将消息发回去。这一系列的过程看起来很像http协议里面请求-应答的方式,都是一问一答。

5、ActiveMQ的存储

1. 持久化消息和非持久化消息

JMS中对非持久化消息和非持久化消息的称呼分别是:NON_PERSISTENTMessagePERSISTENT Meaage。它们指的是消息在任何一种“发送-接受”模式下(“订阅-发布”模式和“负载均衡模式”),是否进行持久化存储
NON_PERSISTENT Message只存储在JMS服务节点的内存区域,不会存储在某种持久化介质上(AcitveMQ可支持的持久化介质有:KahaBD、AMQ和关系型数据)。在极限状况下,JMS服务节点的内存区域不够使用了,也只会采用某种辅助方案进行转存(例如ActiveMQ会使用磁盘上的一个“临时存储区域”进行暂存)。一旦JMS服务节点宕机了,这些NON_PERSISTENT Message就会丢失。
JMS中对PERSISTENT Meaage的定义是:这些消息不受JMS服务端异常状态的影响,JMS服务端会使用某种持久化存储方案保存这些消息,直到JMS服务端认为这些PERSISTENTMeaage被消费端成功处理。例如ActiveMQ中能够选择的持久化存储方案就包括:KahaDB、AMQ和关系型数据库。
在JMS标准API中,使用setDeliveryMode标记消息发送者是发送的PERSISTENT Meaage仍是NON_PERSISTENT Message。示例以下:

......
for(int index = 0 ; index < 10 ; index++) {
    TextMessage outMessage = session.createTextMessage();
    outMessage.setText("这是发送的消息内容:" + index);
    if(index % 2 == 0) {
        sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    } else {
        sender.setDeliveryMode(DeliveryMode.PERSISTENT);
    }
    sender.send(outMessage);
}
......
那么当 JMS服务节点重启后(注意不是 producer重启),以上代码中发送的10条消息只有其中5条消息可以保存下来。

发送NON_PERSISTENT Message时,消息发送方默认使用异步方式:便是说消息发送后发送方不会等待NON_PERSISTENT Message在服务端的任何回执。那么问题来了:若是这时服务端已经出现了消息堆积,而且堆积程度已经达到“没法再接收新消息”的极限状况了,那么消息发送方若是知晓并采起相应的策略呢?

实际上所谓的异步发送也并不是绝对的异步,消息发送者会在发送必定大小的消息后等待服务端进行回执(这个配置只是针对使用异步方式进行发送消息的状况)

......
// 如下语句设置消息发送者在累计发送102400byte大小的消息后(多是一条消息也多是多条消息)
// 等待服务端进行回执,以便肯定以前发送的消息是否被正确处理
// 肯定服务器端是否产生了过量的消息堆积,须要减慢消息生产端的生产速度
connectionFactory.setProducerWindowSize(102400);
......


若是不特地指定消息的发送类型,那么消息生产者默认发送 PERSISTENT Meaage。这样的消息发送到ActiveMQ服务端后将被进行 持久化存储,而且消息发送者 默认等待ActiveMQ服务端对这条消息处理状况的回执。
以上这个过程很是耗时,ActiveMQ服务端不但要接受消息,在内存中完成存储,而且按照ActiveMQ服务端设置的 持久化存储方案对消息进行存储(主要的处理时间耗费在这里)。为了提升ActiveMQ在接受PERSISTENT Meaage时的性能,ActiveMQ容许开发人员听从JMS API中的设置方式,为消息发送端在发送PERSISTENT Meaage时提供 异步方式

......
// 使用异步传输
// 上文已经说过,若是发送的是NON_PERSISTENT Message
// 那么默认就是异步方式
connectionFactory.setUseAsyncSend(true);
......
一旦您进行了这样的设置,就须要设置回执窗口:

......
// 一样设置消息发送者在累计发送102400byte大小的消息后
// 等待服务端进行回执,以便肯定以前发送的消息是否被正确处理
// 肯定服务器端是否产生了过量的消息堆积,须要减慢消息生产端的生产速度
connectionFactory.setProducerWindowSize(102400);
......

2. 持久化订阅和非持久化订阅

持续订阅和非持续订阅,是针对“订阅-发布”模式的细分处理策略,在JMS规范中的标准称呼是:Durable-SubscribersNon-Durable Subscribers
Durable-Subscribers是指在“订阅-发布”模式下,即便标记为Durable-Subscribers的订阅者下线了(多是由于订阅者宕机,也多是由于这个订阅者故意下线),“订阅-发布”模式的Topic队列也要保存这些消息(视消息不一样的持久化策略影响,保存机制不同),直到下次这个被标记为Durable-Subscribers的订阅者从新上线,并正确处理这条消息为止。换句话说,标记为Durable-Subscribers的订阅者是否能得到某条消息,和它是否曾经下线没有任何关系。
Non-Durable Subscribers是指在“订阅-发布”模式下,“订阅-发布”模式的Topic队列不用为这些已经下线的订阅者保留消息。当后者将消息按照既定的广播规则发送给当前在线的订阅者后,消息就能够被标记为“处理完成”。


3. ActiveMQ的存储机制

ActiveMQ 在 队列中存储 Message 时,采用先进先出顺序(FIFO)存储。同一时间一个消息被分派给单个消费者,且只有当 Message 被消费并确认时,它才能从存储中删除。

对于持久化订阅者来讲,每一个消费者得到 Message 的副本。为了节省存储空间,Provider 仅存储消息的一个副本持久化订阅者维护了指向下一个 Message 的指针,并将其副本分派给消费者。以这种方式实现消息存储,由于每一个持久化订阅者可能以不一样的速率消费 Message,或者它们可能不是所有同时运行。此外,因每一个 Message 可能存在多个消费者,因此在它被成功地传递给全部持久化订阅者以前,不能从存储中删除。

关于持久化和消息的保留见下表:

消息类型 是否持久化 是否有Durable订阅者 消费者延迟启动时,消息是否保留 Broker重启时,消息是否保留
Queue N - Y N
Queue Y - Y Y
Topic N N N N
Topic N Y Y N
Topic Y N N N
Topic Y Y Y Y

ActiveMQ有四种存储器,下面分别介绍和分析各自的特色和优缺点。

一、KahaDB message store

是ActiveMQ的默认以及推荐的存储器,特色是基于文件、支持事务日志、可靠、可扩展、速度快等。重点讨论一下后两点。

KahaDB主要元素包括:一个内存Metadata Cache用来在内存中检索消息的存储位置、若干用于记录消息内容的Data log文件、一个在磁盘上检索消息存储位置的Metadata Store、还有一个用于在系统异常关闭后恢复Btree结构的redo文件。

这里写图片描述

a. 可扩展体如今KahaDB支持其余三种存储器的外接扩展,也就是说能够同时用不止一种,这样能够取长补短,适合更广的应用场景,达到性能最佳。
b. 速度快:(1)快速的事务日志;(2)高度优化的消息ID索引;(3)在内存中的消息缓存。具体分析,消息直接添加在当前日志文件的尾部,因此存的快(相似Redis的Aof);用一个索引文件存储全部的destination,可谓高度优化;支持内存缓存也是必然,但在缓存回复策略上不如内存存储器。

<broker brokerName="broker" persistent="true" useShutdownHook="false">
        <persistenceAdapter>
                <kahaDB directory="${activemq.data}/kahadb" journalMaxFileLength="16mb"/>
        </persistenceAdapter>
</broker>

二、 AMQ message store

在基于文件、支持事务方面和KahaDB相似。不一样之处以下:
优势:索引用的是hashbin(哈希桶,没有查到权威定义,可理解为哈希表),天然比KahaDB的Btree索引要快,而且磁盘读写用的是nio,速度也快,因此用于消息吞吐量要求比较大的时候是最佳选择。(有的人把吞吐量理解成消息总数量其实不正确,应该是消息出入队的速率。)
缺点:对于每一个destination都要建一个索引,因此不适于不少destination并发的场合,而这偏偏是KahaDB的优点,它能够支持最大10000个queue的同时等待。(AMQ为每一个索引使用两个分开的文件,而且每一个 Destination 都有一个索引,因此当你打算在代理中使用数千个队列的时候,不该该使用它。)

<persistenceAdapter>
        <amqPersistenceAdapter
                directory="${activemq.data}/kahadb"
                syncOnWrite="true"
                indexPageSize="16kb"
                indexMaxBinSize="100"
                maxFileLength="10mb" />
</persistenceAdapter>

三、 JDBC message store

默认的JDBC驱动是ApacheDerby,同时支持MySQL、PostgreSQL、Oracle、SQLServer、Sybase、Informix、MaxDB等主流的关系数据库。用三张表结构来存储消息,分别是ACTIVEMQ_MSGSACTIVEMQ_ACKSACTIVEMQ_LOCK。第二张表外键关联到第一张表,共同存储消息,第三张表用于锁定保证只有一个broker实例能够访问数据库。选择关系型数据库,一般的缘由是企业已经具有了管理关系型数据的专长,可是它在性能上绝对不优于上述消息存储实现

<beans>
        <broker brokerName="test-broker" persistent="true" xmlns="http://activemq.apache.org/schema/core">
                <persistenceAdapter>
                        <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
                </persistenceAdapter>
        </broker>
        <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
                <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
                <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
                <property name="username" value="activemq"/>
                <property name="password" value="activemq"/>
                <property name="maxActive" value="200"/>
                <property name="poolPreparedStatements" value="true"/>
        </bean>
</beans>

四、 Memory message store

用于实时消息的缓存,只针对非持久订阅的消费者提供了5种订阅恢复策略,能够极大程度加强非持久订阅的可用性。也就是说对于持久订阅的消费者是用不到内存存储的。

<broker brokerName="test-broker" persistent="false" xmlns="http://activemq.apache.org/schema/core">
        <transportConnectors>
                <transportConnector uri="tcp://localhost:61635"/>
        </transportConnectors>
</broker>

5.  LevelDB方式

从ActiveMQ 5.6版本以后,又推出了LevelDB的持久化引擎。
目前默认的持久化方式仍然是KahaDB,不过LevelDB持久化性能高于KahaDB,多是之后的趋势。
在ActiveMQ 5.9版本提供了基于LevelDB和Zookeeper的数据复制方式,用于Master-slave方式的首选数据复制方案。

5、ActiveMQ的消息传输机制

1. 总体架构

Producer客户端使用来发送消息的, Consumer客户端用来消费消息;它们的协同中心就是ActiveMQ broker,broker也是让producer和consumer调用过程解耦的工具,最终实现了异步RPC/数据交换的功能。随着ActiveMQ的不断发展,支持了愈来愈多的特性,也解决开发者在各类场景下使用ActiveMQ的需求。好比producer支持异步调用;使用flow control机制让broker协同consumer的消费速率;consumer端可使用prefetchACK来最大化消息消费的速率;提供"重发策略"等来提升消息的安全性等。一条消息的生命周期以下:


图片中简单的描述了一条消息的生命周期,不过在不一样的架构环境中,message的流动行可能更加复杂.将在稍后有关broker的架构中详解..一条消息从producer端发出以后,一旦被broker正确保存,那么它将会被consumer消费,而后ACK,broker端才会删除;不过当消息过时或者存储设备溢出时,也会终结它。


这是一张很复杂,并且有些凌乱的图片;这张图片中简单的描述了:1)producer端如何发送消息 2) consumer端如何消费消息 3) broker端如何调度。

2. optimizeACK

 "可优化的ACK",这是ActiveMQ对于consumer在消息消费时,对消息ACK的优化选项,也是consumer端最重要的优化参数之一,你能够经过以下方式开启:

1) 在brokerUrl中增长以下查询字符串: 

String brokerUrl = "tcp://localhost:61616?" +   
                   "jms.optimizeAcknowledge=true" +   
                   "&jms.optimizeAcknowledgeTimeOut=30000" +   
                   "&jms.redeliveryPolicy.maximumRedeliveries=6";  
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
 2) 在 destinationUri中,增长以下查询字符串:

String queueName = "test-queue?customer.prefetchSize=100";  
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
Destination queue = session.createQueue(queueName);
咱们须要在 brokerUrl指定 optimizeACK选项,在 destinationUri中指定 prefetchSize(预获取)选项,其中brokerUrl参数选项是 全局的,即当前factory下全部的connection/session/consumer都会默认使用这些值;而destinationUri中的选项, 只会在使用此destination的consumer实例中有效;若是同时指定, brokerUrl中的参数选项值将会 被覆盖

optimizeAck表示是否开启“优化ACK”,只有在为true的状况下,prefetchSize(下文中将会简写成prefetch)以及optimizeAcknowledgeTimeout参数才会有意义。此处须要注意"optimizeAcknowledgeTimeout"选项只能在brokerUrl中配置。
prefetch值建议在destinationUri中指定,由于在brokerUrl中指定比较繁琐;

在brokerUrl中,queuePrefetchSize和topicPrefetchSize都须要单独设定:

"&jms.prefetchPolicy.queuePrefetch=12&jms.prefetchPolicy.topicPrefetch=12"
等来逐个指定。

2.1 prefetchACK和prefetch

若是prefetchACKtrue,那么prefetch必须大于0;当prefetchACKfalse时,你能够指定prefetch为0以及任意大小的正数。

  • 1. 当prefetch=0是,表示consumer将使用PULL(拉取)的方式从broker端获取消息,broker端将不会主动push消息给client端,直到client端发送PullCommand时;
  • 2. 当prefetch>0时,就开启了broker push模式,此后只要当client端消费且ACK了必定的消息以后,会当即push给client端多条消息。

 
当consumer端使用receive()方法同步获取消息时,prefetch能够为0和任意正值:

  • 1. 当prefetch=0时,那么receive()方法将会首先发送一个PULL指令并阻塞,直到broker端返回消息为止,这也意味着消息只能逐个获取(相似于Request<->Response),这也是Activemq中PULL消息模式;
  • 2. 当prefetch > 0时,broker端将会批量push给client 必定数量的消息(<= prefetch),client端会把这些消息(unconsumedMessage)放入到本地的队列中,只要此队列有消息,那么receive方法将会当即返回,当必定量的消息ACK以后,broker端会继续批量push消息给client端。

当consumer端使用MessageListener异步获取消息时,这就须要开发设定的prefetch值必须 >=1,即至少为1;在异步消费消息模式中,设定prefetch=0,是相悖的,也将得到一个Exception。

2.2 redelivery

此外,咱们还能够brokerUrl中配置“redelivery”策略,好比当一条消息处理异常时,broker端能够重发的最大次数;和下文中提到REDELIVERED_ACK_TYPE互相协同。

当消息须要broker端重发时,consumer会首先在本地的“deliveredMessage队列”(Consumer已经接收但还未确认的消息队列)删除它,而后向broker发送“REDELIVERED_ACK_TYPE”类型的确认指令,broker将会把指令中指定的消息从新添加到pendingQueue(亟待发送给consumer的消息队列)中,直到合适的时机,再次push给client。

2.3 optimizeACK和prefetch模型

    到目前为止,或许你知道了optimizeACK和prefeth的大概意义,不过咱们可能还会有些疑惑!!optimizeACK和prefetch配合,将会达成一个高效的消息消费模型批量获取消息,并“延迟”确认(ACK)

prefetch表达了“批量获取”消息的语义,broker端主动的批量push多条消息给client端,总比client屡次发送PULL指令而后broker返回一条消息的方式要优秀不少,它不只减小了client端在获取消息时阻塞的次数和阻塞的时间,还可以大大的减小网络开支optimizeACK表达了“延迟确认”的语义(ACK时机),client端在消费消息后暂且不发送ACK,而是把它缓存下来(pendingACK),等到这些消息的条数达到必定阀值时,只须要经过一个ACK指令把它们所有确认;这比对每条消息都逐个确认,在性能上要提升不少。因而可知,prefetch优化了消息传送的性能,optimizeACK优化了消息确认的性能

2.4 optimizeACK和prefetch模型的例外状况


consumer端消息消费的速率很高(相对于producer生产消息),并且消息的数量也很大时(好比消息源源不断的生产),咱们使用optimizeACK + prefetch将会极大的提高consumer的性能。不过反过来:
    1) 若是consumer端消费速度很慢(对消息的处理是耗时的),过大的prefetchSize,并不能有效的提高性能,反而不利于consumer端的负载均衡(只针对queue);按照良好的设计准则,当consumer消费速度很慢时,咱们一般会部署多个consumer客户端,并使用较小的prefetch,同时关闭optimizeACK,可让消息在多个consumer间“负载均衡”(即均匀的发送给每一个consumer);若是较大的prefetchSize,将会致使broker一次性push给client大量的消息,可是这些消息须要好久才能ACK(消息积压),并且在client故障时,还会致使这些消息的重发。
 
    2) 若是consumer端消费速度很快,可是producer端生成消息的速率较慢,好比生产者10秒钟生成10条消息,可是consumer一秒就能消费完毕,并且咱们还部署了多个consumer!!这种场景下,建议开启optimizeACK,可是须要设置的prefetchSize不能过大;这样能够保证每一个consumer都能有"活干",不然将会出现一个consumer很是忙碌,可是其余consumer几乎收不到消息。
 
    3) 若是消息很重要,特别是不肯意接收到”redelivery“的消息,那么咱们须要将optimizeACK=false,prefetchSize=1
 
    既然optimizeACK是”延迟“确认,那么就引入一种潜在的风险:在消息被消费以后尚未来得及确认时,client端发生故障,那么这些消息就有可能会被从新发送给其余consumer,那么这种风险就须要client端可以容忍“重复”消息

2.5 定制prefetchSize

    prefetch值默认为1000,固然这个值可能在不少场景下是偏大的;咱们暂且不考虑ACK模式,一般状况下,咱们只须要简单的统计出单个consumer每秒的最大消费消息数便可,好比一个consumer每秒能够处理100个消息,咱们指望consumer端每2秒确认一次,那么咱们的prefetchSize能够设置为100 * 2 /0.65大概为300。不管如何设定此值,client持有的消息条数最大为:prefetch + “DELIVERED_ACK_TYPE消息条数”(DELIVERED_ACK_TYPE参见下文)
 
即便当optimizeACK为true,也只会当session的ACK模式为AUTO_ACKNOWLEDGE时才会生效,即在其余类型的ACK模式时consumer端仍然不会“延迟确认”,即:
consumer.optimizeAck = connection.optimizeACK && session.isAutoAcknowledge()
consumer.optimizeACK有效时,若是客户端已经消费但还没有确认的消息(deliveredMessage) 达到prefetch * 0.65,consumer端将会自动进行ACK;同时若是离上一次ACK的时间间隔,已经超过" optimizeAcknowledgeTimout"毫秒,也会致使自动进行ACK。
 
    此外简单的补充一下,批量确认消息时,只须要在ACK指令中指明“ firstMessageId”和“ lastMessageId”便可,即消息区间,那么broker端就知道此consumer(根据consumerId识别)须要确认哪些消息。


3. ACK模式与类型介绍

3.1 ACK类型

JMS API中约定了Client端可使用四种ACK模式,在javax.jms.Session接口中:

  • AUTO_ACKNOWLEDGE = 1          自动确认
  • CLIENT_ACKNOWLEDGE = 2        客户端手动确认   
  • DUPS_OK_ACKNOWLEDGE = 3    自动批量确认
  • SESSION_TRANSACTED = 0         事务提交并确认
此外AcitveMQ补充了一个自定义的ACK模式:
  • INDIVIDUAL_ACKNOWLEDGE = 4    单条消息确认

ACK模式描述了Consumer与broker确认消息的方式(时机),好比当消息被Consumer接收以后,Consumer将在什么时候确认消息。对于broker而言,只有接收到ACK指令,才会认为消息被正确的接收或者处理成功了,经过ACK,能够在consumer(/producer)与Broker之间创建一种简单的“担保”机制. 

AUTO_ACKNOWLEDGE

自动确认,这就意味着消息的确认时机将有consumer择机确认."择机确认"彷佛充满了不肯定性,这也意味着,开发者必须明确知道"择机确认"的具体时机,不然将有可能致使消息的丢失,或者消息的重复接收.那么在ActiveMQ中,AUTO_ACKNOWLEDGE是如何运做的呢?
    1) 对于consumer而言,optimizeAcknowledge属性只会在AUTO_ACK模式下有效。
    2) 其中DUPS_ACKNOWLEGE也是一种潜在的AUTO_ACK,只是确认消息的条数和时间上有所不一样
    3) 在“同步”(receive)方法返回message以前,会检测optimizeACK选项是否开启,若是没有开启,此单条消息将当即确认,因此在这种状况下,message返回以后,若是开发者在处理message过程当中出现异常,会致使此消息也不会redelivery,即"潜在的消息丢失";若是开启了optimizeACK,则会在unAck数量达到prefetch * 0.65时确认,固然咱们能够指定prefetchSize = 1来实现逐条消息确认
    4) 在"异步"(messageListener)方式中,将会首先调用listener.onMessage(message),此后再ACK,

若是onMessage方法异常,将致使client端补充发送一个ACK_TYPEREDELIVERED_ACK_TYPE确认指令;

若是onMessage方法正常,消息将会正常确认(STANDARD_ACK_TYPE)。此外须要注意,消息的重发次数是有限制的,每条消息中都会包含“redeliveryCounter”计数器,用来表示此消息已经被重发的次数,若是重发次数达到阀值,将会致使发送一个ACK_TYPE为POSION_ACK_TYPE确认指令,这就致使broker端认为此消息没法消费,此消息将会被删除或者迁移到"dead letter"通道中。
    
    所以当咱们使用messageListener方式消费消息时,一般建议在onMessage方法中使用try-catch,这样能够在处理消息出错时记录一些信息,而不是让consumer不断去重发消息;若是你没有使用try-catch,就有可能会由于异常而致使消息重复接收的问题,须要注意你的onMessage方法中逻辑是否可以兼容对重复消息的判断

CLIENT_ACKNOWLEDGE : 

客户端手动确认,这就意味着AcitveMQ将不会“自做主张”的为你ACK任何消息,开发者须要本身择机确认。在此模式下,开发者须要须要关注几个方法:

1) message.acknowledge(),

2) ActiveMQMessageConsumer.acknowledege(),

3) ActiveMQSession.acknowledge();

其1)和3)是等效的,将当前session中全部consumer中还没有ACK的消息都一块儿确认,2)只会对当前consumer中那些还没有确认的消息进行确认。开发者能够在合适的时机必须调用一次上述方法。为了不混乱,对于这种ACK模式下,建议一个session下只有一个consumer

咱们一般会在基于Group(消息分组)状况下会使用CLIENT_ACKNOWLEDGE,咱们将在一个group的消息序列接受完毕以后确认消息(组);不过当你认为消息很重要,只有当消息被正确处理以后才能确认时,也可使用此模式  。
若是开发者忘记调用acknowledge方法,将会致使当consumer重启后,会接受到重复消息,由于对于broker而言,那些还没有真正ACK的消息被视为“未消费”。

开发者能够在当前消息处理成功以后,当即调用message.acknowledge()方法来"逐个"确认消息,这样能够尽量的减小因网络故障而致使消息重发的个数;固然也能够处理多条消息以后,间歇性的调用acknowledge方法来一次确认多条消息,减小ack的次数来提高consumer的效率,不过这仍然是一个利弊权衡的问题。

除了message.acknowledge()方法以外,ActiveMQMessageConumser.acknowledge()ActiveMQSession.acknowledge()也能够确认消息,只不过前者只会确认当前consumer中的消息。其中sesson.acknowledge()和message.acknowledge()是等效的。

不管是“同步”/“异步”,ActiveMQ都不会发送STANDARD_ACK_TYPE,直到message.acknowledge()调用。若是在client端未确认的消息个数达到prefetchSize * 0.5时,会补充发送一个ACK_TYPE为DELIVERED_ACK_TYPE的确认指令,这会触发broker端能够继续push消息到client端。(参看PrefetchSubscription.acknwoledge方法)
 
在broker端,针对每一个Consumer,都会保存一个由于"DELIVERED_ACK_TYPE"而“拖延”的消息个数,这个参数为prefetchExtension,事实上这个值不会大于prefetchSize * 0.5,由于Consumer端会严格控制DELIVERED_ACK_TYPE指令发送的时机(参见ActiveMQMessageConsumer.ackLater方法),broker端经过“prefetchExtension”与prefetchSize互相配合,来决定即将push给client端的消息个数,count = prefetchExtension + prefetchSize - dispatched.size(),其中dispatched表示已经发送给client端可是尚未“STANDARD_ACK_TYPE”的消息总量;因而可知,在CLIENT_ACK模式下,足够快速的调用acknowledge()方法是决定consumer端消费消息的速率;若是client端由于某种缘由致使acknowledge方法未被执行,将致使大量消息不能被确认,broker端将不会push消息,事实上client端将处于“假死”状态,而没法继续消费消息。咱们要求client端在消费1.5*prefetchSize个消息以前,必须acknowledge()一次;一般咱们老是每消费一个消息调用一次,这是一种良好的设计。
 
此外须要额外的补充一下:全部ACK指令都是依次发送给broker端,在CLIET_ACK模式下,消息在交付给listener以前,都会首先建立一个DELIVERED_ACK_TYPE的ACK指令,直到client端未确认的消息达到"prefetchSize * 0.5"时才会发送此ACK指令,若是在此以前,开发者调用了acknowledge()方法,会致使消息直接被确认(STANDARD_ACK_TYPE)。broker端一般会认为“DELIVERED_ACK_TYPE”确认指令是一种“slow consumer”信号,若是consumer不能及时的对消息进行acknowledge而致使broker端阻塞,那么此consumer将会被标记为“slow”,此后queue中的消息将会转发给其余Consumer。
 
DUPS_OK_ACKNOWLEDGE : 

"消息可重复"确认,意思是此模式下,可能会出现重复消息,并非一条消息须要发送屡次ACK才行。它是一种潜在的"AUTO_ACK"确认机制,为批量确认而生,并且具备“延迟”确认的特色。

对于开发者而言,这种模式下的代码结构和AUTO_ACKNOWLEDGE同样,不须要像CLIENT_ACKNOWLEDGE那样调用acknowledge()方法来确认消息。
 
    1) 在ActiveMQ中,若是在Destination是Queue通道,咱们真的能够认为DUPS_OK_ACK就是“AUTO_ACK+ optimizeACK + (prefetch > 0)”这种状况,在确认时机上几乎彻底一致;此外在此模式下,若是prefetchSize =1 或者没有开启optimizeACK,也会致使消息逐条确认,从而失去批量确认的特性。
 
    2) 若是Destination为Topic,DUPS_OK_ACKNOWLEDGE才会产生JMS规范中诠释的意义,即不管optimizeACK是否开启,都会在消费的消息个数>=prefetch * 0.5时,批量确认(STANDARD_ACK_TYPE),在此过程当中,不会发送DELIVERED_ACK_TYPE的确认指令,这是1)和AUTO_ACK的最大的区别。
 
    这也意味着,当consumer故障重启后,那些还没有ACK的消息会从新发送过来
 
SESSION_TRANSACTED :

当session使用事务时,就是使用此模式。在事务开启以后,和session.commit()以前,全部消费的消息,要么所有正常确认,要么所有redelivery。这种严谨性,一般在基于GROUP(消息分组)或者其余场景下特别适合

在SESSION_TRANSACTED模式下,optimizeACK并不能发挥任何效果,由于在此模式下,optimizeACK会被强制设定为false,不过prefetch仍然能够决定DELIVERED_ACK_TYPE的发送时机

由于Session非线程安全,那么当前session下全部的consumer都会共享同一个transactionContext;同时建议,一个事务类型的Session中只有一个Consumer,以免rollback()或者commit()方法被多个consumer调用而形成的消息混乱。

当consumer接受到消息以后,首先检测TransactionContext是否已经开启,若是没有,就会开启并生成新的transactionId,并把信息发送给broker;此后将检测事务中已经消费的消息个数是否 >= prefetch * 0.5,若是大于则补充发送一“DELIVERED_ACK_TYPE”的确认指令;这时就开始调用onMessage()方法,若是是同步(receive),那么即返回message。上述过程,和其余确认模式没有任何特殊的地方。
当开发者决定事务能够提交时,必须调用session.commit()方法,commit方法将会致使当前session的事务中全部消息当即被确认;事务的确认过程当中,首先把本地的deliveredMessage队列中还没有确认的消息所有确认(STANDARD_ACK_TYPE);此后向broker发送transaction提交指令并等待broker反馈,若是broker端事务操做成功,那么将会把本地deliveredMessage队列清空,新的事务开始;若是broker端事务操做失败(此时broker已经rollback),那么对于session而言,将执行inner-rollback,这个rollback所作的事情,就是将当前事务中的消息清空并要求broker重发(REDELIVERED_ACK_TYPE),同时commit方法将抛出异常。
 
当session.commit方法异常时,对于开发者而言一般是调用session.rollback()回滚事务(事实上开发者不调用也没有问题),固然你能够在事务开始以后的任什么时候机调用rollback(),rollback意味着当前事务的结束,事务中全部的消息都将被重发。须要注意,不管是inner-rollback仍是调用session.rollback()而致使消息重发,都会致使message.redeliveryCounter计数器增长,最终都会受限于brokerUrl中配置的"jms.redeliveryPolicy.maximumRedeliveries",若是rollback的次数过多,而达到重发次数的上限时,消息将会被DLQ(dead letter)。
 
INDIVIDUAL_ACKNOWLEDGE : 

单条消息确认,这种确认模式,咱们不多使用,它的确认时机和CLIENT_ACKNOWLEDGE几乎同样,当消息消费成功以后,须要调用message.acknowledege来确认此消息(单条),而CLIENT_ACKNOWLEDGE模式先message.acknowledge()方法将致使整个session中全部消息被确认(批量确认)。
 

3.2 ACK类型

Client端指定了ACK模式,可是在Client与broker在交换ACK指令的时候,还须要告知ACK_TYPE,ACK_TYPE表示此确认指令的类型,不一样的ACK_TYPE将传递着消息的状态,broker能够根据不一样的ACK_TYPE对消息进行不一样的操做。
 
好比Consumer消费消息时出现异常,就须要向broker发送ACK指令,ACK_TYPE为"REDELIVERED_ACK_TYPE",那么broker就会从新发送此消息。在JMS API中并无定义ACT_TYPE,由于它一般是一种内部机制,并不会面向开发者。ActiveMQ中定义了以下几种ACK_TYPE(参看MessageAck类):
 
  • DELIVERED_ACK_TYPE = 0    消息"已接收",但还没有处理结束
  • STANDARD_ACK_TYPE = 2    "标准"类型,一般表示为消息"处理成功",broker端能够删除消息了
  • POSION_ACK_TYPE = 1    消息"错误",一般表示"抛弃"此消息,好比消息重发屡次后,都没法正确处理时,消息将会被删除或者DLQ(死信队列)
  • REDELIVERED_ACK_TYPE = 3    消息需"重发",好比consumer处理消息时抛出了异常,broker稍后会从新发送此消息
  • INDIVIDUAL_ACK_TYPE = 4    表示只确认"单条消息",不管在任何ACK_MODE下    
  • UNMATCHED_ACK_TYPE = 5    在Topic中,若是一条消息在转发给“订阅者”时,发现此消息不符合Selector过滤条件,那么此消息将 不会转发给订阅者,消息将会被存储引擎删除(至关于在Broker上确认了消息)。
    到目前为止,咱们已经清楚了大概的原理: Client端在不一样的ACK模式时,将意味着在不一样的时机发送ACK指令,每一个ACK Command中会包含ACK_TYPE,那么broker端就能够根据ACK_TYPE来决定此消息的后续操做. 接下来,咱们详细的分析ACK模式与ACK_TYPE.

3.3 ACK

咱们须要在建立Session时指定ACK模式,因而可知,ACK模式将是session共享的,意味着一个session下全部的 consumer都使用同一种ACK模式。在建立Session时,开发者不能指定除ACK模式列表以外的其余值。

若是此session为事务类型,用户指定的ACK模式将被忽略,而强制使用"SESSION_TRANSACTED"类型;

若是此session为非事务类型时,也将不能将 ACK模式设定为"SESSION_TRANSACTED",毕竟这是相悖的。


Consumer消费消息的风格有2种: 同步/异步。使用consumer.receive()就是同步,使用messageListener就是异步;在同一个consumer中,咱们不能同时使用这2种风格,好比在使用listener的状况下,当调用receive()方法将会得到一个Exception。两种风格下,消息确认时机有所不一样。

1. 同步消费机制

同步调用时,在消息从receive方法返回以前,就已经调用了ACK;所以若是Client端没有处理成功,此消息将丢失(可能重发,与ACK模式有关)。

Message message = sessionMessageQueue.dequeue();  
if(message != null){  
    ack(message);  
}  
return message

2. 异步消费机制

基于异步调用时,消息的确认是在onMessage方法返回以后,若是onMessage方法异常,会致使消息不能被ACK,会触发重发。

//基于listener  
Session session = connection.getSession(consumerId);  
sessionQueueBuffer.enqueue(message);  
Runnable runnable = new Ruannale(){  
    run(){  
        Consumer consumer = session.getConsumer(consumerId);  
        Message md = sessionQueueBuffer.dequeue();  
        try{  
            consumer.messageListener.onMessage(md);  
            ack(md);//  
        }catch(Exception e){  
            redelivery();//sometime,not all the time;  
    }  
}  
//session中将采起线程池的方式,分发异步消息  
//所以同一个session中多个consumer能够并行消费  
threadPool.execute(runnable);

6、ActiveMQ的事务机制

1. 消息生产者事务

JMS规范中支持带事务的消息,也就是说您能够启动一个事务(并由消息发送者的链接会话设置一个事务号Transaction ID),而后在事务中发送多条消息。这个事务提交前这些消息都不会进入队列(不管是Queue仍是Topic)。

不进入队列,并不表明JMS不会在事务提交前将消息发送给ActiveMQ服务端。 实际上这些消息都会发送给服务端,服务端发现这是一条带有Transaction ID的消息,就会将先把这条消息放置在“transaction store”区域中(而且带有redo日志,这样保证在收到rollback指令后能进行取消操做),等待这个Transaction ID被rollback或者commit。

一旦这个Transaction ID被commit,ActiveMQ才会依据自身设置的PERSISTENT Message处理规则或者NON_PERSISTENT Meaage处理规则,将Transaction ID对应的message进行入队操做(不管是Queue仍是Topic)。如下代码示例了如何在生产者端使用事务发送消息:

......
//进行链接
connection = connectionFactory.createQueueConnection();
connection.start();

//创建会话(设置一个带有事务特性的会话)
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//创建queue(固然若是有了就不会重复创建)
Queue sendQueue = session.createQueue("/test");
//创建消息发送者对象
MessageProducer sender = session.createProducer(sendQueue);

//发送(JMS是支持事务的)
for(int index = 0 ; index < 10 ; index++) {
    TextMessage outMessage = session.createTextMessage();
    outMessage.setText("这是发送的消息内容-------------------" + index);
    // 不管是NON_PERSISTENT message仍是PERSISTENT message
    // 都要在commit后才能真正的入队
    if(index % 2 == 0) {
        sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    } else {
        sender.setDeliveryMode(DeliveryMode.PERSISTENT);
    }

    // 没有commit的消息,也是要先发送给服务端的
    sender.send(outMessage);
}

session.commit();
......
在“connection.createSession”这个方法中一共有两个参数(这句代码在上文中已经出现过屡次)。第一个布尔型参数很好理解,就是标示这个链接会话是否启动事务;第二个整型参数标示了消息消费者的“应答模型”。

2. 消息消费者事务

JMS规范除了为消息生产者端提供事务支持之外,还为消费服务端准备了事务的支持。您能够经过在消费者端操做事务的commit和rollback方法,向服务器告知一组消息是否处理完成。采用事务的意义在于,一组消息要么被所有处理并确认成功,要么所有被回滚并从新处理。

......
//创建会话(采用commit方式确认一批消息处理完毕)
session = connection.createSession(true, Session.SESSION_TRANSACTED);
//创建Queue(固然若是有了就不会重复创建)
sendQueue = session.createQueue("/test");
//创建消息发送者对象
MessageConsumer consumer = session.createConsumer(sendQueue);
consumer.setMessageListener(new MyMessageListener(session));

......

class MyMessageListener implements MessageListener {
    private int number = 0;

    /**
     * 会话
     */
    private Session session;

    public MyMessageListener(Session session) {
        this.session = session;
    }

    @Override
    public void onMessage(Message message) {
        // 打印这条消息
        System.out.println("Message = " + message);
        // 若是条件成立,就向服务器确认这批消息处理成功
        // 服务器将从队列中删除这些消息
        if(number++ % 3 == 0) {
            try {
                this.session.commit();
            } catch (JMSException e) {
                e.printStackTrace(System.out);
            }
        }
    }
}
以上代码演示的是 消费者经过事务commit的方式,向服务器确认一批消息正常处理完成的方式。请注意代码示例中的“session = connection.createSession(true, Session.SESSION_TRANSACTED);”语句。第一个参数表示 链接会话启用事务支持;第二个参数表示 使用commit或者rollback的方式进行向服务器应答
这是调用commit的状况,那么若是调用rollback方法又会发生什么状况呢?调用rollback方法时,在rollback以前已处理过的消息(注意,并非全部预取的消息)将 从新发送一次到消费者端(发送给同一个链接会话)。而且消息中 redeliveryCounter(重发计数器)属性将会加1。请看以下所示的代码片断和运行结果:

@Override
public void onMessage(Message message) {
    // 打印这条消息
    System.out.println("Message = " + message);
    // rollback这条消息
    this.session.rollback();
}
以上代码片断中,咱们不停的回滚正在处理的这条消息,经过打印出来的信息能够看到,这条消息被不停的重发:

Message = ActiveMQTextMessage {...... redeliveryCounter = 0, text = 这是发送的消息内容-------------------20}
Message = ActiveMQTextMessage {...... redeliveryCounter = 1, text = 这是发送的消息内容-------------------20}
Message = ActiveMQTextMessage {...... redeliveryCounter = 2, text = 这是发送的消息内容-------------------20}
Message = ActiveMQTextMessage {...... redeliveryCounter = 3, text = 这是发送的消息内容-------------------20}
Message = ActiveMQTextMessage {...... redeliveryCounter = 4, text = 这是发送的消息内容-------------------20}
能够看到同一条记录被重复的处理,而且其中的redeliveryCounter属性不断累加。

7、ActiveMQ的重发和死信队列

消息处理失败后,不断的重发消息确定不是一个最好的处理办法:若是一条消息被不断的处理失败,那么最可能的状况就是这条消息承载的业务内容自己就有问题。那么不管重发多少次,这条消息仍是会处理失败。

为了解决这个问题,ActiveMQ中引入了“死信队列”(Dead Letter Queue)的概念。即一条消息再被重发了屡次后(默认为重发6次redeliveryCounter==6),将会被ActiveMQ移入“死信队列”。开发人员能够在这个Queue中查看处理出错的消息,进行人工干预。

默认状况下“死信队列”只接受PERSISTENT Message,若是NON_PERSISTENT Message超过了重发上限,将直接被删除。如下配置信息可让NON_PERSISTENT Message在超太重发上限后,也移入“死信队列”:

<policyEntry queue=">">  
    <deadLetterStrategy>  
        <sharedDeadLetterStrategy processNonPersistent="true" />  
    </deadLetterStrategy>  
</policyEntry>

上文提到的默认重发次数redeliveryCounter的上限也是能够进行设置的,为了保证消息异常状况下尽量小的影响消费者端的处理效率,实际工做中建议将这个上限值设置为3。缘由上文已经说过,若是消息自己的业务内容就存在问题,那么重发多少次也没有用。

RedeliveryPolicy redeliveryPolicy = connectionFactory.getRedeliveryPolicy();
// 设置最大重发次数
redeliveryPolicy.setMaximumRedeliveries(3);

实际上ActiveMQ的重发机制还有包括以上提到的rollback方式在内的多种方式:
1. 在支持事务的消费者链接会话中调用 rollback方法

2. 在支持事务的消费者链接会话中,使用commit方法明确告知服务器端消息已处理成功前,会话链接就终止了(最多是异常终止)

3. 在须要使用ACK模式的会话中,使用消息的acknowledge方式明确告知服务器端消息已处理成功前,会话链接就终止了(最多是异常终止)
可是以上几种重发机制有一些小小的差别,主要体如今redeliveryCounter属性的做用区域。简而言之,第一种方法redeliveryCounter属性的做用区域是本次链接会话,然后两种redeliveryCounter属性的做用区域是在整个ActiveMQ系统范围。

以上是这篇博文的主要内容,参考了不少文章,想要给出ActiveMQ的一个大概,在其消息协议上还不大清楚,对于消息机制略有涉及,下一篇准备总结下其部署和集群。