ActiveMQ集群搭建

1、 Activemq主备搭建

  Shared Filesystem Master-Slave方式
  shared filesystem Master-Slave部署方式主要是经过共享存储目录来实现master和slave的热备,全部的ActiveMQ应用都在不断地获取共享目录的控制权,哪一个应用抢到了控制权,它就成为master。
  多个共享存储目录的应用,谁先启动,谁就能够最先取得共享目录的控制权成为master,其余的应用就只能做为slave。html

1  搭建配置步骤搭建 master-slave (一主 一备份)

准备mq的1节点 activemq-1
准备mq的2节点 activemq-2node

  特色:
    只能本地不能分布式 和 集群。web

针对每个activemq的节点进行配置:数据库

1.1 配置节点1:
首先建立共享目录,并建立两个节点
如图:apache

  •  配置activemq-1,须要修改持久数据库位置,修改:activemq-1/conf/activemq.xml 

  • 配置activemq-1,须要修改activemq-1/conf/activemq.xml

  如图:修改为61617服务器

  • 配置activemq-1 ,须要修改activemq-1/conf/jetty.xml

2.2 配置节点2:session

  • 配置activemq-2,须要修改 持久目录文件,修改:activemq-2/conf/activemq.xml 

  • 配置activemq-2,须要修改activemq-2/conf/activemq.xml

  如图:修改为61618tcp

  • 配置activemq-1 ,须要修改activemq-2/conf/jetty.xml

2 测试master-slave
2.1 生产者分布式

 1 public class ProduceQueue {


 2     @Test

 3     public void sendMessage() throws Exception{

 4         //1.建立一个链接工厂 connectionfactory 参数:就是要链接的服务器的地址

 5         ConnectionFactory factory = new ActiveMQConnectionFactory("failover:(tcp://192.168.25.130:61617,tcp://192.168.25.130:61618)");

 6         //2.经过工厂获取链接对象 建立链接 7         Connection connection = factory.createConnection();

 8         //3.开启链接 9         connection.start();

10         //4.建立一个session对象  提供发送消息等方法

11         // 第一个参数:表示是否开启分布式事务(JTA)  通常是false 不开启。

12         // 第二个参数:就是设置消息的应答模式
13         // 若是 第一个参数为false时,第二个参数设置才有意义。用的是自动应答14         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
15         //5.建立目的地 (destination)  queue 参数:目的地的名称

16         Queue queue = session.createQueue("queue-test-cluster");

17         //6.建立个生产者18         MessageProducer producer = session.createProducer(queue);

19         //7.构建消息的内容20         TextMessage textMessage = session.createTextMessage("queue测试发送的消息");
21         // 8.发送消息22         producer.send(textMessage);

23         //9.关闭资源24         producer.close();

25         session.close();

26         connection.close();

27     }
28 
}

2.2 消费者ide

 1 public class ConsumerQueue {


 2     @Test

 3     public void consumer() throws Exception{

 4         //1.建立链接的工厂 5         ConnectionFactory factory = new ActiveMQConnectionFactory("failover:(tcp://192.168.25.130:61617,tcp://192.168.25.130:61618)");

 6         //2.建立链接 7         Connection connection = factory.createConnection();

 8         //3.开启链接 9         connection.start();

10         //4.建立session

11         // 第一个参数:表示是否开启分布式事务(JTA)  通常是false 不开启。

12         // 第二个参数:就是设置消息的应答模式   若是 第一个参数为false时,第二个参数设置才有意义。用的是自动应答13         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

14         // 5.建立接收消息的一个目的地15         Queue queue = session.createQueue("queue-test-cluster");

16         // 6.建立消费者17         MessageConsumer consumer = session.createConsumer(queue);

18         // 7.接收消息 打印

19         // 第一种20         /*while(true){

21             Message message = consumer.receive(1000000);//设置接收消息的超时时间

22             //没有接收到消息就跳出循环

23             if(message==null){

24                 break;

25             }

26             if(message instanceof TextMessage){

27                 TextMessage message2 = (TextMessage) message;

28                 System.out.println("接收的消息为"+message2.getText());

29                 }

30          }*/31         //第二种


32         // 设置一个监听器

33         // System.out.println("start");

34         // 这里其实开辟了一个新的线程35         consumer.setMessageListener(new MessageListener() {


36             //当有消息的时候会执行如下的逻辑37             @Override

38             public void onMessage(Message message) {

39                 if(message instanceof TextMessage){

40                     TextMessage message2 = (TextMessage) message;

41                     try {

42                         System.out.println("接收的消息为"+message2.getText());

43                     } catch (JMSException e) {

44                         e.printStackTrace();

45                     }

46                 }

47             }

48         });

49         //System.out.println("end");50         Thread.sleep(199999);

51         // 8.关闭资源52         consumer.close();

53         session.close();

54         connection.close();

55     }


56 }

先启动的成为主节点,日常主节点工做,slave不工做可是一直作监听。当主节点挂掉,slave接手工做。

2、 基于zookeeper的activemq集群搭建(推荐)

2.1 基于可复制的 LevelDB
  LevelDB 是 Google 开发的一套用于持久化数据的高性能类库。 LevelDB 并非一种服务,用户须要自行实现 Server。 是单进程的服务,可以处理十亿级别规模 Key-Value 型数据,占用内存小。
  http://activemq.apache.org/replicated-leveldb-store.html

 

  高可用的原理:使用 ZooKeeper(集群)注册全部的 ActiveMQ Broker。只有其中的一个 Broker 能够提供服务,被视为 Master,其余的 Broker 处于待机状态,被视为 Slave。若是 Master 因故障而不能提供服务,ZooKeeper 会从 Slave 中选举出一个 Broker 充当 Master。Slave 链接 Master 并同步他们的存储状态, Slave 不接受客户端链接。全部的存储操做都将被复制到链接至 Master 的 Slaves。 若是 Master 宕了,获得了最新更新的 Slave 会成为 Master。 故障节点在恢复后会从新加入到集群中并链接 Master 进入 Slave 模式。全部须要同步的 disk 的消息操做都将等待存储状态被复制到其余法定节点的操做完成才能完成。因此,若是你配置了 replicas=3,那么法定大小是(3/2)+1=2。 Master 将会存储并更新而后等待 (2-1)=1 个Slave 存储和更新完成,才汇报 success。 至于为何是 2-1,熟悉 Zookeeper 的应该知道,有一个 node要做为观擦者存在。当一个新的 Master 被选中,你须要至少保障一个法定 node 在线以可以找到拥有最新状态的 node。这个 node 能够成为新的 Master。所以,推荐运行至少 3 个 replica nodes,以防止一个 node失败了,服务中断。(原理与 ZooKeeper 集群的高可用实现方式相似)。

2.2 集群单机环境规划
  定义环境:3个activemq节点(node01 node02 node03)

Ip 集群通讯端口 节点消息链接端口 Jetty后台运行端口
192.168.25.130 63631 51515 8361
192.168.25.130 63632 51516 8362
192.168.25.130 63633 51517 8363

2.3 配置zookeeper集群

见相关教程;

2.4 节点1的配置

  在activemq.xml中配置:

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="brokerCluster" dataDirectory="${activemq.data}">

  brokerName指定一个名字:任意便可。可是整个集群的全部的配置项名称都应该是一致的。

  broker标签下的配置:

<persistenceAdapter> 
  <!-- kahaDB directory="${activemq.data}/kahadb"/ --> 
  <replicatedLevelDB 
  directory="${activemq.data}/leveldb" 
  replicas="3" 
  bind="tcp://0.0.0.0:63631" 
  zkAddress="192.168.25.130:2181,192.168.25.130:2182,192.168.25.130:2183" 
  hostname="localhost" 
  zkPath="/activemq2/leveldb-stores"/> 
</persistenceAdapter>

  jetty.xml:配置项:

<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
  <!-- the default port number for the web console -->
  <property name="host" value="0.0.0.0"/>
  <property name="port" value="8361"/>
</bean>

2.5 节点2的配置
  参考节点1搭建便可,注意:端口不要同样。

  搭建后的截图:


  node01 -node03 是activemq的3个节点。