按部就班ActiveMQ(6)----使用zookeeper实现activemq的主从环境搭建

使用ZooKeeper实现的Master-Slave实现方式,是对ActiveMQ进行高可用的一种有效的解决方案。html

原理:java

1 使用ZooKeeper(集群)注册全部的ActiveMQ Broker。
node

2 只有其中的一个Broker能够对外提供服务(也就是Master节点),其余的Broker处于待机状态,被视为Slave。
linux

3 。若是Master因故障而不能提供服务,则利用ZooKeeper的内部选举机制会从Slave中选举出一个Broker充当Master节
数据库

点,继续对外提供服务。apache

官方文档:http://activemq.apache.org/replicated-leveldb-store.htmlvim


部署方案

(1)首先咱们下载apache-activemq-5.11.1-bin.tar.gz上传到咱们的机器上准备部署。
windows

(2)Zookeeper方案
服务器

主机IP 消息端口 通讯端口 节点目录/root/下
192.168.98.95(hostname:node1 2181 2888:3888 zookeeper-3.4.9
192.168.98.96(hostname:node2 2181 2888:3888 zookeeper-3.4.9
192.168.98.97(hostname:node3 2181 2888:3888 zookeeper-3.4.9

(3)ActiveMQ方案session

主机IP
消息端口
集群通讯端口 控制台端口 节点目录/root/下
192.168.98.95(hostname:node1
51511
62621 8161 activemq-cluster/node1/
192.168.98.96(hostname:node2 51512 62622 8162 activemq-cluster/node2/
192.168.98.97(hostname:node3 51513 62623 8163 activemq-cluster/node3/

1 首先搭建zookeeper环境,

2 继续搭建activemq环境

2.1 在192.168.98.95节点下,建立/root/activemq-cluster文件夹,解压apache-activemq-5.11.1-bin.tar.gz文件,而后对解压好的文件更名,操做以下:

    1 命令:mkdir activemq-cluster

    2 命令:tar -zxvf apache-activemq-5.11.1-bin.tar.gz -C /root/activemq-cluster

    3 命令:cd  /root/activemq-cluster

    4 命令:mv apache-activemq-5.11.1 node1

如此操做,再次反复解压apache-activemq-5.11.1-bin.tar.gz文件到192.168.98.96和192.168.98.97的/root/activemq-cluster/下,分别对应创建node2和node3文件夹。

咱们如今已经解压好了三台机器的三个mq节点也就是node一、node二、node3,下面咱们要作的事情就是更改每一个节点不一样的配置和端口。

2.2 修改配置

2.2.1 修改控制台端口(默认为8161),在mq安装路径下的conf/jetty.xml进 行修改便可。(三个节点都要修改,而且端口都不一样)

# cd /root/activemq-cluster/node1/conf/
# vim /root/activemq-cluster/node1/conf/jetty.xml


node2和node3分别修改成8162和8163


2.2.2 集群配置文件修改:咱们在mq安装路径下的conf/activemq.xml进行修   改其中的持久化适配器,修改其中的bind、zkAddress、hostname、zkPath。而后也须要修改mq的brokerName,而且每一个节点名称都必须相同。

    #vim /root/activemq-cluster/node1/conf/activemq.xml

第一处修改:brokerName=“jeff-activemq-cluster”(三个节点都须要修改)


第二处修改:先注释掉适配器中的kahadb


第三处修改:添加新的leveldb配置以下(三个节点都须要修改):

Node1:

<persistenceAdapter>
    <replicatedLevelDB
        directory="${activemq.data}/leveldb"
        replicas="3"
        bind="tcp://192.168.98.95:62621"
        zkAddress="192.168.98.95:2181,192.168.98.96:2181,192.168.98.97:2181"
        hostname="node1"
        zkPath="/activemq/leveldb-stores" />

</persistenceAdapter>

Node2:

<persistenceAdapter>
    <replicatedLevelDB
        directory="${activemq.data}/leveldb"
        replicas="3"
        bind="tcp://192.168.98.96:62622"
        zkAddress="192.168.98.95:2181,192.168.98.96:2181,192.168.98.97:2181"
        hostname="node2"
        zkPath="/activemq/leveldb-stores" />

</persistenceAdapter>
Node3:

<persistenceAdapter>
    <replicatedLevelDB
        directory="${activemq.data}/leveldb"
        replicas="3"
        bind="tcp://192.168.98.97:62623"
        zkAddress="192.168.98.95:2181,192.168.98.96:2181,192.168.98.97:2181"
        hostname="node3"
        zkPath="/activemq/leveldb-stores" />

</persistenceAdapter>
第四处修改:(修改通讯的端口,避免冲突)
             # vim /root/activemq-cluster/node1/conf/activemq.xml

修改这个文件的通讯端口号,三个节点都须要修改(51511,51512,51513)



Ok,到此为止,咱们的activemq集群环境已经搭建完毕!


3  测试启动activemq集群
第一步:启动zookeeper集群,命令:zkServer.sh start
第二步:启动mq集群:顺序启动mq:命令以下:
/root/activemq-cluster/node1/bin/activemq start(关闭stop)
/root/activemq-cluster/node2/bin/activemq start(关闭stop)
/root/activemq-cluster/node3/bin/activemq start(关闭stop)
第三步:查看日志信息:
tail -f /root/activemq-cluster/node1/data/activemq.log
tail -f /root/activemq-cluster/node2/data/activemq.log
tail -f /root/activemq-cluster/node3/data/activemq.log
若是不报错,咱们的主从配置启动成功,可使用控制台查看!


第四步:在代码中对集群的brokerUrl配置进行修改便可:

failover:(tcp://192.168.98.95:51511,tcp://192.168.98.96:51512,tcp://192.168.98.97:51513)?Randomize=false


查看mq主节点

既然zookeeper+activemq主从环境已经搭建完毕,那么究竟哪一个机器activemq是主节点?哪一个机器activemq是从节点?

咱们打开zookeeper:


能够看到在zk的根节点下有个activemq/leveldb-stores节点,它下边有三个子节点,表示咱们的三个activemq主机注册的节点,而主节点就是含有"eleced":"0000000003"这个属性的节点,而另外的子节点没有这个属性值:


主节点是node1,192.168.98.95,而后咱们打开控制台:

http://192.168.98.96:8162/admin/queues.jsp

http://192.168.98.97:8163/admin/queues.jsp

发现并无界面服务能够提供


打开:http://192.168.98.95:8161/admin/queues.jsp,界面正常访问:



应用测试

咱们链接到activemq的主备集群名称为first的队列去发送50万条消息,每隔1s发送一次,而后启动消费者接收。

Sender.java

package jeff.mq.master;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * @author jeffSheng
 * 2018年7月3日
 */
public class Sender {
	
	public static void main(String[] args) throws Exception {
		final Sender s = new Sender();
		s.sender();
	}
	
	public void sender() throws Exception{
		/**
		 * 第一步:
		 * 创建ConnectionFactory工厂对象,须要填入用户名、密码、及要链接的地址,均
		 * 使用默认便可,默认端口“tcp://localhost:61616”
		 */
			ConnectionFactory connectionFactory = 
					new ActiveMQConnectionFactory(
							ActiveMQConnectionFactory.DEFAULT_USER, 
							ActiveMQConnectionFactory.DEFAULT_PASSWORD, 
							"failover:(tcp://192.168.98.95:51511,tcp://192.168.98.96:51512,tcp://192.168.98.97:51513)?Randomize=false");
		
		/**
		 * 第二步:
		 * 经过ConnectionFactory工厂对象咱们建立一个Connection链接,而且调用Connection的start方法
		 * 开启链接,Connection链接默认是关闭的。
		 */
		Connection connection = connectionFactory.createConnection();
		connection.start();
		
		/**
		 * 第三步:
		 * 经过Connection对象建立Session会话(上下文环境对象),用于接收消息,参数配置1为是否启动事务,
		 * 参数配置2为签收模式,通常咱们设置自动签收。
		 */
		//咱们这里不开启事务
		
//		Session session =connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
		//开启事务
		Session session =connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
		/**
		 * 第四步:
		 * 经过Session建立Destination对象,指的是一个客户端用来指定生产消息目标和消息消息来源的对象,
		 * 在ptp模式中,Destination被称做Queue即队列;在Pub/Sub模式中Destination被称做Topic即主题
		 * 在程序众包给可使用多个Queue和Topic
		 */
		Destination destination = session.createQueue("first");
		
		
		/**
		 * 第五步:
		 * 咱们须要经过Session对象常见消息的发送和接收对象(生产者和消费者)
		 * MessageProcuder/MessageConsumer
		 */
		MessageProducer messageProducer = session.createProducer(null);
		
		/**
		 * 第六步:
		 * 咱们可使用MessageProducer的setDeliverMode方法为其设置持久化特性和非持久化特性(DeliverMode)
		 */
		//若是设置为持久话方式,咱们须要指定具体持久话策略,好比jdbc持久化到数据库
//		messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
		/**
		 * 第七步:
		 * 最后咱们使用JMS规范的TextMessage形式建立数据(经过Session对象),并用MessageProducer的send方法发送数据,
		 * 同理客户端使用receive方法进行接收数据。最后不要忘记关闭Connection
		 */
		for (int i = 0; i < 500000; i++) {
			TextMessage textMessage = session.createTextMessage();
			int n = (int)(Math.random()*10);
			textMessage.setText("我是消息,Id:"+n);
			/**
			 * 参数解释:
			 * 第一个参数:目的地
			 * 第二个参数:消息
			 * 第三个参数:是否持久化
			 * 第四个参数:优先级0~9,0-4是普通消息,5-9是加急消息
			 * 第五个参数:存活时间,这里咱们设置的是2分钟
			 */
			messageProducer.send(destination,textMessage,DeliveryMode.NON_PERSISTENT,0,1000*60);
			System.out.println("发送消息:"+textMessage.getText());
			Thread.sleep(1000);
		}
		
		//关闭方法会递归向下关闭会话等链接
		if(connection!=null){
			connection.close();
		}
	}
}

receiver.java

package jeff.mq.master;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * @author jeffSheng
 * 2018年7月3日
 */
public class Receiver {

	
	public static void main(String[] args) throws Exception {
		/**
		 * 第一步:
		 * 创建ConnectionFactory工厂对象,须要填入用户名、密码、及要链接的地址,均
		 * 使用默认便可,默认端口“tcp://localhost:61616”
		 */
		ConnectionFactory connectionFactory = 
				new ActiveMQConnectionFactory(
						ActiveMQConnectionFactory.DEFAULT_USER, 
						ActiveMQConnectionFactory.DEFAULT_PASSWORD, 
						"failover:(tcp://192.168.98.95:51511,tcp://192.168.98.96:51512,tcp://192.168.98.97:51513)?Randomize=false");
	
		/**
		 * 第二步:
		 * 经过ConnectionFactory工厂对象咱们建立一个Connection链接,而且调用Connection的start方法
		 * 开启链接,Connection链接默认是关闭的。
		 */
		Connection connection = connectionFactory.createConnection();
		connection.start();
		
		/**
		 * 第三步:
		 * 经过Connection对象建立Session会话(上下文环境对象),用于接收消息,参数配置1为是否启动事务,
		 * 参数配置2为签收模式,通常咱们设置自动签收。
		 */
		//咱们这里不开启事务
		Session session =connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
		
		/**
		 * 第四步:
		 * 经过Session建立Destination对象,指的是一个客户端用来指定生产消息目标和消息消息来源的对象,
		 * 在ptp模式中,Destination被称做Queue即队列;在Pub/Sub模式中Destination被称做Topic即主题
		 * 在程序众包给可使用多个Queue和Topic
		 */
		Destination destination = session.createQueue("first");
		
		
		/**
		 * 第五步:
		 * 咱们须要经过Session对象常见消息的发送和接收对象(生产者和消费者)
		 * MessageProcuder/MessageConsumer
		 */
		MessageConsumer messageConsumer = session.createConsumer(destination);
		while(true){
			TextMessage msg = (TextMessage)messageConsumer.receive();
			if(msg==null)break;
			System.out.println("收到内容: "+msg.getText());
		}
		//关闭方法会递归向下关闭会话等链接
		if(connection!=null){
			connection.close();
		}
		
	}
	
}

当node1宕机后,node2和node3中有一个会成为maser,而node1恢复后会进行再次选举!


留个疑问:

      在测试的过程当中发现生产者发送的数据没法入队!

---忙活了半个晚上加一个早上终于找到缘由了,是linux的时间跟我windows的系统时间不一致,linux的时间快了,致使我windows消息设置的存活时间很快就过去了,发送的消息所有过时,我说怎么在topics的activemq.advisor.expire.queue.two中不停增加,而queue界面毛都没有!


调整了linux的时间,重启zk+mq,终于给我出来了!


linux校对时间方法:

 yum install ntp

 ntpdate cn.pool.ntp.org


失败转移

如今zk中的主activemq服务器是node1:


咱们先启动服务让生产者和消费者正常运行,而后咱们停掉node1:


能够看到eclipse控制台消费者打印的消息停滞了一小会儿,停在id=50


接着继续打印了!


咱们看下如今的zk节点:主节点变成了node2,而node1已经不存在了



咱们如今重启node1,



zk多了一个00000000059节点就是node1,可是主节点仍然是node2!activemq的选主逻辑应该是zk监听顺序临时节点的原理。


成功进行了failover失败转移!


负载均衡配置

很简单:

集群1连接集群2:
<networkConnectors>
<networkConnector 
uri="static:(tcp://192.168.1.112:51514,tcp://192.168.1.112:51515,tcp://192.168.1.112:51516)" 
duplex="false"/>
</networkConnectors>
就是在集群1中链接集群2的uri链接.