在Hadoop生态系统中,许多项目的Logo都采用了动物,好比 Hadoop 和 Hive 采用了大象的形象,HBase 采用了海豚的形象,而从字面上来看 ZooKeeper 表示动物园管理员,因此你们能够理解为 ZooKeeper就是对这些动物(项目组件)进行一些管理工做的。java
对于单机环境多线程的竞态资源协调方法,咱们通常经过线程锁来协调对共享数据的访问以保证状态的一致性。 可是分布式环境如何进行协调呢?因而,Google创造了Chubby,而ZooKeeper则是对于Chubby的一个开源实现。
ZooKeeper是一种为分布式应用所设计的高可用、高性能且一致的开源协调服务,它提供了一项基本服务:分布式锁服务。因为ZooKeeper的开源特性,后来咱们的开发者在分布式锁的基础上,摸索了出了其余的使用方法:配置维护、组服务、分布式消息队列、分布式通知/协调等。它被设计为易于编程,使用文件系统目录树做为数据模型。node
Zookeeper服务自身组成一个集群(2n+1个服务容许n>=1个失效)。Zookeeper集群是一个基于主从复制的高可用集群,每一个服务器承担以下三种角色中的一种python
顺序一致性:按照客户端发送请求的顺序更新数据。mysql
原子性:更新要么成功,要么失败,不会出现部分更新。sql
单一性 :不管客户端链接哪一个server,都会看到同一个视图。apache
可靠性:一旦数据更新成功,将一直保持,直到新的更新。编程
及时性:客户端会在一个肯定的时间内获得最新的数据。安全
Zookeeper表现为一个分层的文件系统目录树结构(不一样于文件系统的是,节点能够有本身的数据,而文件系统中的目录节点只有子节点)。服务器
为了保证写操做的一致性与可用性,Zookeeper专门设计了一种名为原子广播(ZAB)的支持崩溃恢复的一致性协议。基于该协议,Zookeeper实现了一种主从模式的系统架构来保持集群中各个副本之间的数据一致性。根据ZAB协议,全部的写操做都必须经过Leader完成,leader收到写请求,会将请求转为proposal并广播给全部其它节点,其余节点根据协议进行批准或经过。broadcast阶段事实上就是一个两阶段提交的简化版。其全部过程都跟两阶段提交一致,惟一不一致的是不能作事务的回滚。若是实现完整的两阶段提交,那就解决了一致性问题,不必发明新协议了,因此zab实际上抛弃了两阶段提交的事务回滚,因而一台follower只能回复ACK或者干脆就不回复了,leader只要收到过半的机器回复即经过proposal。网络
一旦Leader节点没法工做,ZAB协议可以自动从Follower节点中从新选出一个合适的替代者,即新的Leader,该过程即为领导选举。该领导选举过程,是ZAB协议中最为重要和复杂的过程。
def test_create(): nodename = '/zk_test/service' index = 1 while index < 10: zk.create(nodename, b'192.168.187.215_{index}'. format(index=index), ephemeral=True, sequence=True) time.sleep(3) index += 1 @zk.ChildrenWatch('/zk_test') def my_func(children): print children children = zk.get_children('/zk_test',watch=my_func) while True: time.sleep(3)
my_id = uuid.uuid4() lock = zk.Lock("/lockpath", str(my_id)) print "I am {}".format(str(my_id)) def work(): time.sleep(3) print "{} is working! ".format(str(my_id)) while True: with lock: work() zk.stop()
@zk.DataWatch('/zk_test/service') def my_func(data, stat): print("Data is %s, Version is %s" %(data, stat.version)) while True: time.sleep(2) print '------'
my_id = uuid.uuid4() def leader_func(): print "I am the leader {}".format(str(my_id)) while True: print "{} is working! ".format(str(my_id)) time.sleep(3) election = zk.Election("/electionpath") # blocks until the election is won, then calls # leader_func() election.run(leader_func)
#coding=utf-8 from kazoo.client import KazooClient import time import logging logging.basicConfig() zk = KazooClient(hosts='bjdhj-187-215.58os.org:2181') zk.start() # Determine if a node exists while True: if zk.exists("/zk_test/service0000000054"): print "the worker is alive!" else: print "the worker is dead!" break time.sleep(3) zk.stop()
建立锁节点
在ZooKeeper上会有一个/yarn-leader-election/appcluster-yarn
的锁节点,全部的ResourceManager在启动的时候,都会去竞争写一个Lock子节点:/yarn-leader-election/appcluster-yarn/ActiveBreadCrumb
,该节点是临时节点。ZooKeepr可以为咱们保证最终只有一个ResourceManager可以建立成功。建立成功的那个ResourceManager就切换为Active状态,没有成功的那些ResourceManager则切换为Standby状态。
全部Standby状态的ResourceManager都会向/yarn-leader-election/appcluster-yarn/ActiveBreadCrumb
节点注册一个节点变动的Watcher监听,利用临时节点的特性,可以快速感知到Active状态的ResourceManager的运行状况。
当Active状态的ResourceManager出现诸如宕机或重启的异常状况时,其在ZooKeeper上链接的客户端会话就会失效,所以/yarn-leader-election/appcluster-yarn/ActiveBreadCrumb
节点就会被删除。此时其他各个Standby状态的ResourceManager就都会接收到来自ZooKeeper服务端的Watcher事件通知,而后会重复进行步骤1的操做。
以上就是利用ZooKeeper来实现ResourceManager的主备切换的过程,实现了ResourceManager的HA。
HDFS中NameNode的HA的实现原理跟YARN中ResourceManager的HA的实现原理相同。其锁节点为/hadoop-ha/mycluster/ActiveBreadCrumb
。
在 ResourceManager 中,RMStateStore 可以存储一些 RM 的内部状态信息,包括 Application 以及它们的 Attempts 信息、Delegation Token 及 Version Information 等。须要注意的是,RMStateStore 中的绝大多数状态信息都是不须要持久化存储的,由于很容易从上下文信息中将其重构出来,如资源的使用状况。在存储的设计方案中,提供了三种可能的实现,分别以下。
因为这些状态信息的数据量都不是很大,所以Hadoop官方建议基于ZooKeeper来实现状态信息的存储。在ZooKeepr上,ResourceManager 的状态信息都被存储在/rmstore
这个根节点下面。
RMAppRoot 节点下存储的是与各个 Application 相关的信息,RMDTSecretManagerRoot 存储的是与安全相关的 Token 等信息。每一个 Active 状态的 ResourceManager 在初始化阶段都会从 ZooKeeper 上读取到这些状态信息,并根据这些状态信息继续进行相应的处理。
小结:
ZooKeepr在Hadoop中的应用主要有:
HBase主要用ZooKeeper来实现HMaster选举与主备切换、系统容错、RootRegion管理、Region状态管理和分布式SplitWAL任务管理等。
HMaster选举与主备切换的原理和HDFS中NameNode及YARN中ResourceManager的HA原理相同。
当HBase启动时,每一个RegionServer都会到ZooKeeper的/hbase/rs
节点下建立一个信息节点(下文中,咱们称该节点为”rs状态节点”),例如/hbase/rs/[Hostname]
,同时,HMaster会对这个节点注册监听。当某个 RegionServer 挂掉的时候,ZooKeeper会由于在一段时间内没法接受其心跳(即 Session 失效),而删除掉该 RegionServer 服务器对应的 rs 状态节点。与此同时,HMaster 则会接收到 ZooKeeper 的 NodeDelete 通知,从而感知到某个节点断开,并当即开始容错工做。
HBase为何不直接让HMaster来负责RegionServer的监控呢?若是HMaster直接经过心跳机制等来管理RegionServer的状态,随着集群愈来愈大,HMaster的管理负担会愈来愈重,另外它自身也有挂掉的可能,所以数据还须要持久化。在这种状况下,ZooKeeper就成了理想的选择。
对应HBase集群来讲,数据存储的位置信息是记录在元数据region,也就是RootRegion上的。每次客户端发起新的请求,须要知道数据的位置,就会去查询RootRegion,而RootRegion自身位置则是记录在ZooKeeper上的(默认状况下,是记录在ZooKeeper的/hbase/meta-region-server
节点中)。当RootRegion发生变化,好比Region的手工移动、从新负载均衡或RootRegion所在服务器发生了故障等是,就可以经过ZooKeeper来感知到这一变化并作出一系列相应的容灾措施,从而保证客户端老是可以拿到正确的RootRegion信息。
HBase里的Region会常常发生变动,这些变动的缘由来自于系统故障、负载均衡、配置修改、Region分裂与合并等。一旦Region发生移动,它就会经历下线(offline)和从新上线(online)的过程。
在下线期间数据是不能被访问的,而且Region的这个状态变化必须让全局知晓,不然可能会出现事务性的异常。对于大的HBase集群来讲,Region的数量可能会多达十万级别,甚至更多,这样规模的Region状态管理交给ZooKeeper来作也是一个很好的选择。
当某台RegionServer服务器挂掉时,因为总有一部分新写入的数据尚未持久化到HFile中,所以在迁移该RegionServer的服务时,一个重要的工做就是从WAL中恢复这部分还在内存中的数据,而这部分工做最关键的一步就是SplitWAL,即HMaster须要遍历该RegionServer服务器的WAL,并按Region切分红小块移动到新的地址下,并进行日志的回放(replay)。
因为单个RegionServer的日志量相对庞大(可能有上千个Region,上GB的日志),而用户又每每但愿系统可以快速完成日志的恢复工做。所以一个可行的方案是将这个处理WAL的任务分给多台RegionServer服务器来共同处理,而这就又须要一个持久化组件来辅助HMaster完成任务的分配。当前的作法是,HMaster会在ZooKeeper上建立一个SplitWAL节点(默认状况下,是/hbase/SplitWAL
节点),将“哪一个RegionServer处理哪一个Region”这样的信息以列表的形式存放到该节点上,而后由各个RegionServer服务器自行到该节点上去领取任务并在任务执行成功或失败后再更新该节点的信息,以通知HMaster继续进行后面的步骤。ZooKeeper在这里担负起了分布式集群中相互通知和信息持久化的角色。
小结:
以上就是一些HBase中依赖ZooKeeper完成分布式协调功能的典型场景。但事实上,HBase对ZooKeepr的依赖还不止这些,好比HMaster还依赖ZooKeeper来完成Table的enable/disable状态记录,以及HBase中几乎全部的元数据存储都是放在ZooKeeper上的。
因为ZooKeeper出色的分布式协调能力及良好的通知机制,HBase在各版本的演进过程当中愈来愈多地增长了ZooKeeper的应用场景,从趋势上来看二者的交集愈来愈多。HBase中全部对ZooKeeper的操做都封装在了org.apache.hadoop.hbase.zookeeper这个包中,感兴趣的同窗能够自行研究。
CAS
AtomicInteger atomicInteger = new AtomicInteger(); for(int b = 0; b < numThreads; b++) { new Thread(() -> { for(int a = 0; a < iteration; a++) { atomicInteger.incrementAndGet(); } }).start(); } public final int getAndIncrement() { for( ; ; ) { int current = get(); int next = current + 1; if ( compareAndSet(current, next) ) return current; } }
private final Map<String, Long> urlCounter = new ConcurrentHashMap<>(); //接口调用次数+1 public long increase(String url) { Long oldValue = urlCounter.get(url); Long newValue = (oldValue == null) ? 1L : oldValue + 1; urlCounter.put(url, newValue); return newValue; } //获取调用次数 public Long getCount(String url){ return urlCounter.get(url); } //模拟并发状况下的接口调用统计 for(int i=0;i<callTime;i++){ executor.execute(new Runnable() { @Override public void run() { counterDemo.increase(url); countDownLatch.countDown(); } }); } // CAS: while (true) { private final Map<String, Long> urlCounter = new ConcurrentHashMap<>(); oldValue = urlCounter.get(url); if (oldValue == null) { newValue = 1l; //初始化成功,退出循环 if (urlCounter.putIfAbsent(url, 1l) == null) break; //若是初始化失败,说明其余线程已经初始化过了 } else { newValue = oldValue + 1; //+1成功,退出循环,原子操做 if (urlCounter.replace(url, oldValue, newValue)) break; //若是+1失败,说明其余线程已经修改过了旧值 } } return newValue;
[1] ZooKeeper 学习笔记之扫盲
https://my.oschina.net/leejun2005/blog/67250
[2] ZooKeeper 原理及其在 Hadoop 和 HBase 中的应用
http://blog.jobbole.com/110388/
[3] Java 并发实践 — ConcurrentHashMap 与 CAS