[转载]关于ActiveMQ集群

转载于 http://blog.csdn.net/nimmy/article/details/6247289java

近日因工做关系,在研究JMS,使用ActiveMQ做为提供者,考虑到消息的重要,拟采用ActiveMQ的集群,网上查询,资料不多,且语焉不详,最后仍是看Apache提供的官方文档(俺E文很差,楞是拿金山词霸一个一个单词的看,累啊),终于作出来了,因而形诸于文,以供有须要的朋友参考mysql

本文阐述了使用 Pure Master Slave 方式 以及 JDBC Master Slave 方式s解决单点故障的问题sql

 关于ActiveMQ集群数据库

 

1 前提

  1. 下载jdk6(update24),解压,安装,下面用 $java_dir$ 表示JDK主目录
  2. 下载ActiveMQ5.4.2,解压,下面用 $activemq_dir$ 表示activeMQ主目录
  3. 下载AapcehANT1.8,解压,下面用 $ant_dir$ 表示ANT主目录
  4. 配置好环境变量

1)         Java_home : 指向 $java_dir$apache

2)         Ant_home :指向 $ant_dir$异步

3)         Path : 应包含 %java_home%/bintcp

  1. 将MySQL驱动包放置到 $activemq_home$/lib 下(本例用到MySQL数据源)
  2. Master:给broker取个名字,修改其持久化KAHADB文件
  3. Slave:给broker取个名字,修改其持久化KAHADB文件,须要配置Master的地址和端口
  4. 一个Master只能带一个Slave
  5. Master工做期间,会将消息情况自动同步到Slave
  6. Master一旦崩溃,Slave自动接替其工做,已发送并还没有消费的消息继续有效
  7. Slave接手后,必须中止Slave才能重启先前的Master
  8. Master:首先复制 $activemq_dir$/conf/activemq.xml,并更名为:pure_master.xml,修改文件

2         解决单点故障:Pure Master Slave

2.1     概述

2.2     MQ配置(这里是一台机器配置)

1)         <broker brokerName="pure_master" …测试

2)         <kahaDB directory="${activemq.base}/data/kahadb_pure_master "/>url

3)         <transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>spa

  1. Slave:首先复制 $activemq_dir$/conf/activemq.xml,并更名为:pure_slave.xml,修改文件

1)         <broker brokerName="pure_slave" masterConnectorURI="tcp://0.0.0.0:61616"

shutdownOnMasterFailure="false" …

2)         <kahaDB directory="${activemq.base}/data/kahadb_pure_slave "/>

3)         <transportConnector name="openwire" uri="tcp://0.0.0.0:61617"/>

  1. 首先启动Master,启动完毕后在另外一个Shell启动Slave,Slave启动后,能够看到Master那个Shell中显示已经Attach上了Slave

1)         启动Master:$activemq_dir$/bin>activemq xbean:file:../conf/pure_master.xml

2)         启动Slave:$activemq_dir$/bin>activemq xbean:file:../conf/pure_slave.xml

2.3     JAVA测试:队列

  1. 生产者

public static void main(String[] args) throws Exception {

         ConnectionFactory cf = new ActiveMQConnectionFactory("failover:(tcp://0.0.0.0:61616,tcp://0.0.0.0:61617)");

         Connection conn = cf.createConnection();

         conn.start();

         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

         Queue qq = new ActiveMQQueue("qq1");

         MessageProducer prod = sess.createProducer(qq);

         Message msg = null;

        

         Scanner scan = new Scanner(System.in);

         String str = scan.next();

         while(true) {

                   msg = sess.createTextMessage(str);

                   prod.send(msg);

                   if(str.equals("exit")) {

                            break;

                   }

                   str = scan.next();

         }

         conn.close();

}

  1. 消费者

public static void main(String[] args) throws Exception {

         ConnectionFactory cf = new ActiveMQConnectionFactory("failover:( tcp://0.0.0.0:61616,tcp://0.0.0.0:61617)");

         Connection conn = cf.createConnection();     

         conn.start();

         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

         Queue qq = new ActiveMQQueue("qq1");

         MessageConsumer cs = sess.createConsumer(qq);

 

         TextMessage msg = (TextMessage)cs.receive();

         String str = msg.getText();

         while(true) {

                   System.out.println("receive msg:/t"+msg.getText());

                   if(str.equals("exit")) {

                            break;

                   }

                   msg = (TextMessage)cs.receive();

                   str = msg.getText();

         }

         conn.close();

}

2.4     测试步骤

  1. 启动生产者,发送几条消息
  2. 启动消费者,可看到接收到的消息
  3. 关闭消费者
  4. 生产者继续发送几条消息—消息A
  5. 中止Master(可看到生产者端显示链接到Slave(tcp://0.0.0.0:61617)了)
  6. 生产者继续发送几条消息—消息B
  7. 启动消费者
  8. 消费者接收了消息A和消息B,可见Slave接替了Master的工做,并且储存了以前生产者通过Master发送的消息

2.5     结论

Pure Master Slave模式实现方式简单,能够实现消息的双机热备功能;队列能够实现消息的异步和点对点发送

3         解决单点故障:JDBC Master Slave

3.1     概述

  1. 配置上,不存在Master和Slave,全部Broder的配置基本是同样的
  2. 多个共享数据源的Broker构成JDBC Master Slave
  3. 给每一个Broker取个名字
  4. 首先抢到资源(数据库锁)的Broker成为Masetr
  5. 其余Broker保持预备状态,按期尝试抢占资源,运行其的Shell中清楚的显示了这一点
  6. 一旦Master崩溃,其余Broker尝试抢占资源,最终只有一台抢到,它马上成为Master
  7. 以前的Master即便重启成功,也只能做为Slave等待
  8. 将 $activemq_dir$/conf/activemq.xml 复制3份,分别更名为:jdbc_broker01.xml、jdbc_broker02.xml、jdbc_broker03.xml
  9. 首先修改jdbc_broker01.xml

3.2     MQ配置(这里是一台机器配置)

1)         <broker brokerName=" jdbc_broker01" …

2)         <!--配置持久适配器-->

<persistenceAdapter>

  <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data"   dataSource="#mysql-ds"/>

</persistenceAdapter>

3)         <transportConnector name="openwire"   uri="tcp://0.0.0.0:61616"/>

4)         <!--配置数据源:注意是在broker标记以外-->

</broker>

<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource"  destroy-method="close">

  <property name="driverClassName" value="com.mysql.jdbc.Driver"/>

  <property name="url" value="jdbc:mysql://localhost/test?relaxAutoCommit=true"/>

  <property name="username" value="root"/>

  <property name="password" value="root"/>

  <property name="maxActive" value="200"/>

  <property name="poolPreparedStatements" value="true"/>

</bean>

  1. 一样修改 jdbc_broker02.xml ,与 jdbc_broker01.xml 不一样之处

1)         <broker brokerName=" jdbc_broker02" …

2)         <transportConnector name="openwire" uri="tcp://0.0.0.0:61617"/>

  1. 一样修改 jdbc_broker03.xml ,与 jdbc_broker01.xml 不一样之处

1)         <broker brokerName=" jdbc_broker03" …

2)         <transportConnector name="openwire" uri="tcp://0.0.0.0:61618"/>

3.3     JAVA测试:队列

  1. 代码基本和前面一致,只是由于这里有3个broker,因此建立链接工厂的代码稍有差异:

ConnectionFactory cf = new ActiveMQConnectionFactory("failover:(tcp://0.0.0.0:61616,tcp://0.0.0.0:61617,tcp://0.0.0.0:61618)");

3.4     测试步骤

  1. 先启动生产者,发送几条消息
  2. 启动消费者,可看到接收到的消息
  3. 关闭消费者
  4. 生产者继续发送几条消息-消息A
  5. 中止broker01(可看到生产者端显示链接到broker02(tcp://0.0.0.0:61617)了,同时运行broker02的Shell也显示其成为了Master)
  6. 生产者继续发送几条消息-消息B
  7. 启动消费者
  8. 消费者接收了消息A和消息B,可见broker02接替了broker01的工做,并且储存了以前生产者通过broker01发送的消息
  9. 关闭消费者
  10. 生产者继续发送几条消息-消息C
  11. 中止broker02(可看到生产者端显示链接到broker03(tcp://0.0.0.0:61618)了,同时运行broker03的Shell也显示其成为了Master)
  12. 生产者继续发送几条消息-消息D
  13. 启动消费者
  14. 消费者接收了消息C和消息D,可见broker03接替了broker02的工做,并且储存了以前生产者通过broker02发送的消息
  15. 再次启动broker01,生产者或消费者均未显示链接到broker01(tcp://0.0.0.0:61616),代表broker01此时只是个Slave

3.5     结论

JDBC Master Slave模式实现方式稍微复杂一点,能够实现消息的多点热备功能,Master、Slave的交替彻底是即时的,自动的,无需重启任何broker;队列能够实现消息的异步和点对点发送

相关文章
相关标签/搜索