使用ZooKeeper实现的Master-Slave实现方式,是对ActiveMQ进行高可用的一种有效的解决方案。html
原理:java
1 使用ZooKeeper(集群)注册全部的ActiveMQ Broker。
node
2 只有其中的一个Broker能够对外提供服务(也就是Master节点),其余的Broker处于待机状态,被视为Slave。
linux
3 。若是Master因故障而不能提供服务,则利用ZooKeeper的内部选举机制会从Slave中选举出一个Broker充当Master节
数据库
点,继续对外提供服务。apache
官方文档:http://activemq.apache.org/replicated-leveldb-store.htmlvim
(1)首先咱们下载apache-activemq-5.11.1-bin.tar.gz上传到咱们的机器上准备部署。
windows
(2)Zookeeper方案
服务器
主机IP | 消息端口 | 通讯端口 | 节点目录/root/下 |
192.168.98.95(hostname:node1) | 2181 | 2888:3888 | zookeeper-3.4.9 |
192.168.98.96(hostname:node2) | 2181 | 2888:3888 | zookeeper-3.4.9 |
192.168.98.97(hostname:node3) | 2181 | 2888:3888 | zookeeper-3.4.9 |
(3)ActiveMQ方案session
主机IP | 消息端口 |
集群通讯端口 | 控制台端口 | 节点目录/root/下 |
192.168.98.95(hostname:node1) | 51511 |
62621 | 8161 | activemq-cluster/node1/ |
192.168.98.96(hostname:node2) | 51512 | 62622 | 8162 | activemq-cluster/node2/ |
192.168.98.97(hostname:node3) | 51513 | 62623 | 8163 | activemq-cluster/node3/ |
1 首先搭建zookeeper环境,
2 继续搭建activemq环境
2.1 在192.168.98.95节点下,建立/root/activemq-cluster文件夹,解压apache-activemq-5.11.1-bin.tar.gz文件,而后对解压好的文件更名,操做以下:
1 命令:mkdir activemq-cluster
2 命令:tar -zxvf apache-activemq-5.11.1-bin.tar.gz -C /root/activemq-cluster
3 命令:cd /root/activemq-cluster
4 命令:mv apache-activemq-5.11.1 node1
如此操做,再次反复解压apache-activemq-5.11.1-bin.tar.gz文件到192.168.98.96和192.168.98.97的/root/activemq-cluster/下,分别对应创建node2和node3文件夹。
咱们如今已经解压好了三台机器的三个mq节点也就是node一、node二、node3,下面咱们要作的事情就是更改每一个节点不一样的配置和端口。
2.2 修改配置
2.2.1 修改控制台端口(默认为8161),在mq安装路径下的conf/jetty.xml进 行修改便可。(三个节点都要修改,而且端口都不一样)
# cd /root/activemq-cluster/node1/conf/
# vim /root/activemq-cluster/node1/conf/jetty.xml
node2和node3分别修改成8162和8163
2.2.2 集群配置文件修改:咱们在mq安装路径下的conf/activemq.xml进行修 改其中的持久化适配器,修改其中的bind、zkAddress、hostname、zkPath。而后也须要修改mq的brokerName,而且每一个节点名称都必须相同。
#vim /root/activemq-cluster/node1/conf/activemq.xml
第一处修改:brokerName=“jeff-activemq-cluster”(三个节点都须要修改)
第二处修改:先注释掉适配器中的kahadb
第三处修改:添加新的leveldb配置以下(三个节点都须要修改):
Node1:
<persistenceAdapter> <replicatedLevelDB directory="${activemq.data}/leveldb" replicas="3" bind="tcp://192.168.98.95:62621" zkAddress="192.168.98.95:2181,192.168.98.96:2181,192.168.98.97:2181" hostname="node1" zkPath="/activemq/leveldb-stores" /> </persistenceAdapter>
Node2:
<persistenceAdapter> <replicatedLevelDB directory="${activemq.data}/leveldb" replicas="3" bind="tcp://192.168.98.96:62622" zkAddress="192.168.98.95:2181,192.168.98.96:2181,192.168.98.97:2181" hostname="node2" zkPath="/activemq/leveldb-stores" /> </persistenceAdapter>Node3:
<persistenceAdapter> <replicatedLevelDB directory="${activemq.data}/leveldb" replicas="3" bind="tcp://192.168.98.97:62623" zkAddress="192.168.98.95:2181,192.168.98.96:2181,192.168.98.97:2181" hostname="node3" zkPath="/activemq/leveldb-stores" /> </persistenceAdapter>第四处修改:(修改通讯的端口,避免冲突)
修改这个文件的通讯端口号,三个节点都须要修改(51511,51512,51513)
Ok,到此为止,咱们的activemq集群环境已经搭建完毕!
3 测试启动activemq集群
第一步:启动zookeeper集群,命令:zkServer.sh start
第二步:启动mq集群:顺序启动mq:命令以下:
/root/activemq-cluster/node1/bin/activemq start(关闭stop)
/root/activemq-cluster/node2/bin/activemq start(关闭stop)
/root/activemq-cluster/node3/bin/activemq start(关闭stop)
第三步:查看日志信息:
tail -f /root/activemq-cluster/node1/data/activemq.log
tail -f /root/activemq-cluster/node2/data/activemq.log
tail -f /root/activemq-cluster/node3/data/activemq.log
若是不报错,咱们的主从配置启动成功,可使用控制台查看!
第四步:在代码中对集群的brokerUrl配置进行修改便可:
failover:(tcp://192.168.98.95:51511,tcp://192.168.98.96:51512,tcp://192.168.98.97:51513)?Randomize=false
既然zookeeper+activemq主从环境已经搭建完毕,那么究竟哪一个机器activemq是主节点?哪一个机器activemq是从节点?
咱们打开zookeeper:
能够看到在zk的根节点下有个activemq/leveldb-stores节点,它下边有三个子节点,表示咱们的三个activemq主机注册的节点,而主节点就是含有"eleced":"0000000003"这个属性的节点,而另外的子节点没有这个属性值:
主节点是node1,192.168.98.95,而后咱们打开控制台:
http://192.168.98.96:8162/admin/queues.jsp
http://192.168.98.97:8163/admin/queues.jsp
发现并无界面服务能够提供
打开:http://192.168.98.95:8161/admin/queues.jsp,界面正常访问:
咱们链接到activemq的主备集群名称为first的队列去发送50万条消息,每隔1s发送一次,而后启动消费者接收。
Sender.java
package jeff.mq.master; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * @author jeffSheng * 2018年7月3日 */ public class Sender { public static void main(String[] args) throws Exception { final Sender s = new Sender(); s.sender(); } public void sender() throws Exception{ /** * 第一步: * 创建ConnectionFactory工厂对象,须要填入用户名、密码、及要链接的地址,均 * 使用默认便可,默认端口“tcp://localhost:61616” */ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "failover:(tcp://192.168.98.95:51511,tcp://192.168.98.96:51512,tcp://192.168.98.97:51513)?Randomize=false"); /** * 第二步: * 经过ConnectionFactory工厂对象咱们建立一个Connection链接,而且调用Connection的start方法 * 开启链接,Connection链接默认是关闭的。 */ Connection connection = connectionFactory.createConnection(); connection.start(); /** * 第三步: * 经过Connection对象建立Session会话(上下文环境对象),用于接收消息,参数配置1为是否启动事务, * 参数配置2为签收模式,通常咱们设置自动签收。 */ //咱们这里不开启事务 // Session session =connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); //开启事务 Session session =connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); /** * 第四步: * 经过Session建立Destination对象,指的是一个客户端用来指定生产消息目标和消息消息来源的对象, * 在ptp模式中,Destination被称做Queue即队列;在Pub/Sub模式中Destination被称做Topic即主题 * 在程序众包给可使用多个Queue和Topic */ Destination destination = session.createQueue("first"); /** * 第五步: * 咱们须要经过Session对象常见消息的发送和接收对象(生产者和消费者) * MessageProcuder/MessageConsumer */ MessageProducer messageProducer = session.createProducer(null); /** * 第六步: * 咱们可使用MessageProducer的setDeliverMode方法为其设置持久化特性和非持久化特性(DeliverMode) */ //若是设置为持久话方式,咱们须要指定具体持久话策略,好比jdbc持久化到数据库 // messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); /** * 第七步: * 最后咱们使用JMS规范的TextMessage形式建立数据(经过Session对象),并用MessageProducer的send方法发送数据, * 同理客户端使用receive方法进行接收数据。最后不要忘记关闭Connection */ for (int i = 0; i < 500000; i++) { TextMessage textMessage = session.createTextMessage(); int n = (int)(Math.random()*10); textMessage.setText("我是消息,Id:"+n); /** * 参数解释: * 第一个参数:目的地 * 第二个参数:消息 * 第三个参数:是否持久化 * 第四个参数:优先级0~9,0-4是普通消息,5-9是加急消息 * 第五个参数:存活时间,这里咱们设置的是2分钟 */ messageProducer.send(destination,textMessage,DeliveryMode.NON_PERSISTENT,0,1000*60); System.out.println("发送消息:"+textMessage.getText()); Thread.sleep(1000); } //关闭方法会递归向下关闭会话等链接 if(connection!=null){ connection.close(); } } }
receiver.java
package jeff.mq.master; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * @author jeffSheng * 2018年7月3日 */ public class Receiver { public static void main(String[] args) throws Exception { /** * 第一步: * 创建ConnectionFactory工厂对象,须要填入用户名、密码、及要链接的地址,均 * 使用默认便可,默认端口“tcp://localhost:61616” */ ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "failover:(tcp://192.168.98.95:51511,tcp://192.168.98.96:51512,tcp://192.168.98.97:51513)?Randomize=false"); /** * 第二步: * 经过ConnectionFactory工厂对象咱们建立一个Connection链接,而且调用Connection的start方法 * 开启链接,Connection链接默认是关闭的。 */ Connection connection = connectionFactory.createConnection(); connection.start(); /** * 第三步: * 经过Connection对象建立Session会话(上下文环境对象),用于接收消息,参数配置1为是否启动事务, * 参数配置2为签收模式,通常咱们设置自动签收。 */ //咱们这里不开启事务 Session session =connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); /** * 第四步: * 经过Session建立Destination对象,指的是一个客户端用来指定生产消息目标和消息消息来源的对象, * 在ptp模式中,Destination被称做Queue即队列;在Pub/Sub模式中Destination被称做Topic即主题 * 在程序众包给可使用多个Queue和Topic */ Destination destination = session.createQueue("first"); /** * 第五步: * 咱们须要经过Session对象常见消息的发送和接收对象(生产者和消费者) * MessageProcuder/MessageConsumer */ MessageConsumer messageConsumer = session.createConsumer(destination); while(true){ TextMessage msg = (TextMessage)messageConsumer.receive(); if(msg==null)break; System.out.println("收到内容: "+msg.getText()); } //关闭方法会递归向下关闭会话等链接 if(connection!=null){ connection.close(); } } }
当node1宕机后,node2和node3中有一个会成为maser,而node1恢复后会进行再次选举!
留个疑问:
在测试的过程当中发现生产者发送的数据没法入队!
---忙活了半个晚上加一个早上终于找到缘由了,是linux的时间跟我windows的系统时间不一致,linux的时间快了,致使我windows消息设置的存活时间很快就过去了,发送的消息所有过时,我说怎么在topics的activemq.advisor.expire.queue.two中不停增加,而queue界面毛都没有!
调整了linux的时间,重启zk+mq,终于给我出来了!
linux校对时间方法:
yum install ntp
ntpdate cn.pool.ntp.org
如今zk中的主activemq服务器是node1:
咱们先启动服务让生产者和消费者正常运行,而后咱们停掉node1:
能够看到eclipse控制台消费者打印的消息停滞了一小会儿,停在id=50
接着继续打印了!
咱们看下如今的zk节点:主节点变成了node2,而node1已经不存在了
咱们如今重启node1,
zk多了一个00000000059节点就是node1,可是主节点仍然是node2!activemq的选主逻辑应该是zk监听顺序临时节点的原理。
成功进行了failover失败转移!
很简单:
集群1连接集群2: <networkConnectors> <networkConnector uri="static:(tcp://192.168.1.112:51514,tcp://192.168.1.112:51515,tcp://192.168.1.112:51516)" duplex="false"/> </networkConnectors>就是在集群1中链接集群2的uri链接.