从零开始的高并发(四)--- Zookeeper的分布式队列

前言

前情概要

上一篇中咱们提到了zookeeper伪集群的简单搭建,为了提供可靠的zookeeper服务,咱们须要集群的支持。html

集群搭建中该注意的点有两个,一个是zoo.cfg文件的参数配置,咱们往其加入了dataLogDir路径来存放事务日志,还有要给三个集群的zoo.cfg文件都添加上集群节点配置,二是myid文件,myid是一行只包含机器id的文本,id在集群中必须是惟一的,其值应该在1~255之间,注意目录不能放错(dataDir的路径下)且注意编写时别输入错误的字符便可。java

咱们还简单地提到了paxos算法,根据一个小的场景描述了其流程,而且解释了zookeeper中选举算法的步骤,并结合打印出来的日志信息分析了其步骤node

以往连接

从零开始的高并发(一)--- Zookeeper的基础概念算法

从零开始的高并发(二)--- Zookeeper实现分布式锁数据库

从零开始的高并发(三)--- Zookeeper集群的搭建和leader选举apache

内容一:(补充)zookeeper集群的工做原理

zookeeper提供了重要的分布式协调服务,它是如何保证集群数据的一致性的?安全

① ZAB协议的简单描述

ZAB(zookeeper atomic broadcast)---zookeeper 原子消息广播协议是专门为zookeeper设计的数据一致性协议,注意此协议最主要的关注点在于数据一致性,而无关乎于数据的准确性,权威性,实时性。服务器

ZAB协议过程markdown

1.全部事务转发给leader(当咱们的follower接收到事务请求)
2.Leader分配全局单调递增事务id(zxid,也就是相似于paxos算法的编号n),广播协议提议
3.Follower处理提议,做出反馈(也就是承诺只接受比如今的n编号大的
4.leader收到过半数的反馈,广播commit,把数据完全持久化(和2pc不一样的是,2pc是要等待全部小弟反馈赞成)
5.leader对原来转发事务的followe进行响应,follower也顺带把响应返回给客户端
复制代码

还记得咱们说过zookeeper比较适合读比较多,写比较少的场景吗,为何咱们说它效率高,咱们能够知道,全部的事务请求,必须由一个全局惟一的服务器进行协调,这个服务器也就是如今的leader,leader服务器把客户端的一个写请求事务变成一个提议,这个提议经过咱们的原子广播协议广播到咱们服务器的其余节点上去,此时这个协议的编号,也就是zxid确定是最大的。网络

因为咱们的zxid都是由leader管理的,在上一节也是讲过,leader之因此能成为leader,原本就是由于它的zxid最大,此时的事务请求过来,leader的zxid自己最大的基础上再递增,这样新过来的事务的zxid确定就是最大的。那么一连串的事务又是如何在leader中进行处理,leader中会内置一个队列,队列的做用就是用来保证有序性(zxid有序且队列先进先出原则),因此后面来的事务不可能跳过前面来的事务。因此这也是ZAB协议的一个重要特性---有序性

② Leader崩溃时的举措

leader服务器崩溃,或者说因为网络缘由致使leader失去了与过半follower的联系,那么就会进入崩溃恢复模式

咱们回到上一节配置集群节点配置时,提到了在配置各节点时

server.id = host:port:port
        id:经过在各自的dataDir目录下建立一个myId的文件来为每台机器赋予一个服务器id,这个id咱们通常用基数数字表示
        两个port:第一个follower用来链接到leader,第二个用来选举leader
复制代码

此时第二个port,就是崩溃恢复模式要使用到的

1.ZAB协议规定若是一个事务proposal(提案)在一台机器上被处理成功,那么应该在全部的机器上都被处理成功,
哪怕这台机器已经崩溃或者故障
2.ZAB协议确保那些已经在leader服务器上提交的事务最终被全部服务器都提交
3.ZAB协议确保丢弃那些只在leader服务器上被提出的事务
复制代码

因此此时咱们ZAB协议的选举算法应该知足:确保提交已经被leader提交的事务proposal,同时丢弃已经被跳过的事务proposal

若是让leader选举算法可以保证新选举出来的leader拥有集群中全部机器的最高zxid的事务proposal,那么就能够保证这个新选举出来的leader必定具备全部已经提交的提案,同时若是让拥有最高编号的事务proposal的机器来成为leader,就能够省去leader检查事务proposal的提交和丢弃事务proposal的操做。

③ ZAB协议的数据同步

leader选举完成后,须要进行follower和leader的数据同步,当半数的follower完成同步,则能够开始提供服务。

数据同步过程

leader服务器会为每个follower服务器都准备一个队列,并将那些没有被各follower服务器同步的事务
以proposal的形式逐个发送给follower服务器,并在每个proposal消息后面接着发送一个commit消息,
表示该事务已经进行提交,直到follower服务器将全部还没有同步的事务proposal都从leader上同步
并成功提交到本地数据库中,leader就会将该follower加入到可用follower中
复制代码

④ ZAB协议中丢弃事务proposal

zxid=高32位+低32位=leader周期编号+事务proposal编号
复制代码

事务编号zxid是一个64位的数字,低32位是一个简单的单调递增的计数器,针对客户端的每个事务请求,leader产生新的事务proposal的时候都会对该计数器进行+1的操做,高32位表明了leader周期纪元的编号。

每当选举产生一个新的leader,都会从这个leader服务器上取出其本地日志中最大事务proposal的zxid,并从zxid解析出对应的纪元值,而后对其进行+1操做,以后以此编号做为新的纪元,并将低32位重置为0开始生产新的zxid。

基于此策略,当一个包含了上一个leader周期中还没有提交过的事务proposal的服务器启动加入到集群中,发现此时集群中已经存在leader,将自身以follower角色链接上leader服务器后,leader服务器会根据自身最后被提交的proposal和这个follower的proposal进行比对,发现这个follower中有上一个leader周期的事务proposal后,leader会要求follower进行一个回退操做,回到一个确实被集群过半机器提交的最新的事务proposal

⑤ zookeeper的可配置参数

能够从官网上了解zookeeper的可配置参数

zookeeper.apache.org/doc/current…

虽然是全英,可是当你们有须要使用到它们的时候,那英文就天然不成问题了是吧

内容二:zookeeper的典型应用场景

数据发布订阅
命名服务
master选举
集群管理
分布式队列
分布式锁
复制代码

1.分布式队列的应用场景

① 业务解耦

实现应用之间的解耦,这时全部的下游系统都订阅队列,从而得到一份实时完整的数据

解耦的应用很是普遍,好比咱们常见的发货系统和订单系统,之前业务串行的时候,发货系统必定要等订单系统生成完对应的订单才会进行发货。这样若是订单系统崩溃,那发货系统也没法正常运做,引入消息队列后,发货系统是正常处理掉发货的请求,再把已发货的消息存入消息队列,等待订单系统去更新并生成订单,可是此时,订单系统就算崩溃掉,咱们也不会一直不发货。

② 异步处理

能够看到在此场景中队列被用于实现服务的异步处理,这样作的好处在于咱们能够更快地返回结果和减小等待,实现步骤之间的并发,提高了系统的整体性能等

③ 流量削峰

2.zk的分布式队列

① 逻辑分析

顺序节点的应用,相似于咱们在用zookeeper实现分布式锁的时候如何去处理惊群效应的作法。 且根据队列的特色:FIFO(先进先出),入队时咱们建立顺序节点(ps:为何上面咱们是用了顺序节点而不是说是临时顺序节点,是由于咱们根本不考虑客户端挂掉的状况)并把元素传入队列,出队时咱们取出最小的节点。使用watch机制来监听队列的状态,在队列满时进行阻塞,在队列空时进行写入便可。

入队操做

如上图,咱们生产者须要对资源进行访问时,会申请获取一个分布式锁,若是未成功抢占锁,就会进行阻塞,抢到锁的生产者会尝试把任务提交到消息队列,此时又会进行判断,若是队列满了,就监听队列中的消费事件,当有消费队列存在空位时进行入队,未消费时阻塞。入队时它会进行释放锁的操做,唤醒以前抢占锁的请求,并让以后的生产者来获取。

出队操做

出队和入队的机制是十分类似的。

② JDK阻塞队列操做

阻塞队列:BlockingQueue---线程安全的阻塞队列

它以4种形式出现,对于不能当即知足可是在未来某一时刻可能知足的操做,4种形式的处理方式皆不一样

1.抛出一个异常
2.返回一个特殊值,true or false
3.在操做能够成功前,无限阻塞当前线程
4.放弃前只在给定的最大时间限制内阻塞
复制代码

咱们将会实现这个阻塞队列接口来实现咱们的分布式队列

内容三:分布式队列的代码实现

public class ZkDistributeQueue extends AbstractQueue<String> implements BlockingQueue<String> , java.io.Serializable
复制代码

继承了AbstractQueue,能够省略部分基础实现

① 基本的配置信息及使用到的参数

/**
 * zookeeper客户端操做实例
 */
private ZkClient zkClient;

/**
 *  定义在zk上的znode,做为分布式队列的根目录。
 */
private String queueRootNode;
private static final String default_queueRootNode = "/distributeQueue";

/**队列写锁节点*/
private String queueWriteLockNode;
/**队列读锁节点*/
private String queueReadLockNode;
/**
 * 子目录存放队列下的元素,用顺序节点做为子节点。
 */
private String queueElementNode;

/**
 * ZK服务的链接字符串,hostname:port形式的字符串
 */
private String zkConnUrl;

private static final String default_zkConnUrl = "localhost:2181";

/** 
 * 队列容量大小,默认Integer.MAX_VALUE,无界队列。
 * 注意Integer.MAX_VALUE其实也是有界的,存在默认最大值
 **/
private static final int default_capacity = Integer.MAX_VALUE;
private int capacity;

/**
 * 控制进程访问的分布式锁
 */
final Lock distributeWriteLock;
final Lock distributeReadLock;
复制代码

首先咱们须要一个zkClient的客户端,而后queueRootNode是分布式队列的存放元素的位置,指定了一个默认的根目录default_queueRootNode,把队列中的元素存放于/distributeQueue下,写锁节点表明往队列中存放元素,读锁节点表明从队列中去取元素,这个设计简单点来讲就是,queueRootNode做为最大的目录,其下有3个子目录,分别是queueWriteLockNode,queueReadLockNode和queueElementNode,其余的就是一些须要使用到的配置信息

② 构造器

提供两个构造方法,一个为使用默认参数实现,另一个是自定义实现

public ZkDistributeQueue() {
		this(default_zkConnUrl, default_queueRootNode, default_capacity);
	}
	
public ZkDistributeQueue(String zkServerUrl, String rootNodeName, int initCapacity) {
	if (zkServerUrl == null) throw new IllegalArgumentException("zkServerUrl");
	if (rootNodeName == null) throw new IllegalArgumentException("rootNodeName");
	if (initCapacity <= 0) throw new IllegalArgumentException("initCapacity");
	this.zkConnUrl = zkServerUrl;
	this.queueRootNode = rootNodeName;
	this.capacity = initCapacity;
	init();
	distributeWriteLock = new ZkDistributeImproveLock(queueWriteLockNode);
	distributeReadLock = new ZkDistributeImproveLock(queueReadLockNode);
}
复制代码

此时在咱们分布式锁的构造器中,createPersistent()的参数true是指若是我父目录queueRootNode并无事先建立完成,这个方法会自动建立出父目录,这样就不怕咱们在跑程序以前遗漏掉一些建立文件结构的工做

public ZkDistributeImproveLock(String lockPath) {
    if(lockPath == null || lockPath.trim().equals("")) {
        throw new IllegalArgumentException("patch不能为空字符串");
    }
    this.lockPath = lockPath;
    client = new ZkClient("localhost:2181");
    client.setZkSerializer(new MyZkSerializer());
    if (!this.client.exists(lockPath)) {
        try {
            this.client.createPersistent(lockPath, true);
        } catch (ZkNodeExistsException e) {

        }
    }
}
复制代码

③ 初始化队列信息的init()方法

从新定义好读锁写写锁和任务存放路径,而后把zkClient链接上,建立queueElementNode做为任务元素目录,参数true上文做用已经提到了

/**
 * 初始化队列信息
 */
private void init() {
	queueWriteLockNode = queueRootNode+"/writeLock";
	queueReadLockNode = queueRootNode+"/readLock";
	queueElementNode = queueRootNode+"/element";
	zkClient = new ZkClient(zkConnUrl);
	zkClient.setZkSerializer(new MyZkSerializer());
	if (!this.zkClient.exists(queueElementNode)) {
		try {
			this.zkClient.createPersistent(queueElementNode, true);
		} catch (ZkNodeExistsException e) {
			
		}
	}
}
复制代码

④ 使用put()方法进行队列元素入队操做

// 阻塞操做
@Override
public void put(String e) throws InterruptedException {
	checkElement(e);
	
	//尝试去获取分布式锁
	distributeWriteLock.lock();
	try {
		if(size() < capacity) {	// 容量足够
			enqueue(e);
			System.out.println(Thread.currentThread().getName() + "-----往队列放入了元素");
		}else { // 容量不够,阻塞,监听元素出队
			waitForRemove();
			put(e);
		}
	} finally {
	
        //释放锁
		distributeWriteLock.unlock();
	}
}
复制代码

checkElement()方法是一个简单的参数检查,咱们也能够定义有关于znode的命名规范的一些检查,不过通常状况下只要是String类型的参数都是没有问题的

private static void checkElement(String v) {
    if (v == null) throw new NullPointerException();
    if("".equals(v.trim())) {
    	throw new IllegalArgumentException("不能使用空格");
    }
    if(v.startsWith(" ") || v.endsWith(" ")) {
    	throw new IllegalArgumentException("先后不能包含空格");
    }
}
复制代码

size()方法也很简单,就是先取得父目录而后调用zkClient自带的countChildren()方法得出结果返回便可

public int size() {
	int size = zkClient.countChildren(queueElementNode);
	return size;
}
复制代码

从零开始的高并发(二)--- Zookeeper实现分布式锁中已经对等待移除的这个方法进行解释,主要就是经过subscribeChildChanges()监听子节点的数据变化,在size() < capacity条件成立时,就会唤醒等待队列,而当size() >= capacity,就会判断队列已经被填满,从而进行阻塞,

/**
 * 队列容量满了,不能再插入元素,阻塞等待队列移除元素。
 */
private void waitForRemove() {
	CountDownLatch cdl = new CountDownLatch(1);
	// 注册watcher
	IZkChildListener listener = new IZkChildListener() {
		@Override
		public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
			if(currentChilds.size() < capacity) {	// 有任务移除,激活等待的添加操做
				cdl.countDown();
				System.out.println(Thread.currentThread().getName() + "-----监听到队列有元素移除,唤醒阻塞生产者线程");
			}
		}
	};
	zkClient.subscribeChildChanges(queueElementNode, listener);
	
	try {
		// 确保队列是满的
		if(size() >= capacity) {
			System.out.println(Thread.currentThread().getName() + "-----队列已满,阻塞等待队列元素释放");
			cdl.await();	// 阻塞等待元素被移除
		}
	} catch (InterruptedException e) {
		e.printStackTrace();
	}
	zkClient.unsubscribeChildChanges(queueElementNode, listener);
}
复制代码

在waitForRemove()方法执行后,咱们的等待线程被唤醒,这时从新执行put(e),尝试从新入队

入队操做由enqueue(e)来完成,就是建立顺序节点的步骤

/**
 * 往zk中添加元素
 * @param e
 */
private void enqueue(String e) {
	zkClient.createPersistentSequential(queueElementNode+"/", e);
}
复制代码

⑤ 消费操做take

@Override
public String take() throws InterruptedException {

    //老套路,先获取锁
	distributeReadLock.lock();
	try {
		List<String> children = zkClient.getChildren(queueElementNode);
		if(children != null && !children.isEmpty()) {
		
		    //先对children进行一个排序,而后取出第一个,也就是最小编号的节点
			children = children.stream().sorted().collect(Collectors.toList());
			String takeChild = children.get(0);
			String childNode = queueElementNode+"/"+takeChild;
			String elementData = zkClient.readData(childNode);
			
			//进行出队操做
			dequeue(childNode);
			System.out.println(Thread.currentThread().getName() + "-----移除队列元素");
			return elementData;
		}else {
		
		    //若是children原本就是空的,那就是没有元素须要消费,那就继续等待
			waitForAdd();		// 阻塞等待队列有元素加入
			return take();
		} 
	} finally {
		distributeReadLock.unlock();
	}
}

//出队操做

private boolean dequeue(String e) {
	boolean result = zkClient.delete(e);
	return result;
}
复制代码

附:生产者和消费者的模拟

① 生产者

模拟了两台服务器,两个并发,每睡3秒钟就往消息队列put

public class DistributeQueueProducerTest {
	public static final String queueRootNode = "/distributeQueue";
	
	public static final String zkConnUrl = "localhost:2181";
	
	public static final int capacity = 20;
	
	public static void main(String[] args) {
		startProducer();
	}
	
	public static void startProducer() {
		// 服务集群数
		int service = 2;
		// 并发数
		int requestSize = 2;
		
		CyclicBarrier requestBarrier = new CyclicBarrier(requestSize * service);
		// 多线程模拟分布式环境下生产者
		for (int i = 0; i < service; i++) {
			new Thread(new Runnable() {
				public void run() {
					// 模拟分布式集群的场景
					BlockingQueue<String> queue = new ZkDistributeQueue(zkConnUrl, queueRootNode, capacity);
					
					System.out.println(Thread.currentThread().getName() + "---------生产者服务器,已准备好---------------");
					
					for(int i =0; i < requestSize; i++) {
						new Thread(new Runnable() {
							@Override
							public void run() {
								try {
									// 等待service台服务,requestSize个请求 一块儿出发
									requestBarrier.await();
								} catch (InterruptedException | BrokenBarrierException e) {
									e.printStackTrace();
								}
								while(true) {
									try {
										queue.put("123");
										System.out.println(Thread.currentThread().getName() + "-----进入睡眠状态");
										TimeUnit.SECONDS.sleep(3);
										System.out.println(Thread.currentThread().getName() + "-----睡眠状态,醒来");
									} catch (InterruptedException e) {
										e.printStackTrace();
									}
								}
							}
							
						}, Thread.currentThread().getName()+"-request-" + i).start();
					}
				}
			}, "producerServer-" + i).start();
			
		}
		
		try {
			Thread.currentThread().join();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

}
复制代码

② 消费者

public class DistributeQueueConsumerTest {

	public static void main(String[] args) {
		satrtConsumer();
	}
	
	public static void satrtConsumer() {
		// 服务集群数
		int service = 2;
		// 并发数
		int requestSize = 2;
		
		CyclicBarrier requestBarrier = new CyclicBarrier(requestSize * service);
		
		// 多线程模拟分布式环境下消费者
		for (int i = 0; i < service; i++) {
			new Thread(new Runnable() {	// 进程模拟线程
				public void run() {
					// 模拟分布式集群的场景
					BlockingQueue<String> queue = new ZkDistributeQueue(zkConnUrl, queueRootNode, capacity);

					System.out.println(Thread.currentThread().getName() + "---------消费者服务器,已准备好---------------");
					
					for(int i =0; i < requestSize; i++) {	// 操做模拟线程
						new Thread(new Runnable() {
							@Override
							public void run() {
								try {
									// 等待service台服务,requestSize个请求 一块儿出发
									requestBarrier.await();
								} catch (InterruptedException | BrokenBarrierException e) {
									e.printStackTrace();
								}
								while(true) {
									try {
										queue.take();
										System.out.println(Thread.currentThread().getName() + "-----进入睡眠状态");
										TimeUnit.SECONDS.sleep(3);
										System.out.println(Thread.currentThread().getName() + "-----睡眠状态,醒来");
									} catch (InterruptedException e) {
										e.printStackTrace();
									}
								}
							}
							
						}, Thread.currentThread().getName()+"-request-" + i).start();
					}
				}
			}, "consummerServer-" + i).start();
		}
		
		try {
			Thread.currentThread().join();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

}
复制代码

执行结果

① 先执行生产者

此时没有消费者去进行消费,因此队列没一会儿就满了,咱们须要注意,阻塞的不只仅是队列,分布式锁也被阻塞了。

② 启动消费者

基本上是生产者放入一个消费者就消费一个的状态。从而证实该分布式队列已经正常工做了。


finally

篇幅和上次同样比较长,主要是补充了上次没讲到的集群的ZAB协议和zookeeper的其中一个分布式队列的应用场景,其实在平常开发中使用zookeeper来实现队列是基本不会发生的,比较常见的都是activeMQ,rabbitMQ,kafka等等。不过仍然有必要去了解队列的基本工做思路。咱们也至关于本身手写了一个拥有基础功能的MQ

你必定听过dubbo+zookeeper的万金油组合配置中心究竟是什么?zookeeper又是如何依靠自身机制来实现配置中心的?

下一篇:从零开始的高并发(五)--- Zookeeper的经典应用场景2

相关文章
相关标签/搜索