ActiveMQ实现Master-Slave的方式有几种,一种是使用Jdbc Peristent的方式,未被Consumer读取的消息和死消息写在数据库的ActiveMQ_MSGS表中,还有一种是使用LevelDB文件,这些消息将被存储在Level DB的.log文件中。java
如今咱们项目有一个需求是将原先使用Jdbc Persistent的ActiveMQ转换为使用Zk + Replicated Level DB的ActiveMQ,这就产生了一个问题:如何将原先保存在ActiveMQ_MSGS表的消息导入到新ActiveMQ服务器的LevelDB数据文件中?数据库
咱们查看了使用jdbc peristent的ACTIVEMQ_MSGS表数据(以MySQL数据库为例)windows
能够看出存储在数据库里的消息是以Blob形式保存的。服务器
使用leveldb形式保存的消息数据的形式是二进制形式的session
咱们能够看出二者的数据存储格式彻底不同,所以不可能像jdbc peristent迁移那样,将activemq_msgs表数据从一个数据库迁移到另外一个数据库那样简单。只能采用从源ActiveMQ服务器读取队列消息,再将队列消息发送到目标ActiveMQ的方式。架构
咱们搭建了两个CentOS虚拟机,虚拟机A上是源ActiveMQ,使用jdbc persistent(MySQL),有三个Queue:TestQueue1,TestQueue2,TestQueue3.app
虚拟机B使用LevelDB作数据持久化。dom
1.单ActiveMQ,单队列消息迁移async
TestQueue1有三条持久化消息tcp
消息的内容咱们可使用activemq-admin.bat查看(这个只在windows环境下可用),这个命令支持获取远程ActiveMQ信息。
activemq-admin.bat browse --amqurl tcp://xxx.xxx.xxx.xxx:61616 TestQueue1
得到的TestQueue1队列信息以下所示
JMS_HEADER_FIELD:JMSDestination = TestQueue1 JMS_BODY_FIELD:JMSText = Test Queue1's first message. JMS_HEADER_FIELD:JMSType = JMS_HEADER_FIELD:JMSCorrelationID = JMS_HEADER_FIELD:JMSDeliveryMode = persistent JMS_HEADER_FIELD:JMSMessageID = ID:rickhunter.domain-46264-1476266938779-4:1:1:1:1 JMS_HEADER_FIELD:JMSExpiration = 0 JMS_HEADER_FIELD:JMSPriority = 0 JMS_HEADER_FIELD:JMSRedelivered = false JMS_HEADER_FIELD:JMSTimestamp = 1476267006371 JMS_HEADER_FIELD:JMSDestination = TestQueue1 JMS_BODY_FIELD:JMSText = TestQueue's second message JMS_HEADER_FIELD:JMSType = JMS_HEADER_FIELD:JMSCorrelationID = JMS_HEADER_FIELD:JMSDeliveryMode = persistent JMS_HEADER_FIELD:JMSMessageID = ID:rickhunter.domain-44431-1476325780609-4:1:1:1:1 JMS_HEADER_FIELD:JMSExpiration = 0 JMS_HEADER_FIELD:JMSPriority = 0 JMS_HEADER_FIELD:JMSRedelivered = false JMS_HEADER_FIELD:JMSTimestamp = 1476326826475 JMS_HEADER_FIELD:JMSDestination = TestQueue1 JMS_BODY_FIELD:JMSText = TestQueue1's third message. JMS_HEADER_FIELD:JMSType = JMS_HEADER_FIELD:JMSCorrelationID = JMS_HEADER_FIELD:JMSDeliveryMode = persistent JMS_HEADER_FIELD:JMSMessageID = ID:rickhunter.domain-44431-1476325780609-4:1:1:1:2 JMS_HEADER_FIELD:JMSExpiration = 0 JMS_HEADER_FIELD:JMSPriority = 0 JMS_HEADER_FIELD:JMSRedelivered = false JMS_HEADER_FIELD:JMSTimestamp = 1476326873071
咱们数据迁移的基本思路,是从虚拟机A的ActiveMQ上读取TestQueue1的持久化消息,发送给虚拟机B的ActiveMQ的TestQueue1.
读取TestQueue1队列消息的代码片断以下(这段代码参考了王新春的代码, 根据实际作了一点修改,特此感谢。http://wangxinchun.iteye.com/blog/2146120?utm_source=tuicool&utm_medium=referral)
String url = "tcp://xxx.xxx.xxx.xxx:61616"; ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", url); ActiveMQConnection connection = null; try { connection = (ActiveMQConnection)connectionFactory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue destination = session.createQueue("TestQueue1"); QueueBrowser browser = session.createBrowser(destination); Enumeration<?> enumeration = browser.getEnumeration(); while (enumeration.hasMoreElements()) { TextMessage message = (TextMessage) enumeration.nextElement(); System.out.println("Browsing: " + message.getText()); } session.close(); } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } }
上述代码只是从源ActiveMQ的TestQueue1读取全部持久化消息并将每条消息正文输出到控制台,接下来咱们对这段代码进行改造,将获取的消息发送到目标ActiveMQ的TestQueue1。
咱们将每一条消息放到一个消息List中。而后链接目标ActiveMQ,将消息List中的每一条消息发送到TestQueue1。代码片断以下:
List<TextMessage> messageList = new ArrayList<TextMessage>(); .......... Enumeration<?> enumeration = browser.getEnumeration(); while (enumeration.hasMoreElements()) { TextMessage message = (TextMessage) enumeration.nextElement(); messageList.add(message); } .......... sendMessages(messageList); .......... private static void sendMessages(List<TextMessage> messageList) { try { String sendActiveMQUrl = "tcp://xxx.xxx.xxx.xxx:61616"; ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(sendActiveMQUrl); connectionFactory.setUserName("admin"); connectionFactory.setPassword("admin"); ActiveMQConnection connection = (ActiveMQConnection)connectionFactory.createConnection(); connection.setDisableTimeStampsByDefault(true); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立目标,就建立主题也能够建立队列 Destination destination = session.createQueue("TestQueue1"); //建立消息生产者 MessageProducer producer = session.createProducer(destination); //设置持久化,DeliveryMode.PERSISTENT和DeliveryMode.NON_PERSISTENT producer.setDeliveryMode(DeliveryMode.PERSISTENT); for(TextMessage message:messageList) { // 发送消息到ActiveMQ producer.send(message); TimeUnit.MILLISECONDS.sleep(10); } // 关闭资源 session.close(); connection.close(); System.out.println("The messages import is completed."); } catch(Exception ex) { ex.printStackTrace(); } }
上述代码中有一句代码请特别注意
connection.setDisableTimeStampsByDefault(true);
这句代码是控制是否使用JmsMessage自己自带的TimeStamp属性。ActiveMQConnection类的disableTimeStampsByDefault属性的API描述是这样的(基于ActiveMQ 5.14.0的代码)
/** * Sets whether or not timestamps on messages should be disabled or not. If * you disable them it adds a small performance boost. */ public void setDisableTimeStampsByDefault(boolean timeStampsDisableByDefault) { this.disableTimeStampsByDefault = timeStampsDisableByDefault; }
从API描述看出来,它是设置JMS Message自身所带的timestamp是否被禁用。具体使用这个属性的代码是在ActiveMQConnection的send方法
void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId messageId, int deliveryMode, int priority, long timeToLive, boolean async) throws JMSException { ............... if (!isDisableTimeStampsByDefault()) { long timeStamp = System.currentTimeMillis(); msg.setJMSTimestamp(timeStamp); if (timeToLive > 0) { expiration = timeToLive + timeStamp; } } ............... }
从上述代码能够看出,若是disableTimeStampsByDefault属性是false,ActiveMQ在发送Jms Message对象时,将无视消息对象自带的timestamp,使用JmsMessage发送时的时间。这样对于Jms Message对象timestamp属性敏感的应用就会出现问题,而设置这个属性为true,将保留Jms Message对象原先的timestamp信息。
补充一点,若是不设置ActiveMQConnection的disableTimeStampsByDefault属性,设置MessageProducer的disableMessageTimestamp属性能够起到相同的效果。MessageProducer的disableMessageTimestamp属性的API描述是
public abstract class ActiveMQMessageProducerSupport implements MessageProducer, Closeable { ............... /** * Sets whether message timestamps are disabled. * <P> * Since timestamps take some effort to create and increase a message's * size, some JMS providers may be able to optimize message overhead if * they are given a hint that the timestamp is not used by an application. * By calling the <CODE>setDisableMessageTimestamp</CODE> method on this * message producer, a JMS client enables this potential optimization for * all messages sent by this message producer. If the JMS provider accepts * this hint, these messages must have the timestamp set to zero; if the * provider ignores the hint, the timestamp must be set to its normal * value. * <P> * Message timestamps are enabled by default. * * @param value indicates if message timestamps are disabled * @throws javax.jms.JMSException if the JMS provider fails to close the producer due to * some internal error. */ public void setDisableMessageTimestamp(boolean value) throws JMSException { ............ }
当MessageProducer在发送消息时使用到了这个属性
public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher { .............. protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message,int deliveryMode, int priority, long timeToLive, MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException { .............. synchronized (sendMutex) { .............. if (!producer.getDisableMessageTimestamp()) { long timeStamp = System.currentTimeMillis(); message.setJMSTimestamp(timeStamp); if (timeToLive > 0) { expiration = timeToLive + timeStamp; } } ....... }
对照ActiveMQConnection的send方法,能够看出两段代码的做用彻底同样,所以咱们能够设置MessageProducer的disableMessageTimestamp属性达到相同效果,具体修改代码以下:
ActiveMQMessageProducer producer = (ActiveMQMessageProducer)session.createProducer(destination); producer.setDisableMessageTimestamp(true);
代码修改完成后运行,咱们能够看到目标ActiveMQ的TestQueue1中已经有迁移的三条消息。
JMS_HEADER_FIELD:JMSDestination = TestQueue1 JMS_BODY_FIELD:JMSText = Test Queue1's first message. JMS_HEADER_FIELD:JMSType = JMS_HEADER_FIELD:JMSCorrelationID = JMS_HEADER_FIELD:JMSDeliveryMode = persistent JMS_HEADER_FIELD:JMSMessageID = ID:rick.domain-4065-1476339886463-3:1:1:1:1 JMS_HEADER_FIELD:JMSExpiration = 0 JMS_HEADER_FIELD:JMSPriority = 4 JMS_HEADER_FIELD:JMSRedelivered = false JMS_HEADER_FIELD:JMSTimestamp = 1476267006371 JMS_HEADER_FIELD:JMSDestination = TestQueue1 JMS_BODY_FIELD:JMSText = TestQueue's second message JMS_HEADER_FIELD:JMSType = JMS_HEADER_FIELD:JMSCorrelationID = JMS_HEADER_FIELD:JMSDeliveryMode = persistent JMS_HEADER_FIELD:JMSMessageID = ID:rick.domain-4065-1476339886463-3:1:1:1:2 JMS_HEADER_FIELD:JMSExpiration = 0 JMS_HEADER_FIELD:JMSPriority = 4 JMS_HEADER_FIELD:JMSRedelivered = false JMS_HEADER_FIELD:JMSTimestamp = 1476326826475 JMS_HEADER_FIELD:JMSDestination = TestQueue1 JMS_BODY_FIELD:JMSText = TestQueue1's third message. JMS_HEADER_FIELD:JMSType = JMS_HEADER_FIELD:JMSCorrelationID = JMS_HEADER_FIELD:JMSDeliveryMode = persistent JMS_HEADER_FIELD:JMSMessageID = ID:rick.domain-4065-1476339886463-3:1:1:1:3 JMS_HEADER_FIELD:JMSExpiration = 0 JMS_HEADER_FIELD:JMSPriority = 4 JMS_HEADER_FIELD:JMSRedelivered = false JMS_HEADER_FIELD:JMSTimestamp = 1476326873071
与源ActiveMQ的三条消息比较,除了JMSMessageID属性不同之外(MessageId是由发送Message的Client机器ID和随机数生成的,不可能保证和原消息彻底一致),其余属性彻底一致。若是要保留源消息的MessageId信息,能够将其设置到JMSCorrelationID属性。
2.单ActiveMQ,多队列消息迁移
在1中咱们实现单队列消息的迁移。但若是源ActiveMQ有多个队列,每一个队列都有持久化消息,如何实现这些队列消息的迁移?
咱们最初的思路是将全部包含持久化消息的队列名称写入一个配置文件里,在迁移的时候读取这些队列名称。可是这样作的问题就是不灵活,有可能在迁移时一些原先没有持久化消息的队列包含了消息,一些原先有持久化消息的队列的消息被消费掉了,使得不是全部队列的消息都能被迁移。
经过查看ActiveMQ的API文档,咱们决定使用ActiveMQConnection类的destinationSource属性,这个属性的API描述是
/** * Returns the {@link DestinationSource} object which can be used to listen to destinations * being created or destroyed or to enquire about the current destinations available on the broker * * @return a lazily created destination source * @throws JMSException */ @Override public DestinationSource getDestinationSource() throws JMSException { ....... }
从API描述上能够看出这个属性是一个源对象,是对当前ActiveMQ链接上的全部Destination(包括Queue和Topic)的一个侦听对象。从DestinationSource对象,咱们能够获取源ActiveMQ的全部Queue(也能够得到全部Topic,这里再也不赘述).
public class DestinationSource implements MessageListener { .............. /** * Returns the current queues available on the broker */ public Set<ActiveMQQueue> getQueues() { return queues; } .............. }
得到了源ActiveMQ的Queue集合,接下来的操做就和1同样了,只是过滤掉那些不不包含持久化消息的Queue。实现代码的片断以下:
HashMap<String, List<TextMessage>> queueMessageMap = new HashMap<String, List<TextMessage>>(); ................. Set<ActiveMQQueue> activeMQQueues = connection.getDestinationSource().getQueues(); for(ActiveMQQueue destination: activeMQQueues) { QueueBrowser browser = session.createBrowser(destination); Enumeration<?> enumeration = browser.getEnumeration(); List<TextMessage> messageList = new ArrayList<TextMessage>(); while (enumeration.hasMoreElements()) { TextMessage message = (TextMessage) enumeration.nextElement(); messageList.add(message); } if (messageList.size() > 0) { queueMessageMap.put(destination.getQueueName(), messageList); } ................. if(queueMessageMap.size() > 0) { sendMessages(queueMessageMap); } ................. private static void sendMessages(HashMap<String, List<TextMessage>> queueMessageMap) { ................. for(String queueName:queueMessageMap.keySet()) { // 建立目标,就建立主题也能够建立队列 Destination destination = session.createQueue(queueName); // 建立消息生产者 MessageProducer producer = session.createProducer(destination); // 设置持久化,DeliveryMode.PERSISTENT和DeliveryMode.NON_PERSISTENT producer.setDeliveryMode(DeliveryMode.PERSISTENT); List<TextMessage> messageList = queueMessageMap.get(queueName); for(TextMessage message:messageList) { // 发送消息到ActiveMQ producer.send(message); TimeUnit.MILLISECONDS.sleep(10); } }
咱们在源ActiveMQ的TestQueue1,TestQueue2,TestQueue3队列里分别建立三条持久化消息(以下图所示)
删除目标ActiveMQ TestQueue1队列里1中迁移的三条消息
运行代码,咱们能够看到源ActiveMQ中的三个队列里的九条消息被迁移到了目标ActiveMQ
3.多ActiveMQ,多队列消息迁移
在这个场景中目标ActiveMQ是使用zk + Replicated LevelDB结构的三台ActiveMQ。代码仍然使用和2相同的代码,只是链接目标ActiveMQ的brokerUrl形式改成failover:(tcp://xxx.xxx.xxx, tcp://xxx.xxx.xxx, tcp://xxx.xxx.xxx)
咱们使用的三台ActiveMQ的清单以下:
ActiveMQ服务器号 | tcp端口号 | zk端口号 | 管理界面端口号 |
ActiveMQ-1 | 61616 | 2181 | 8161 |
ActiveMQ-2 | 62616 | 2181 | 8261 |
ActiveMQ-3 | 63616 | 2181 | 8361 |
最初三台ActiveMQ中没有队列,也没有消息(此时ActiveMQ-1为Master,ActiveMQ-2,ActiveMQ-3为Slave)
运行2的程序后,咱们能够看到ActiveMQ-1中有了TestQueue1,TestQueue2,TestQueue3三个队列,而且有了9条消息。
当咱们停掉ActiveMQ-1后,ActiveMQ-2成为Master,咱们能够从它的管理界面看到ActiveMQ-2也有三个队列,9条消息。
这说明2的程序对于Zookeeper + Replicated LevelDB架构的多ActiveMQ 数据迁移仍然有效。
4.结语
本文实现了从数据库介质的ActiveMQ到使用Replicated LevelDB文件介质的ActiveMQ的消息迁移,但并不只限于此种场景,任何两个ActiveMQ之间均可以实现相似的消息迁移。在实际运行中,还可使用这样的程序实现不一样ActiveMQ之间的数据同步,以及在一个ActiveMQ内,消息在不一样队列间的迁移。这须要开发者根据实际须要进行修改和调整。