转载:架构设计:系统间通讯(21)——ActiveMQ的安装与使用html
转载:成小胖学习ActiveMQ·基础篇java
转载:ActiveMQ学习心得之ActiveMQ四种存储器分析mysql
转载:ActiveMQ(一)简介与架构sql
转载:ActiveMQ消息传送机制以及ACK机制详解数据库
转载:架构设计:系统间通讯(22)——提升ActiveMQ工做性能(上)apache
转载:架构设计:系统间通讯(23)——提升ActiveMQ工做性能(中)缓存
转载:架构设计:系统间通讯(24)——提升ActiveMQ工做性能(下)安全
ActiveMQ是Apache软件基金会的开源产品,支持AMQP协议、MQTT协议(和XMPP协议做用相似)、Openwire协议和Stomp协议等多种消息协议。而且ActiveMQ完整支持JMS API接口规范,Apache也提供多种其余语言的客户端,例如:C、C++、C#、Ruby、Perl。服务器
1. 安装和启动ActiveMQ网络
2. 消息生产者代码以下:
package com.ljq.durian.test.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消息的生产者(发送者) * * @author Administrator * */ public class JMSProducer { public static void main(String[] args) { try { //第一步:创建ConnectionFactory工厂对象,须要填入用户名、密码、以及要链接的地址,均使用默认便可,默认端口为"tcp://localhost:61616" ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "failover:(tcp://localhost:61616)?Randomize=false"); //第二步:经过ConnectionFactory工厂对象咱们建立一个Connection链接,而且调用Connection的start方法开启链接,Connection默认是关闭的。 Connection connection = connectionFactory.createConnection(); connection.start(); //第三步:经过Connection对象建立Session会话(上下文环境对象),用于接收消息,参数配置1为是否启用是事务,参数配置2为签收模式,通常咱们设置自动签收。 Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); //第四步:经过Session建立Destination对象,指的是一个客户端用来指定生产消息目标和消费消息来源的对象,在PTP模式中,Destination被称做Queue即队列;在Pub/Sub模式,Destination被称做Topic即主题。在程序中可使用多个Queue和Topic。 Destination destination = session.createQueue("HelloWorld"); //第五步:咱们须要经过Session对象建立消息的发送和接收对象(生产者和消费者)MessageProducer/MessageConsumer。 MessageProducer producer = session.createProducer(null); //第六步:咱们可使用MessageProducer的setDeliveryMode方法为其设置持久化特性和非持久化特性(DeliveryMode),咱们稍后详细介绍。 //producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //第七步:最后咱们使用JMS规范的TextMessage形式建立数据(经过Session对象),并用MessageProducer的send方法发送数据。同理客户端使用receive方法进行接收数据。最后不要忘记关闭Connection链接。 for(int i = 0 ; i < 10 ; i ++){ TextMessage msg = session.createTextMessage("我是消息内容" + i); // 第一个参数目标地址 // 第二个参数 具体的数据信息 // 第三个参数 传送数据的模式 // 第四个参数 优先级 // 第五个参数 消息的过时时间 producer.send(destination, msg, DeliveryMode.NON_PERSISTENT, 0 , 1000L); System.out.println("发送消息:" + msg.getText()); session.commit(); //启用事务时记得提交事务,否则消费端接收不到消息 Thread.sleep(1000); } if(connection != null){ connection.close(); } } catch (Exception e) { e.printStackTrace(); } } }
3. 消息消费者代码以下:
package com.ljq.durian.test.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消息的消费者(接受者) * * @author Administrator * */ public class JMSConsumer { public static void main(String[] args) { try { //第一步:创建ConnectionFactory工厂对象,须要填入用户名、密码、以及要链接的地址,均使用默认便可,默认端口为"tcp://localhost:61616" ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "failover:(tcp://localhost:61616)?Randomize=false"); //第二步:经过ConnectionFactory工厂对象咱们建立一个Connection链接,而且调用Connection的start方法开启链接,Connection默认是关闭的。 Connection connection = connectionFactory.createConnection(); connection.start(); //第三步:经过Connection对象建立Session会话(上下文环境对象),用于接收消息,参数配置1为是否启用是事务,参数配置2为签收模式,通常咱们设置自动签收。 Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); //第四步:经过Session建立Destination对象,指的是一个客户端用来指定生产消息目标和消费消息来源的对象,在PTP模式中,Destination被称做Queue即队列;在Pub/Sub模式,Destination被称做Topic即主题。在程序中可使用多个Queue和Topic。 Destination destination = session.createQueue("HelloWorld"); //第五步:经过Session建立MessageConsumer MessageConsumer consumer = session.createConsumer(destination); while(true){ TextMessage msg = (TextMessage)consumer.receive(); if(msg == null) { break; } System.out.println("收到的内容:" + msg.getText()); } } catch (Exception e) { e.printStackTrace(); } } }
4. 启动消息生产者产生消息,可在ActiveMQ的网页管理中看到消息的状态。
5. 启动消息消费者消费消息,可在ActiveMQ的网页管理中看到消息的状态。
网上例子较多,公司不能传图,留待后补。
ActiveMQ主要涉及到5个方面:
1. 传输协议:消息之间的传递,无疑须要协议进行沟通,启动一个ActiveMQ打开了一个监听端口, ActiveMQ提供了普遍的链接模式,其中主要包括SSL、STOMP、XMPP;ActiveMQ默认的使用的协议是openWire,端口号:61616;
2. 消息域:ActiveMQ主要包含Point-to-Point (点对点),Publish/Subscribe Model (发布/订阅者),其中在Publich/Subscribe 模式下又有Nondurable subscription和durable subscription (持久化订阅)2种消息处理方式
3. 消息存储:在消息传递过程当中,部分重要的消息可能须要存储到数据库或文件系统中,当中介崩溃时,信息不回丢失
4. Cluster (集群): 最多见到 集群方式包括network of brokers和Master Slave;
5. Monitor (监控) :ActiveMQ通常由jmx来进行监控
默认配置下的ActiveMQ只适合学习代码而不适用于实际生产环境,ActiveMQ的性能须要经过配置挖掘,其性能提升包括代码级性能、规则性能、存储性能、网络性能以及多节点协同方法(集群方案),因此咱们优化ActiveMQ的中心思路也是这样的:
1. 优化ActiveMQ单个节点的性能,包括NIO模型选择和存储选择。
2. 配置ActiveMQ的集群(ActiveMQ的高性能和高可用须要经过集群表现出来)。
点对点模式下一条消息将会发送给一个消息消费者,若是当前Queue没有消息消费者,消息将进行存储。
点对点方式使用生产者-消费者模式,生产者代码以下:
public Producer() throws JMSException { factory = new ActiveMQConnectionFactory(brokerURL); connection = factory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(null); } public void sendMessage() throws JMSException { for(int i = 0; i < jobs.length; i++) { String job = jobs[i]; Destination destination = session.createQueue("JOBS." + job); Message message = session.createObjectMessage(i); System.out.println("Sending: id: " + ((ObjectMessage)message).getObject() + " on queue: " + destination); producer.send(destination, message); } } public static void main(String[] args) throws JMSException { Producer producer = new Producer(); for(int i = 0; i < 10; i++) { producer.sendMessage(); System.out.println("Produced " + i + " job messages"); try { Thread.sleep(1000); } catch (InterruptedException x) { e.printStackTrace(); } } producer.close(); }
public Consumer() throws JMSException { factory = new ActiveMQConnectionFactory(brokerURL); connection = factory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } public static void main(String[] args) throws JMSException { Consumer consumer = new Consumer(); for (String job : consumer.jobs) { Destination destination = consumer.getSession().createQueue("JOBS." + job); MessageConsumer messageConsumer = consumer.getSession().createConsumer(destination); messageConsumer.setMessageListener(new Listener(job)); } } public Session getSession() { return session; }具体注册的对象须要实现MessageListener接口:
import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.ObjectMessage; public class Listener implements MessageListener { private String job; public Listener(String job) { this.job = job; } public void onMessage(Message message) { try { //do something here System.out.println(job + " id:" + ((ObjectMessage)message).getObject()); } catch (Exception e) { e.printStackTrace(); } } }
“发布-订阅”模式下,消息会被复制多份,分别发送给全部“订阅”者。
Publisher
publisher是属于发布信息的一方,它经过定义一个或者多个topic,而后给这些topic发送消息。
public Publisher() throws JMSException { factory = new ActiveMQConnectionFactory(brokerURL); connection = factory.createConnection(); try { connection.start(); } catch (JMSException jmse) { connection.close(); throw jmse; } session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(null); }咱们按照前面说的流程定义了基本的connectionFactory, connection, session, producer。这里代码就是主要实现初始化的效果。接着,咱们须要定义一系列的topic让全部的consumer来订阅,设置topic的代码以下:
protected void setTopics(String[] stocks) throws JMSException { destinations = new Destination[stocks.length]; for(int i = 0; i < stocks.length; i++) { destinations[i] = session.createTopic("STOCKS." + stocks[i]); } }这里destinations是一个内部定义的成员变量Destination[]。这里咱们总共定义了的topic数取决于给定的参数stocks。在定义好topic以后咱们要给这些指定的topic发消息,具体实现的代码以下:
protected void sendMessage(String[] stocks) throws JMSException { for(int i = 0; i < stocks.length; i++) { Message message = createStockMessage(stocks[i], session); System.out.println("Sending: " + ((ActiveMQMapMessage)message).getContentMap() + " on destination: " + destinations[i]); producer.send(destinations[i], message); } } protected Message createStockMessage(String stock, Session session) throws JMSException { MapMessage message = session.createMapMessage(); message.setString("stock", stock); message.setDouble("price", 1.00); message.setDouble("offer", 0.01); message.setBoolean("up", true); return message; }在sendMessage方法里咱们遍历每一个topic,而后给每一个topic发送定义的Message消息。在定义好前面发送消息的基础以后,咱们调用他们的代码就很简单了:
public static void main(String[] args) throws JMSException { if(args.length < 1) throw new IllegalArgumentException(); // Create publisher Publisher publisher = new Publisher(); // Set topics publisher.setTopics(args); for(int i = 0; i < 10; i++) { publisher.sendMessage(args); System.out.println("Publisher '" + i + " price messages"); try { Thread.sleep(1000); } catch(InterruptedException e) { e.printStackTrace(); } } // Close all resources publisher.close(); }调用他们的代码就是咱们遍历全部topic,而后经过sendMessage发送消息。在发送一个消息以后先sleep1秒钟。要注意的一个地方就是咱们使用完资源以后必需要使用close方法将这些资源关闭释放。close方法关闭资源的具体实现以下:
public void close() throws JMSException { if (connection != null) { connection.close(); } }Consumer
public Consumer() throws JMSException { factory = new ActiveMQConnectionFactory(brokerURL); connection = factory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); }接收和处理消息的方法有两种,分为同步和异步的,通常同步的方式咱们是经过MessageConsumer. receive()方法来处理接收到的消息。而异步的方法则是经过注册一个 MessageListener的方法,使用MessageConsumer.setMessageListener()。这里咱们采用异步的方式实现:
public static void main(String[] args) throws JMSException { Consumer consumer = new Consumer(); for (String stock : args) { Destination destination = consumer.getSession().createTopic("STOCKS." + stock); MessageConsumer messageConsumer = consumer.getSession().createConsumer(destination); messageConsumer.setMessageListener(new Listener()); } } public Session getSession() { return session; }在前面的代码里咱们先找到一样的topic,而后遍历全部的topic去得到消息。对于消息的处理咱们专门经过Listener对象来负责。Listener对象的职责很简单,主要就是处理接收到的消息:
public class Listener implements MessageListener { public void onMessage(Message message) { try { MapMessage map = (MapMessage)message; String stock = map.getString("stock"); double price = map.getDouble("price"); double offer = map.getDouble("offer"); boolean up = map.getBoolean("up"); DecimalFormat df = new DecimalFormat( "#,###,###,##0.00" ); System.out.println(stock + "\t" + df.format(price) + "\t" + df.format(offer) + "\t" + (up?"up":"down")); } catch (Exception e) { e.printStackTrace(); } } }它实现了MessageListener接口,里面的onMessage方法就是在接收到消息以后会被调用的方法。
和前面两种方式比较起来,request-response的通讯方式很常见,可是不是默认提供的一种模式。在前面的两种模式中都是一方负责发送消息而另一方负责处理。而咱们实际中的不少应用至关于一种一应一答的过程,须要双方都能给对方发送消息。因而请求-应答的这种通讯方式也很重要。它也应用的很广泛。
请求-应答方式并非JMS规范系统默认提供的一种通讯方式,而是经过在现有通讯方式的基础上稍微运用一点技巧实现的。下图是典型的请求-应答方式的交互过程:
在JMS里面,若是要实现请求/应答的方式,能够利用JMSReplyTo和JMSCorrelationID消息头来将通讯的双方关联起来。另外,QueueRequestor和TopicRequestor可以支持简单的请求/应答过程。如今,若是咱们要实现这么一个过程,在发送请求消息而且等待返回结果的client端的流程以下:
// client side Destination tempDest = session.createTemporaryQueue(); MessageConsumer responseConsumer = session.createConsumer(tempDest); ... // send a request.. message.setJMSReplyTo(tempDest) message.setJMSCorrelationID(myCorrelationID); producer.send(message);client端建立一个 临时队列并在发送的消息里 指定了发送返回消息的destination以及correlationID。那么在处理消息的server端获得这个消息后就知道该发送给谁了。Server端的大体流程以下:
public void onMessage(Message request) { Message response = session.createMessage(); response.setJMSCorrelationID(request.getJMSCorrelationID()) producer.send(request.getJMSReplyTo(), response) }这里咱们是用server端注册 MessageListener,经过设置返回信息的 CorrelationID和 JMSReplyTo将信息返回。以上就是发送和接收消息的双方的大体程序结构。具体的实现代码以下:
Client侧实现
public Client() { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection; try { connection = connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(transacted, ackMode); Destination adminQueue = session.createQueue(clientQueueName); //Setup a message producer to send message to the queue the server is consuming from this.producer = session.createProducer(adminQueue); this.producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //Create a temporary queue that this client will listen for responses on then create a consumer //that consumes message from this temporary queue...for a real application a client should reuse //the same temp queue for each message to the server...one temp queue per client Destination tempDest = session.createTemporaryQueue(); MessageConsumer responseConsumer = session.createConsumer(tempDest); //This class will handle the messages to the temp queue as well responseConsumer.setMessageListener(this); //Now create the actual message you want to send TextMessage txtMessage = session.createTextMessage(); txtMessage.setText("MyProtocolMessage"); //Set the reply to field to the temp queue you created above, this is the queue the server //will respond to txtMessage.setJMSReplyTo(tempDest); //Set a correlation ID so when you get a response you know which sent message the response is for //If there is never more than one outstanding message to the server then the //same correlation ID can be used for all the messages...if there is more than one outstanding //message to the server you would presumably want to associate the correlation ID with this //message somehow...a Map works good String correlationId = this.createRandomString(); txtMessage.setJMSCorrelationID(correlationId); this.producer.send(txtMessage); } catch (JMSException e) { //Handle the exception appropriately } }这里的代码除了初始化构造函数里的参数还同时设置了 两个destination, 一个是本身要发送消息出去的destination,在这一句设置:
session.createProducer(adminQueue);另一个是本身要接收的消息destination, 经过这两句指定了要接收消息的目的地:
Destination tempDest = session.createTemporaryQueue(); responseConsumer = session.createConsumer(tempDest);
这里是用的一个临时队列。在前面指定了返回消息的通讯队列以后,咱们须要通知server端知道发送返回消息给哪一个队列。因而
txtMessage.setJMSReplyTo(tempDest);
指定了这一部分,同时:
txtMessage.setJMSCorrelationID(correlationId);
方法主要是为了保证每次发送回来请求的server端可以知道对应的是哪一个请求。这里一个请求和一个应答是至关于对应一个相同的序列号同样。
由于client端在发送消息以后还要接收server端返回的消息,因此它也要实现一个消息receiver的功能。这里采用实现MessageListener接口的方式:
public void onMessage(Message message) { String messageText = null; try { if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message; messageText = textMessage.getText(); System.out.println("messageText = " + messageText); } } catch (JMSException e) { //Handle the exception appropriately } }Server侧实现
public void onMessage(Message message) { try { TextMessage response = this.session.createTextMessage(); if (message instanceof TextMessage) { TextMessage txtMsg = (TextMessage) message; String messageText = txtMsg.getText(); response.setText(this.messageProtocol.handleProtocolMessage(messageText)); } //Set the correlation ID from the received message to be the correlation id of the response message //this lets the client identify which message this is a response to if it has more than //one outstanding message to the server response.setJMSCorrelationID(message.getJMSCorrelationID()); //Send the response to the Destination specified by the JMSReplyTo field of the received message, //this is presumably a temporary queue created by the client this.replyProducer.send(message.getJMSReplyTo(), response); } catch (JMSException e) { //Handle the exception appropriately } }在replyProducer.send()方法里,message.getJMSReplyTo()就获得了要发送消息回去的destination。另外,设置这些发送返回信息的replyProducer的信息主要在构造函数相关的方法里实现了:
public Server() { try { //This message broker is embedded BrokerService broker = new BrokerService(); broker.setPersistent(false); broker.setUseJmx(false); broker.addConnector(messageBrokerUrl); broker.start(); } catch (Exception e) { //Handle the exception appropriately } //Delegating the handling of messages to another class, instantiate it before setting up JMS so it //is ready to handle messages this.messageProtocol = new MessageProtocol(); this.setupMessageQueueConsumer(); } private void setupMessageQueueConsumer() { ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(messageBrokerUrl); Connection connection; try { connection = connectionFactory.createConnection(); connection.start(); this.session = connection.createSession(this.transacted, ackMode); Destination adminQueue = this.session.createQueue(messageQueueName); //Setup a message producer to respond to messages from clients, we will get the destination //to send to from the JMSReplyTo header field from a Message this.replyProducer = this.session.createProducer(null); this.replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //Set up a consumer to consume messages off of the admin queue MessageConsumer consumer = this.session.createConsumer(adminQueue); consumer.setMessageListener(this); } catch (JMSException e) { //Handle the exception appropriately } }整体来讲,整个的交互过程并不复杂,只是比较繁琐。
对于请求/应答的方式来讲,这种典型交互的过程就是Client端在设定正常发送请求的Queue同时也设定一个临时的Queue。同时在要发送的message里头指定要返回消息的destination以及CorelationID,这些就比如是一封信里面所带的回执。根据这个信息服务器才知道怎么给客户端回信。
对于Server端来讲则要额外建立一个producer,在处理接收到消息的方法里再利用producer将消息发回去。这一系列的过程看起来很像http协议里面请求-应答的方式,都是一问一答。
JMS中对非持久化消息和非持久化消息的称呼分别是:NON_PERSISTENTMessage和PERSISTENT Meaage。它们指的是消息在任何一种“发送-接受”模式下(“订阅-发布”模式和“负载均衡模式”),是否进行持久化存储。
NON_PERSISTENT Message只存储在JMS服务节点的内存区域,不会存储在某种持久化介质上(AcitveMQ可支持的持久化介质有:KahaBD、AMQ和关系型数据)。在极限状况下,JMS服务节点的内存区域不够使用了,也只会采用某种辅助方案进行转存(例如ActiveMQ会使用磁盘上的一个“临时存储区域”进行暂存)。一旦JMS服务节点宕机了,这些NON_PERSISTENT Message就会丢失。
JMS中对PERSISTENT Meaage的定义是:这些消息不受JMS服务端异常状态的影响,JMS服务端会使用某种持久化存储方案保存这些消息,直到JMS服务端认为这些PERSISTENTMeaage被消费端成功处理。例如ActiveMQ中能够选择的持久化存储方案就包括:KahaDB、AMQ和关系型数据库。
在JMS标准API中,使用setDeliveryMode标记消息发送者是发送的PERSISTENT Meaage仍是NON_PERSISTENT Message。示例以下:
...... for(int index = 0 ; index < 10 ; index++) { TextMessage outMessage = session.createTextMessage(); outMessage.setText("这是发送的消息内容:" + index); if(index % 2 == 0) { sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT); } else { sender.setDeliveryMode(DeliveryMode.PERSISTENT); } sender.send(outMessage); } ......那么当 JMS服务节点重启后(注意不是 producer重启),以上代码中发送的10条消息只有其中5条消息可以保存下来。
发送NON_PERSISTENT Message时,消息发送方默认使用异步方式:便是说消息发送后发送方不会等待NON_PERSISTENT Message在服务端的任何回执。那么问题来了:若是这时服务端已经出现了消息堆积,而且堆积程度已经达到“没法再接收新消息”的极限状况了,那么消息发送方若是知晓并采起相应的策略呢?
实际上所谓的异步发送也并不是绝对的异步,消息发送者会在发送必定大小的消息后等待服务端进行回执(这个配置只是针对使用异步方式进行发送消息的状况)
...... // 如下语句设置消息发送者在累计发送102400byte大小的消息后(多是一条消息也多是多条消息) // 等待服务端进行回执,以便肯定以前发送的消息是否被正确处理 // 肯定服务器端是否产生了过量的消息堆积,须要减慢消息生产端的生产速度 connectionFactory.setProducerWindowSize(102400); ......
...... // 使用异步传输 // 上文已经说过,若是发送的是NON_PERSISTENT Message // 那么默认就是异步方式 connectionFactory.setUseAsyncSend(true); ......一旦您进行了这样的设置,就须要设置回执窗口:
...... // 一样设置消息发送者在累计发送102400byte大小的消息后 // 等待服务端进行回执,以便肯定以前发送的消息是否被正确处理 // 肯定服务器端是否产生了过量的消息堆积,须要减慢消息生产端的生产速度 connectionFactory.setProducerWindowSize(102400); ......
持续订阅和非持续订阅,是针对“订阅-发布”模式的细分处理策略,在JMS规范中的标准称呼是:Durable-Subscribers和Non-Durable Subscribers。
Durable-Subscribers是指在“订阅-发布”模式下,即便标记为Durable-Subscribers的订阅者下线了(多是由于订阅者宕机,也多是由于这个订阅者故意下线),“订阅-发布”模式的Topic队列也要保存这些消息(视消息不一样的持久化策略影响,保存机制不同),直到下次这个被标记为Durable-Subscribers的订阅者从新上线,并正确处理这条消息为止。换句话说,标记为Durable-Subscribers的订阅者是否能得到某条消息,和它是否曾经下线没有任何关系。
Non-Durable Subscribers是指在“订阅-发布”模式下,“订阅-发布”模式的Topic队列不用为这些已经下线的订阅者保留消息。当后者将消息按照既定的广播规则发送给当前在线的订阅者后,消息就能够被标记为“处理完成”。
ActiveMQ 在 队列中存储 Message 时,采用先进先出顺序(FIFO)存储。同一时间一个消息被分派给单个消费者,且只有当 Message 被消费并确认时,它才能从存储中删除。
对于持久化订阅者来讲,每一个消费者得到 Message 的副本。为了节省存储空间,Provider 仅存储消息的一个副本。持久化订阅者维护了指向下一个 Message 的指针,并将其副本分派给消费者。以这种方式实现消息存储,由于每一个持久化订阅者可能以不一样的速率消费 Message,或者它们可能不是所有同时运行。此外,因每一个 Message 可能存在多个消费者,因此在它被成功地传递给全部持久化订阅者以前,不能从存储中删除。
关于持久化和消息的保留见下表:
消息类型 | 是否持久化 | 是否有Durable订阅者 | 消费者延迟启动时,消息是否保留 | Broker重启时,消息是否保留 |
Queue | N | - | Y | N |
Queue | Y | - | Y | Y |
Topic | N | N | N | N |
Topic | N | Y | Y | N |
Topic | Y | N | N | N |
Topic | Y | Y | Y | Y |
ActiveMQ有四种存储器,下面分别介绍和分析各自的特色和优缺点。
一、KahaDB message store:
是ActiveMQ的默认以及推荐的存储器,特色是基于文件、支持事务日志、可靠、可扩展、速度快等。重点讨论一下后两点。
KahaDB主要元素包括:一个内存Metadata Cache用来在内存中检索消息的存储位置、若干用于记录消息内容的Data log文件、一个在磁盘上检索消息存储位置的Metadata Store、还有一个用于在系统异常关闭后恢复Btree结构的redo文件。
a. 可扩展体如今KahaDB支持其余三种存储器的外接扩展,也就是说能够同时用不止一种,这样能够取长补短,适合更广的应用场景,达到性能最佳。
b. 速度快:(1)快速的事务日志;(2)高度优化的消息ID索引;(3)在内存中的消息缓存。具体分析,消息直接添加在当前日志文件的尾部,因此存的快(相似Redis的Aof);用一个索引文件存储全部的destination,可谓高度优化;支持内存缓存也是必然,但在缓存回复策略上不如内存存储器。
<broker brokerName="broker" persistent="true" useShutdownHook="false"> <persistenceAdapter> <kahaDB directory="${activemq.data}/kahadb" journalMaxFileLength="16mb"/> </persistenceAdapter> </broker>
在基于文件、支持事务方面和KahaDB相似。不一样之处以下:
优势:索引用的是hashbin(哈希桶,没有查到权威定义,可理解为哈希表),天然比KahaDB的Btree索引要快,而且磁盘读写用的是nio,速度也快,因此用于消息吞吐量要求比较大的时候是最佳选择。(有的人把吞吐量理解成消息总数量其实不正确,应该是消息出入队的速率。)
缺点:对于每一个destination都要建一个索引,因此不适于不少destination并发的场合,而这偏偏是KahaDB的优点,它能够支持最大10000个queue的同时等待。(AMQ为每一个索引使用两个分开的文件,而且每一个 Destination 都有一个索引,因此当你打算在代理中使用数千个队列的时候,不该该使用它。)
<persistenceAdapter> <amqPersistenceAdapter directory="${activemq.data}/kahadb" syncOnWrite="true" indexPageSize="16kb" indexMaxBinSize="100" maxFileLength="10mb" /> </persistenceAdapter>
默认的JDBC驱动是ApacheDerby,同时支持MySQL、PostgreSQL、Oracle、SQLServer、Sybase、Informix、MaxDB等主流的关系数据库。用三张表结构来存储消息,分别是ACTIVEMQ_MSGS、ACTIVEMQ_ACKS、ACTIVEMQ_LOCK。第二张表外键关联到第一张表,共同存储消息,第三张表用于锁定保证只有一个broker实例能够访问数据库。选择关系型数据库,一般的缘由是企业已经具有了管理关系型数据的专长,可是它在性能上绝对不优于上述消息存储实现。
<beans> <broker brokerName="test-broker" persistent="true" xmlns="http://activemq.apache.org/schema/core"> <persistenceAdapter> <jdbcPersistenceAdapter dataSource="#mysql-ds"/> </persistenceAdapter> </broker> <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/> <property name="username" value="activemq"/> <property name="password" value="activemq"/> <property name="maxActive" value="200"/> <property name="poolPreparedStatements" value="true"/> </bean> </beans>
用于实时消息的缓存,只针对非持久订阅的消费者提供了5种订阅恢复策略,能够极大程度加强非持久订阅的可用性。也就是说对于持久订阅的消费者是用不到内存存储的。
<broker brokerName="test-broker" persistent="false" xmlns="http://activemq.apache.org/schema/core"> <transportConnectors> <transportConnector uri="tcp://localhost:61635"/> </transportConnectors> </broker>
Producer客户端使用来发送消息的, Consumer客户端用来消费消息;它们的协同中心就是ActiveMQ broker,broker也是让producer和consumer调用过程解耦的工具,最终实现了异步RPC/数据交换的功能。随着ActiveMQ的不断发展,支持了愈来愈多的特性,也解决开发者在各类场景下使用ActiveMQ的需求。好比producer支持异步调用;使用flow control机制让broker协同consumer的消费速率;consumer端可使用prefetchACK来最大化消息消费的速率;提供"重发策略"等来提升消息的安全性等。一条消息的生命周期以下:
图片中简单的描述了一条消息的生命周期,不过在不一样的架构环境中,message的流动行可能更加复杂.将在稍后有关broker的架构中详解..一条消息从producer端发出以后,一旦被broker正确保存,那么它将会被consumer消费,而后ACK,broker端才会删除;不过当消息过时或者存储设备溢出时,也会终结它。
这是一张很复杂,并且有些凌乱的图片;这张图片中简单的描述了:1)producer端如何发送消息 2) consumer端如何消费消息 3) broker端如何调度。
"可优化的ACK",这是ActiveMQ对于consumer在消息消费时,对消息ACK的优化选项,也是consumer端最重要的优化参数之一,你能够经过以下方式开启:
1) 在brokerUrl中增长以下查询字符串:
String brokerUrl = "tcp://localhost:61616?" + "jms.optimizeAcknowledge=true" + "&jms.optimizeAcknowledgeTimeOut=30000" + "&jms.redeliveryPolicy.maximumRedeliveries=6"; ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);2) 在 destinationUri中,增长以下查询字符串:
String queueName = "test-queue?customer.prefetchSize=100"; Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination queue = session.createQueue(queueName);咱们须要在 brokerUrl指定 optimizeACK选项,在 destinationUri中指定 prefetchSize(预获取)选项,其中brokerUrl参数选项是 全局的,即当前factory下全部的connection/session/consumer都会默认使用这些值;而destinationUri中的选项, 只会在使用此destination的consumer实例中有效;若是同时指定, brokerUrl中的参数选项值将会 被覆盖。
optimizeAck表示是否开启“优化ACK”,只有在为true的状况下,prefetchSize(下文中将会简写成prefetch)以及optimizeAcknowledgeTimeout参数才会有意义。此处须要注意"optimizeAcknowledgeTimeout"选项只能在brokerUrl中配置。
prefetch值建议在destinationUri中指定,由于在brokerUrl中指定比较繁琐;
在brokerUrl中,queuePrefetchSize和topicPrefetchSize都须要单独设定:
"&jms.prefetchPolicy.queuePrefetch=12&jms.prefetchPolicy.topicPrefetch=12"等来逐个指定。
若是prefetchACK为true,那么prefetch必须大于0;当prefetchACK为false时,你能够指定prefetch为0以及任意大小的正数。
当consumer端使用receive()方法同步获取消息时,prefetch能够为0和任意正值:
当consumer端使用MessageListener异步获取消息时,这就须要开发设定的prefetch值必须 >=1,即至少为1;在异步消费消息模式中,设定prefetch=0,是相悖的,也将得到一个Exception。
当消息须要broker端重发时,consumer会首先在本地的“deliveredMessage队列”(Consumer已经接收但还未确认的消息队列)删除它,而后向broker发送“REDELIVERED_ACK_TYPE”类型的确认指令,broker将会把指令中指定的消息从新添加到pendingQueue(亟待发送给consumer的消息队列)中,直到合适的时机,再次push给client。
prefetch表达了“批量获取”消息的语义,broker端主动的批量push多条消息给client端,总比client屡次发送PULL指令而后broker返回一条消息的方式要优秀不少,它不只减小了client端在获取消息时阻塞的次数和阻塞的时间,还可以大大的减小网络开支。optimizeACK表达了“延迟确认”的语义(ACK时机),client端在消费消息后暂且不发送ACK,而是把它缓存下来(pendingACK),等到这些消息的条数达到必定阀值时,只须要经过一个ACK指令把它们所有确认;这比对每条消息都逐个确认,在性能上要提升不少。因而可知,prefetch优化了消息传送的性能,optimizeACK优化了消息确认的性能。
consumer.optimizeAck = connection.optimizeACK && session.isAutoAcknowledge()当 consumer.optimizeACK有效时,若是客户端已经消费但还没有确认的消息(deliveredMessage) 达到prefetch * 0.65,consumer端将会自动进行ACK;同时若是离上一次ACK的时间间隔,已经超过" optimizeAcknowledgeTimout"毫秒,也会致使自动进行ACK。
JMS API中约定了Client端可使用四种ACK模式,在javax.jms.Session接口中:
ACK模式描述了Consumer与broker确认消息的方式(时机),好比当消息被Consumer接收以后,Consumer将在什么时候确认消息。对于broker而言,只有接收到ACK指令,才会认为消息被正确的接收或者处理成功了,经过ACK,能够在consumer(/producer)与Broker之间创建一种简单的“担保”机制.
AUTO_ACKNOWLEDGE:
自动确认,这就意味着消息的确认时机将有consumer择机确认."择机确认"彷佛充满了不肯定性,这也意味着,开发者必须明确知道"择机确认"的具体时机,不然将有可能致使消息的丢失,或者消息的重复接收.那么在ActiveMQ中,AUTO_ACKNOWLEDGE是如何运做的呢?
1) 对于consumer而言,optimizeAcknowledge属性只会在AUTO_ACK模式下有效。
2) 其中DUPS_ACKNOWLEGE也是一种潜在的AUTO_ACK,只是确认消息的条数和时间上有所不一样。
3) 在“同步”(receive)方法返回message以前,会检测optimizeACK选项是否开启,若是没有开启,此单条消息将当即确认,因此在这种状况下,message返回以后,若是开发者在处理message过程当中出现异常,会致使此消息也不会redelivery,即"潜在的消息丢失";若是开启了optimizeACK,则会在unAck数量达到prefetch * 0.65时确认,固然咱们能够指定prefetchSize = 1来实现逐条消息确认。
4) 在"异步"(messageListener)方式中,将会首先调用listener.onMessage(message),此后再ACK,
若是onMessage方法异常,将致使client端补充发送一个ACK_TYPE为REDELIVERED_ACK_TYPE确认指令;
若是onMessage方法正常,消息将会正常确认(STANDARD_ACK_TYPE)。此外须要注意,消息的重发次数是有限制的,每条消息中都会包含“redeliveryCounter”计数器,用来表示此消息已经被重发的次数,若是重发次数达到阀值,将会致使发送一个ACK_TYPE为POSION_ACK_TYPE确认指令,这就致使broker端认为此消息没法消费,此消息将会被删除或者迁移到"dead letter"通道中。
所以当咱们使用messageListener方式消费消息时,一般建议在onMessage方法中使用try-catch,这样能够在处理消息出错时记录一些信息,而不是让consumer不断去重发消息;若是你没有使用try-catch,就有可能会由于异常而致使消息重复接收的问题,须要注意你的onMessage方法中逻辑是否可以兼容对重复消息的判断。
CLIENT_ACKNOWLEDGE :
客户端手动确认,这就意味着AcitveMQ将不会“自做主张”的为你ACK任何消息,开发者须要本身择机确认。在此模式下,开发者须要须要关注几个方法:
1) message.acknowledge(),
2) ActiveMQMessageConsumer.acknowledege(),
3) ActiveMQSession.acknowledge();
其1)和3)是等效的,将当前session中全部consumer中还没有ACK的消息都一块儿确认,2)只会对当前consumer中那些还没有确认的消息进行确认。开发者能够在合适的时机必须调用一次上述方法。为了不混乱,对于这种ACK模式下,建议一个session下只有一个consumer。
咱们一般会在基于Group(消息分组)状况下会使用CLIENT_ACKNOWLEDGE,咱们将在一个group的消息序列接受完毕以后确认消息(组);不过当你认为消息很重要,只有当消息被正确处理以后才能确认时,也可使用此模式 。
若是开发者忘记调用acknowledge方法,将会致使当consumer重启后,会接受到重复消息,由于对于broker而言,那些还没有真正ACK的消息被视为“未消费”。
开发者能够在当前消息处理成功以后,当即调用message.acknowledge()方法来"逐个"确认消息,这样能够尽量的减小因网络故障而致使消息重发的个数;固然也能够处理多条消息以后,间歇性的调用acknowledge方法来一次确认多条消息,减小ack的次数来提高consumer的效率,不过这仍然是一个利弊权衡的问题。
除了message.acknowledge()方法以外,ActiveMQMessageConumser.acknowledge()和ActiveMQSession.acknowledge()也能够确认消息,只不过前者只会确认当前consumer中的消息。其中sesson.acknowledge()和message.acknowledge()是等效的。
不管是“同步”/“异步”,ActiveMQ都不会发送STANDARD_ACK_TYPE,直到message.acknowledge()调用。若是在client端未确认的消息个数达到prefetchSize * 0.5时,会补充发送一个ACK_TYPE为DELIVERED_ACK_TYPE的确认指令,这会触发broker端能够继续push消息到client端。(参看PrefetchSubscription.acknwoledge方法)
在broker端,针对每一个Consumer,都会保存一个由于"DELIVERED_ACK_TYPE"而“拖延”的消息个数,这个参数为prefetchExtension,事实上这个值不会大于prefetchSize * 0.5,由于Consumer端会严格控制DELIVERED_ACK_TYPE指令发送的时机(参见ActiveMQMessageConsumer.ackLater方法),broker端经过“prefetchExtension”与prefetchSize互相配合,来决定即将push给client端的消息个数,count = prefetchExtension + prefetchSize - dispatched.size(),其中dispatched表示已经发送给client端可是尚未“STANDARD_ACK_TYPE”的消息总量;因而可知,在CLIENT_ACK模式下,足够快速的调用acknowledge()方法是决定consumer端消费消息的速率;若是client端由于某种缘由致使acknowledge方法未被执行,将致使大量消息不能被确认,broker端将不会push消息,事实上client端将处于“假死”状态,而没法继续消费消息。咱们要求client端在消费1.5*prefetchSize个消息以前,必须acknowledge()一次;一般咱们老是每消费一个消息调用一次,这是一种良好的设计。
此外须要额外的补充一下:全部ACK指令都是依次发送给broker端,在CLIET_ACK模式下,消息在交付给listener以前,都会首先建立一个DELIVERED_ACK_TYPE的ACK指令,直到client端未确认的消息达到"prefetchSize * 0.5"时才会发送此ACK指令,若是在此以前,开发者调用了acknowledge()方法,会致使消息直接被确认(STANDARD_ACK_TYPE)。broker端一般会认为“DELIVERED_ACK_TYPE”确认指令是一种“slow consumer”信号,若是consumer不能及时的对消息进行acknowledge而致使broker端阻塞,那么此consumer将会被标记为“slow”,此后queue中的消息将会转发给其余Consumer。
DUPS_OK_ACKNOWLEDGE :
"消息可重复"确认,意思是此模式下,可能会出现重复消息,并非一条消息须要发送屡次ACK才行。它是一种潜在的"AUTO_ACK"确认机制,为批量确认而生,并且具备“延迟”确认的特色。
对于开发者而言,这种模式下的代码结构和AUTO_ACKNOWLEDGE同样,不须要像CLIENT_ACKNOWLEDGE那样调用acknowledge()方法来确认消息。
1) 在ActiveMQ中,若是在Destination是Queue通道,咱们真的能够认为DUPS_OK_ACK就是“AUTO_ACK+ optimizeACK + (prefetch > 0)”这种状况,在确认时机上几乎彻底一致;此外在此模式下,若是prefetchSize =1 或者没有开启optimizeACK,也会致使消息逐条确认,从而失去批量确认的特性。
2) 若是Destination为Topic,DUPS_OK_ACKNOWLEDGE才会产生JMS规范中诠释的意义,即不管optimizeACK是否开启,都会在消费的消息个数>=prefetch * 0.5时,批量确认(STANDARD_ACK_TYPE),在此过程当中,不会发送DELIVERED_ACK_TYPE的确认指令,这是1)和AUTO_ACK的最大的区别。
这也意味着,当consumer故障重启后,那些还没有ACK的消息会从新发送过来。
SESSION_TRANSACTED :
当session使用事务时,就是使用此模式。在事务开启以后,和session.commit()以前,全部消费的消息,要么所有正常确认,要么所有redelivery。这种严谨性,一般在基于GROUP(消息分组)或者其余场景下特别适合。
在SESSION_TRANSACTED模式下,optimizeACK并不能发挥任何效果,由于在此模式下,optimizeACK会被强制设定为false,不过prefetch仍然能够决定DELIVERED_ACK_TYPE的发送时机。
由于Session非线程安全,那么当前session下全部的consumer都会共享同一个transactionContext;同时建议,一个事务类型的Session中只有一个Consumer,以免rollback()或者commit()方法被多个consumer调用而形成的消息混乱。
当consumer接受到消息以后,首先检测TransactionContext是否已经开启,若是没有,就会开启并生成新的transactionId,并把信息发送给broker;此后将检测事务中已经消费的消息个数是否 >= prefetch * 0.5,若是大于则补充发送一“DELIVERED_ACK_TYPE”的确认指令;这时就开始调用onMessage()方法,若是是同步(receive),那么即返回message。上述过程,和其余确认模式没有任何特殊的地方。
当开发者决定事务能够提交时,必须调用session.commit()方法,commit方法将会致使当前session的事务中全部消息当即被确认;事务的确认过程当中,首先把本地的deliveredMessage队列中还没有确认的消息所有确认(STANDARD_ACK_TYPE);此后向broker发送transaction提交指令并等待broker反馈,若是broker端事务操做成功,那么将会把本地deliveredMessage队列清空,新的事务开始;若是broker端事务操做失败(此时broker已经rollback),那么对于session而言,将执行inner-rollback,这个rollback所作的事情,就是将当前事务中的消息清空并要求broker重发(REDELIVERED_ACK_TYPE),同时commit方法将抛出异常。
当session.commit方法异常时,对于开发者而言一般是调用session.rollback()回滚事务(事实上开发者不调用也没有问题),固然你能够在事务开始以后的任什么时候机调用rollback(),rollback意味着当前事务的结束,事务中全部的消息都将被重发。须要注意,不管是inner-rollback仍是调用session.rollback()而致使消息重发,都会致使message.redeliveryCounter计数器增长,最终都会受限于brokerUrl中配置的"jms.redeliveryPolicy.maximumRedeliveries",若是rollback的次数过多,而达到重发次数的上限时,消息将会被DLQ(dead letter)。
INDIVIDUAL_ACKNOWLEDGE :
单条消息确认,这种确认模式,咱们不多使用,它的确认时机和CLIENT_ACKNOWLEDGE几乎同样,当消息消费成功以后,须要调用message.acknowledege来确认此消息(单条),而CLIENT_ACKNOWLEDGE模式先message.acknowledge()方法将致使整个session中全部消息被确认(批量确认)。
咱们须要在建立Session时指定ACK模式,因而可知,ACK模式将是session共享的,意味着一个session下全部的 consumer都使用同一种ACK模式。在建立Session时,开发者不能指定除ACK模式列表以外的其余值。
若是此session为事务类型,用户指定的ACK模式将被忽略,而强制使用"SESSION_TRANSACTED"类型;
若是此session为非事务类型时,也将不能将 ACK模式设定为"SESSION_TRANSACTED",毕竟这是相悖的。
Consumer消费消息的风格有2种: 同步/异步。使用consumer.receive()就是同步,使用messageListener就是异步;在同一个consumer中,咱们不能同时使用这2种风格,好比在使用listener的状况下,当调用receive()方法将会得到一个Exception。两种风格下,消息确认时机有所不一样。
1. 同步消费机制
同步调用时,在消息从receive方法返回以前,就已经调用了ACK;所以若是Client端没有处理成功,此消息将丢失(可能重发,与ACK模式有关)。
Message message = sessionMessageQueue.dequeue(); if(message != null){ ack(message); } return message
2. 异步消费机制
基于异步调用时,消息的确认是在onMessage方法返回以后,若是onMessage方法异常,会致使消息不能被ACK,会触发重发。
//基于listener Session session = connection.getSession(consumerId); sessionQueueBuffer.enqueue(message); Runnable runnable = new Ruannale(){ run(){ Consumer consumer = session.getConsumer(consumerId); Message md = sessionQueueBuffer.dequeue(); try{ consumer.messageListener.onMessage(md); ack(md);// }catch(Exception e){ redelivery();//sometime,not all the time; } } //session中将采起线程池的方式,分发异步消息 //所以同一个session中多个consumer能够并行消费 threadPool.execute(runnable);
JMS规范中支持带事务的消息,也就是说您能够启动一个事务(并由消息发送者的链接会话设置一个事务号Transaction ID),而后在事务中发送多条消息。这个事务提交前这些消息都不会进入队列(不管是Queue仍是Topic)。
不进入队列,并不表明JMS不会在事务提交前将消息发送给ActiveMQ服务端。 实际上这些消息都会发送给服务端,服务端发现这是一条带有Transaction ID的消息,就会将先把这条消息放置在“transaction store”区域中(而且带有redo日志,这样保证在收到rollback指令后能进行取消操做),等待这个Transaction ID被rollback或者commit。
一旦这个Transaction ID被commit,ActiveMQ才会依据自身设置的PERSISTENT Message处理规则或者NON_PERSISTENT Meaage处理规则,将Transaction ID对应的message进行入队操做(不管是Queue仍是Topic)。如下代码示例了如何在生产者端使用事务发送消息:
...... //进行链接 connection = connectionFactory.createQueueConnection(); connection.start(); //创建会话(设置一个带有事务特性的会话) session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); //创建queue(固然若是有了就不会重复创建) Queue sendQueue = session.createQueue("/test"); //创建消息发送者对象 MessageProducer sender = session.createProducer(sendQueue); //发送(JMS是支持事务的) for(int index = 0 ; index < 10 ; index++) { TextMessage outMessage = session.createTextMessage(); outMessage.setText("这是发送的消息内容-------------------" + index); // 不管是NON_PERSISTENT message仍是PERSISTENT message // 都要在commit后才能真正的入队 if(index % 2 == 0) { sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT); } else { sender.setDeliveryMode(DeliveryMode.PERSISTENT); } // 没有commit的消息,也是要先发送给服务端的 sender.send(outMessage); } session.commit(); ......在“connection.createSession”这个方法中一共有两个参数(这句代码在上文中已经出现过屡次)。第一个布尔型参数很好理解,就是标示这个链接会话是否启动事务;第二个整型参数标示了消息消费者的“应答模型”。
JMS规范除了为消息生产者端提供事务支持之外,还为消费服务端准备了事务的支持。您能够经过在消费者端操做事务的commit和rollback方法,向服务器告知一组消息是否处理完成。采用事务的意义在于,一组消息要么被所有处理并确认成功,要么所有被回滚并从新处理。
...... //创建会话(采用commit方式确认一批消息处理完毕) session = connection.createSession(true, Session.SESSION_TRANSACTED); //创建Queue(固然若是有了就不会重复创建) sendQueue = session.createQueue("/test"); //创建消息发送者对象 MessageConsumer consumer = session.createConsumer(sendQueue); consumer.setMessageListener(new MyMessageListener(session)); ...... class MyMessageListener implements MessageListener { private int number = 0; /** * 会话 */ private Session session; public MyMessageListener(Session session) { this.session = session; } @Override public void onMessage(Message message) { // 打印这条消息 System.out.println("Message = " + message); // 若是条件成立,就向服务器确认这批消息处理成功 // 服务器将从队列中删除这些消息 if(number++ % 3 == 0) { try { this.session.commit(); } catch (JMSException e) { e.printStackTrace(System.out); } } } }以上代码演示的是 消费者经过事务commit的方式,向服务器确认一批消息正常处理完成的方式。请注意代码示例中的“session = connection.createSession(true, Session.SESSION_TRANSACTED);”语句。第一个参数表示 链接会话启用事务支持;第二个参数表示 使用commit或者rollback的方式进行向服务器应答。
@Override public void onMessage(Message message) { // 打印这条消息 System.out.println("Message = " + message); // rollback这条消息 this.session.rollback(); }以上代码片断中,咱们不停的回滚正在处理的这条消息,经过打印出来的信息能够看到,这条消息被不停的重发:
Message = ActiveMQTextMessage {...... redeliveryCounter = 0, text = 这是发送的消息内容-------------------20} Message = ActiveMQTextMessage {...... redeliveryCounter = 1, text = 这是发送的消息内容-------------------20} Message = ActiveMQTextMessage {...... redeliveryCounter = 2, text = 这是发送的消息内容-------------------20} Message = ActiveMQTextMessage {...... redeliveryCounter = 3, text = 这是发送的消息内容-------------------20} Message = ActiveMQTextMessage {...... redeliveryCounter = 4, text = 这是发送的消息内容-------------------20}能够看到同一条记录被重复的处理,而且其中的redeliveryCounter属性不断累加。
消息处理失败后,不断的重发消息确定不是一个最好的处理办法:若是一条消息被不断的处理失败,那么最可能的状况就是这条消息承载的业务内容自己就有问题。那么不管重发多少次,这条消息仍是会处理失败。
为了解决这个问题,ActiveMQ中引入了“死信队列”(Dead Letter Queue)的概念。即一条消息再被重发了屡次后(默认为重发6次redeliveryCounter==6),将会被ActiveMQ移入“死信队列”。开发人员能够在这个Queue中查看处理出错的消息,进行人工干预。
默认状况下“死信队列”只接受PERSISTENT Message,若是NON_PERSISTENT Message超过了重发上限,将直接被删除。如下配置信息可让NON_PERSISTENT Message在超太重发上限后,也移入“死信队列”:
<policyEntry queue=">"> <deadLetterStrategy> <sharedDeadLetterStrategy processNonPersistent="true" /> </deadLetterStrategy> </policyEntry>
RedeliveryPolicy redeliveryPolicy = connectionFactory.getRedeliveryPolicy(); // 设置最大重发次数 redeliveryPolicy.setMaximumRedeliveries(3);
2. 在支持事务的消费者链接会话中,使用commit方法明确告知服务器端消息已处理成功前,会话链接就终止了(最多是异常终止)
3. 在须要使用ACK模式的会话中,使用消息的acknowledge方式明确告知服务器端消息已处理成功前,会话链接就终止了(最多是异常终止)
可是以上几种重发机制有一些小小的差别,主要体如今redeliveryCounter属性的做用区域。简而言之,第一种方法redeliveryCounter属性的做用区域是本次链接会话,然后两种redeliveryCounter属性的做用区域是在整个ActiveMQ系统范围。
以上是这篇博文的主要内容,参考了不少文章,想要给出ActiveMQ的一个大概,在其消息协议上还不大清楚,对于消息机制略有涉及,下一篇准备总结下其部署和集群。