ActiveMQ——三、ActiveMQ高可用与集群搭建

1. ActiveMQ的高可用原理
使用ZooKeeper(集群)注册全部的ActiveMQ Broker。只有其中的一个Broker能够提供服务,被视为 Master,其余的 Broker 处于待机状态,被视为Slave。若是Master因故障而不能提供服务,Zookeeper会从Slave中选举出一个Broker充当Master。
Slave链接Master并同步他们的存储状态,Slave不接受客户端链接。全部的存储操做都将被复制到 链接至 Master的Slaves。若是Master宕了,获得了最新更新的Slave会成为 Master。故障节点在恢复后会从新加入到集群中并链接Master进入Slave模式。
是否是以为和Redis Sentinel主从高可用的方式很像,这里的zookeeper起到的做用和reids里的sentinel做用差很少。java

另外,附上官方文档的一则警告,请使用者注意。replicated LevelDB 不支持延迟或者计划任务消息。这 些消息存储在另外的LevelDB文件中,若是使用延迟或者计划任务消息,将不会复制到Slave Broker上,不能实现消息的高可用。json

2.ActiveMQ高可用环境搭建服务器

本人是在一台CentOS虚拟机上进行测试的,在开发中须要根据本身的实际状况做出相应的调整。在这台服务器上,配置了3个ActiveMQ,以下图所示:session

后面须要修改的配置文件都在ACTIVEMQ_HOME/conf文件夹下。首先修改每一个ActiveMQ的持久化方式(修改ACTIVEMQ_HOME/bin/activemq.xml文件),ActiveMQ默认使用的是kahaDB做为持久化存储数据的,这里修改为levelDB。以下图所示:负载均衡

接下来修改ActiveMQ的TCP连接端口号,activemq-1使用默认的61616端口,activemq-2修改成61617,activemq-3修改成61618。以下图所示(注意红框部分):dom

修改并保存以后,就是修改jetty的端口号(修改ACTIVEMQ_HOME/bin/jetty.xml文件),由于实在同一台服务器上,不修改的话,第二个和第三个jetty将启动不了。第activemq-1依然使用默认端口8161,activemq-2使用8162,activemq-3使用8163端口,以下图:tcp

到这里,ActiveMQ高可用就配置好了,若是没启动zookeeper的话,先启动zookeeper(能够看个人关于zookeeper的博客),而后分别启动activemq-1,activemq-2,activemq-3。进入ACTIVEMQ_HOME/data下,查看activemq.log文件,若是没有报错则说明启动成功!在zookeeper上,能够看到以下数据:工具

其中elected不为空的节点表示为Master,由该activemq对外提供服务。测试

好了,让咱们来撸点代码测试一下吧!this

ClustorProducer:

public class ClustorProducer {  
    private ConnectionFactory factory;  
    private Connection connection;  
    private Session session;  
    private Destination destination;  
    private MessageProducer producer;  
  
    public ClustorProducer() throws JMSException {  
        this.factory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,  
                ActiveMQConnectionFactory.DEFAULT_PASSWORD,  
                "failover:(tcp://192.168.4.19:61616,tcp://192.168.4.19:61617,tcp://192.168.4.19:61618)?randomize=false");  
        this.connection = factory.createConnection();  
        connection.start();  
        this.session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);  
        this.destination = session.createQueue("first");  
        producer = session.createProducer(destination);  
    }  
  
    public void send() throws JMSException, InterruptedException {  
        for(int i=1; i<=50000; i++) {  
            Message message = session.createTextMessage("内容:" + i);  
            producer.send(destination, message);  
            System.out.println(message);  
            Thread.sleep(1000);  
        }  
    }  
  
    public static void main(String[] args) throws JMSException, InterruptedException {  
        ClustorProducer clustorProducer = new ClustorProducer();  
        clustorProducer.send();  
    }  
}

ClustorConsumer:

public class ClustorConsumer {  
    private ConnectionFactory factory;  
    private Connection connection;  
    private Session session;  
    private Destination destination;  
    private MessageConsumer consumer;  
  
    public ClustorConsumer() throws JMSException {  
        this.factory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,  
                ActiveMQConnectionFactory.DEFAULT_PASSWORD,  
                "failover:(tcp://192.168.4.19:61616,tcp://192.168.4.19:61617,tcp://192.168.4.19:61618)?randomize=false");  
        this.connection = factory.createConnection();  
        connection.start();  
        this.session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);  
        this.destination = session.createQueue("first");  
        this.consumer = session.createConsumer(destination);  
    }  
  
    public void consume() throws JMSException, InterruptedException {  
        while (true) {  
            Message message = consumer.receive();  
            if(message == null)  
                break;  
            System.out.println(message);  
            Thread.sleep(1000);  
        }  
    }  
  
    public static void main(String[] args) throws JMSException, InterruptedException {  
        ClustorConsumer clustorConsumer = new ClustorConsumer();  
        clustorConsumer.consume();  
    }  
}

其中的brokerUrl参数发生了变化变成了:

failover:(tcp://192.168.4.19:61616,tcp://192.168.4.19:61617,tcp://192.168.4.19:61618)?randomize=false

停掉三个ActiveMQ中任意的一个,咱们能够发现依然能够发送和接收消息。说明ActiveMQ的高可用很成功!

3.ActiveMQ集群负载均衡搭建

以前已经实现了ActiveMQ的高可用部署,单仅仅是高可用集群,没法达到负载均衡的做用,接下来只需简单配置就能完成能够实现负载均衡的集群功能:
在集群1的activemq.xml中连接集群2(在persistenceAdapter标签前配置):

<networkConnectors>
    <networkConnector uri="static:(tcp://192.168.1.103:61616,tcp://192.168.2.103:61617,tcp://192.168.2.103:61618)" duplex="false"/>
</networkConnectors>
在集群2的activemq.xml中连接集群1(在persistenceAdapter标签前配置):
<networkConnectors>
    <networkConnector uri="static:(tcp://192.168.1.104:61616,tcp://192.168.1.104:61617,tcp://192.168.1.104:61618)" duplex="false"/>
</networkConnectors>


这样就实现了ActiveMQ的集群高可用负载均衡功能。
客户端链接:
ActiveMQ 的客户端只能访问Master的Broker,其余处于Slave的Broker不能访问。因此客户端链接Broker应该使用failover协议。
配置文件地址应为:

failover:(tcp://192.168.1.103:61616,tcp://192.168.1.103:61617,tcp://192.168.1.103:61618)?randomize=false

或:

failover:(tcp://192.168.1.104:61616,tcp://192.168.1.104:61617,tcp://192.168.1.104:61618)?randomize=false

使用zookeeper的可视化工具,能够查看activemq在zookeeper中的注册状况:

address有内容的时候,表示该地址的节点为master

当为null的时候,表示该节点为slave

并且,activemq的UI界面也只能在master节点上打开

相关文章
相关标签/搜索